From d5af7606f984e1779f446d17020d6133bd4d8d51 Mon Sep 17 00:00:00 2001 From: Douglas Q Hawkins Date: Fri, 15 May 2026 19:11:04 -0400 Subject: [PATCH 01/11] Fold AggregateMetric into AggregateEntry The aggregator allocated one AggregateMetric per entry and reached its counters/histograms through entry.aggregate.X. Folding the state and methods onto AggregateEntry removes: - one object header + alignment (~16-24 bytes) per entry, - one reference field on AggregateEntry, - one virtual dispatch hop on the consumer hot path (entry.recordOneDuration replaces entry.aggregate.recordOneDuration). Moves ERROR_TAG / TOP_LEVEL_TAG onto AggregateEntry. recordOneDuration, recordDurations, clearAggregate, and the counter/histogram accessors all live directly on the entry now. equals/hashCode still depend only on the immutable label fields (the counters are not part of identity). Also migrates AggregateMetricTest.groovy to AggregateEntryRecordingTest.java under JUnit 5, per project conventions. Benchmark unchanged (within noise) since the fold is purely a consumer-side + memory change: before: 1.995 us/op after: 2.040 us/op (stdev 0.011; prior stdev 0.023) Co-Authored-By: Claude Opus 4.7 (1M context) --- .../trace/common/metrics/AggregateEntry.java | 127 ++++++++++++++++-- .../trace/common/metrics/AggregateMetric.java | 103 -------------- .../trace/common/metrics/AggregateTable.java | 28 ++-- .../trace/common/metrics/Aggregator.java | 8 +- .../common/metrics/ClientStatsAggregator.java | 4 +- .../trace/common/metrics/MetricWriter.java | 2 +- .../metrics/SerializingMetricWriter.java | 13 +- .../trace/common/metrics/SpanSnapshot.java | 4 +- .../common/metrics/AggregateMetricTest.groovy | 105 --------------- .../metrics/ClientStatsAggregatorTest.groovy | 62 ++++----- .../SerializingMetricWriterTest.groovy | 11 +- .../metrics/AggregateEntryRecordingTest.java | 105 +++++++++++++++ .../common/metrics/AggregateTableTest.java | 61 +++++---- .../groovy/MetricsIntegrationTest.groovy | 4 +- 14 files changed, 316 insertions(+), 321 deletions(-) delete mode 100644 dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateMetric.java delete mode 100644 dd-trace-core/src/test/groovy/datadog/trace/common/metrics/AggregateMetricTest.groovy create mode 100644 dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateEntryRecordingTest.java diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java index 225f03197e5..ce583282b9e 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java @@ -1,16 +1,20 @@ package datadog.trace.common.metrics; +import datadog.metrics.api.Histogram; import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; import datadog.trace.util.Hashtable; import datadog.trace.util.LongHashingUtils; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.concurrent.atomic.AtomicLongArray; /** * Hashtable entry for the consumer-side aggregator. Holds the UTF8-encoded label fields (the data - * {@link SerializingMetricWriter} writes to the wire) plus the mutable {@link AggregateMetric}. + * {@link SerializingMetricWriter} writes to the wire) plus the mutable per-bucket counters and + * latency histograms. * *

UTF8 canonicalization runs through per-field {@link PropertyCardinalityHandler}s (and {@link * TagCardinalityHandler}s for peer tags), so cardinality is capped per reporting interval. The @@ -30,8 +34,18 @@ * the aggregator thread may call {@link Canonical#populate} or {@link #resetCardinalityHandlers}. * Test code uses {@link #of} which constructs entries without touching the handlers. */ +@SuppressFBWarnings( + value = {"AT_NONATOMIC_OPERATIONS_ON_SHARED_VARIABLE", "AT_STALE_THREAD_WRITE_OF_PRIMITIVE"}, + justification = + "Recording counters are mutated only on the aggregator thread; not thread-safe by design.") final class AggregateEntry extends Hashtable.Entry { + /** Top bit of the duration word: set when the recorded span was an error. */ + static final long ERROR_TAG = 0x8000000000000000L; + + /** Second-from-top bit: set when the recorded span was a top-level span. */ + static final long TOP_LEVEL_TAG = 0x4000000000000000L; + // Per-field cardinality limits. Identical to the prior DDCache sizes. static final PropertyCardinalityHandler RESOURCE_HANDLER = new PropertyCardinalityHandler(32); static final PropertyCardinalityHandler SERVICE_HANDLER = new PropertyCardinalityHandler(32); @@ -59,7 +73,14 @@ final class AggregateEntry extends Hashtable.Entry { final boolean synthetic; final boolean traceRoot; final List peerTags; - final AggregateMetric aggregate; + + // Recording state. Mutated only on the aggregator thread. Not thread-safe. + private final Histogram okLatencies; + private final Histogram errorLatencies; + private int errorCount; + private int hitCount; + private int topLevelCount; + private long duration; /** Field-bearing constructor used by both the hot path and the test factory. */ private AggregateEntry( @@ -76,8 +97,7 @@ private AggregateEntry( short httpStatusCode, boolean synthetic, boolean traceRoot, - List peerTags, - AggregateMetric aggregate) { + List peerTags) { super(keyHash); this.resource = resource; this.service = service; @@ -92,7 +112,8 @@ private AggregateEntry( this.synthetic = synthetic; this.traceRoot = traceRoot; this.peerTags = peerTags; - this.aggregate = aggregate; + this.okLatencies = Histogram.newHistogram(); + this.errorLatencies = Histogram.newHistogram(); } /** @@ -154,8 +175,7 @@ static AggregateEntry of( (short) httpStatusCode, synthetic, traceRoot, - peerTagsList, - new AggregateMetric()); + peerTagsList); } /** @@ -271,10 +291,92 @@ List getPeerTags() { return peerTags; } + // ----- recording state accessors ----- + + int getHitCount() { + return hitCount; + } + + int getErrorCount() { + return errorCount; + } + + int getTopLevelCount() { + return topLevelCount; + } + + long getDuration() { + return duration; + } + + Histogram getOkLatencies() { + return okLatencies; + } + + Histogram getErrorLatencies() { + return errorLatencies; + } + + /** + * Records a single hit. {@code tagAndDuration} carries the duration nanos with optional {@link + * #ERROR_TAG} / {@link #TOP_LEVEL_TAG} bits OR-ed in. + */ + AggregateEntry recordOneDuration(long tagAndDuration) { + ++hitCount; + if ((tagAndDuration & TOP_LEVEL_TAG) == TOP_LEVEL_TAG) { + tagAndDuration ^= TOP_LEVEL_TAG; + ++topLevelCount; + } + if ((tagAndDuration & ERROR_TAG) == ERROR_TAG) { + tagAndDuration ^= ERROR_TAG; + errorLatencies.accept(tagAndDuration); + ++errorCount; + } else { + okLatencies.accept(tagAndDuration); + } + duration += tagAndDuration; + return this; + } + + /** + * Records {@code count} durations from {@code durations} (positions 0..count-1). Used by + * integration tests; production code uses {@link #recordOneDuration}. + */ + AggregateEntry recordDurations(int count, AtomicLongArray durations) { + this.hitCount += count; + for (int i = 0; i < count && i < durations.length(); ++i) { + long d = durations.getAndSet(i, 0); + if ((d & TOP_LEVEL_TAG) == TOP_LEVEL_TAG) { + d ^= TOP_LEVEL_TAG; + ++topLevelCount; + } + if ((d & ERROR_TAG) == ERROR_TAG) { + d ^= ERROR_TAG; + errorLatencies.accept(d); + ++errorCount; + } else { + okLatencies.accept(d); + } + this.duration += d; + } + return this; + } + + /** Clears the recording state. Histograms are reused. */ + @SuppressFBWarnings("AT_NONATOMIC_64BIT_PRIMITIVE") + void clearAggregate() { + this.errorCount = 0; + this.hitCount = 0; + this.topLevelCount = 0; + this.duration = 0; + this.okLatencies.clear(); + this.errorLatencies.clear(); + } + /** - * Equality on the 13 label fields (not on the aggregate). Used only by test mock matchers; the - * {@link Hashtable} does its own bucketing via {@link #keyHash} + {@link Canonical#matches} and - * never calls {@code equals}. + * Equality on the 13 label fields (not on the recording counters). Used only by test mock + * matchers; the {@link Hashtable} does its own bucketing via {@link #keyHash} + {@link + * Canonical#matches} and never calls {@code equals}. */ @Override public boolean equals(Object o) { @@ -426,7 +528,7 @@ private static boolean peerTagsEqual(List a, List snapshottedPeerTags; int n = peerTagsBuffer.size(); if (n == 0) { @@ -450,8 +552,7 @@ AggregateEntry toEntry(AggregateMetric aggregate) { httpStatusCode, synthetic, traceRoot, - snapshottedPeerTags, - aggregate); + snapshottedPeerTags); } } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateMetric.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateMetric.java deleted file mode 100644 index dba66a5ab9c..00000000000 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateMetric.java +++ /dev/null @@ -1,103 +0,0 @@ -package datadog.trace.common.metrics; - -import datadog.metrics.api.Histogram; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import java.util.concurrent.atomic.AtomicLongArray; - -/** Not thread-safe. Accumulates counts and durations. */ -@SuppressFBWarnings( - value = {"AT_NONATOMIC_OPERATIONS_ON_SHARED_VARIABLE", "AT_STALE_THREAD_WRITE_OF_PRIMITIVE"}, - justification = "Explicitly not thread-safe. Accumulates counts and durations.") -public final class AggregateMetric { - - static final long ERROR_TAG = 0x8000000000000000L; - static final long TOP_LEVEL_TAG = 0x4000000000000000L; - - private final Histogram okLatencies; - private final Histogram errorLatencies; - private int errorCount; - private int hitCount; - private int topLevelCount; - private long duration; - - public AggregateMetric() { - okLatencies = Histogram.newHistogram(); - errorLatencies = Histogram.newHistogram(); - } - - public AggregateMetric recordDurations(int count, AtomicLongArray durations) { - this.hitCount += count; - for (int i = 0; i < count && i < durations.length(); ++i) { - long duration = durations.getAndSet(i, 0); - if ((duration & TOP_LEVEL_TAG) == TOP_LEVEL_TAG) { - duration ^= TOP_LEVEL_TAG; - ++topLevelCount; - } - if ((duration & ERROR_TAG) == ERROR_TAG) { - // then it's an error - duration ^= ERROR_TAG; - errorLatencies.accept(duration); - ++errorCount; - } else { - okLatencies.accept(duration); - } - this.duration += duration; - } - return this; - } - - /** - * Records a single hit. {@code tagAndDuration} carries the duration nanos with optional {@link - * #ERROR_TAG} / {@link #TOP_LEVEL_TAG} bits OR-ed in. - */ - public AggregateMetric recordOneDuration(long tagAndDuration) { - ++hitCount; - if ((tagAndDuration & TOP_LEVEL_TAG) == TOP_LEVEL_TAG) { - tagAndDuration ^= TOP_LEVEL_TAG; - ++topLevelCount; - } - if ((tagAndDuration & ERROR_TAG) == ERROR_TAG) { - tagAndDuration ^= ERROR_TAG; - errorLatencies.accept(tagAndDuration); - ++errorCount; - } else { - okLatencies.accept(tagAndDuration); - } - duration += tagAndDuration; - return this; - } - - public int getErrorCount() { - return errorCount; - } - - public int getHitCount() { - return hitCount; - } - - public int getTopLevelCount() { - return topLevelCount; - } - - public long getDuration() { - return duration; - } - - public Histogram getOkLatencies() { - return okLatencies; - } - - public Histogram getErrorLatencies() { - return errorLatencies; - } - - @SuppressFBWarnings("AT_NONATOMIC_64BIT_PRIMITIVE") - public void clear() { - this.errorCount = 0; - this.hitCount = 0; - this.topLevelCount = 0; - this.duration = 0; - this.okLatencies.clear(); - this.errorLatencies.clear(); - } -} diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java index 38d45ef5e85..7a3e9234387 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java @@ -4,7 +4,7 @@ import java.util.function.Consumer; /** - * Consumer-side {@link AggregateMetric} store, keyed on the canonical UTF8-encoded labels of a + * Consumer-side {@link AggregateEntry} store, keyed on the canonical UTF8-encoded labels of a * {@link SpanSnapshot}. * *

{@link #findOrInsert} canonicalizes the snapshot's fields through the cardinality handlers (so @@ -37,11 +37,11 @@ boolean isEmpty() { } /** - * Returns the {@link AggregateMetric} to update for {@code snapshot}, lazily creating an entry on - * miss. Returns {@code null} when the table is at capacity and no stale entry can be evicted -- - * the caller should drop the data point in that case. + * Returns the {@link AggregateEntry} to update for {@code snapshot}, lazily creating it on miss. + * Returns {@code null} when the table is at capacity and no stale entry can be evicted -- the + * caller should drop the data point in that case. */ - AggregateMetric findOrInsert(SpanSnapshot snapshot) { + AggregateEntry findOrInsert(SpanSnapshot snapshot) { canonical.populate(snapshot); long keyHash = canonical.keyHash; int bucketIndex = Hashtable.Support.bucketIndex(buckets, keyHash); @@ -49,28 +49,28 @@ AggregateMetric findOrInsert(SpanSnapshot snapshot) { if (e.keyHash == keyHash) { AggregateEntry candidate = (AggregateEntry) e; if (canonical.matches(candidate)) { - return candidate.aggregate; + return candidate; } } } if (size >= maxAggregates && !evictOneStale()) { return null; } - AggregateEntry entry = canonical.toEntry(new AggregateMetric()); + AggregateEntry entry = canonical.toEntry(); entry.setNext(buckets[bucketIndex]); buckets[bucketIndex] = entry; size++; - return entry.aggregate; + return entry; } - /** Unlink the first entry whose {@code AggregateMetric.getHitCount() == 0}. */ + /** Unlink the first entry whose {@code getHitCount() == 0}. */ private boolean evictOneStale() { for (int i = 0; i < buckets.length; i++) { Hashtable.Entry head = buckets[i]; if (head == null) { continue; } - if (((AggregateEntry) head).aggregate.getHitCount() == 0) { + if (((AggregateEntry) head).getHitCount() == 0) { buckets[i] = head.next(); size--; return true; @@ -78,7 +78,7 @@ private boolean evictOneStale() { Hashtable.Entry prev = head; Hashtable.Entry cur = head.next(); while (cur != null) { - if (((AggregateEntry) cur).aggregate.getHitCount() == 0) { + if (((AggregateEntry) cur).getHitCount() == 0) { prev.setNext(cur.next()); size--; return true; @@ -98,12 +98,12 @@ void forEach(Consumer consumer) { } } - /** Removes entries whose {@code AggregateMetric.getHitCount() == 0}. */ + /** Removes entries whose {@code getHitCount() == 0}. */ void expungeStaleAggregates() { for (int i = 0; i < buckets.length; i++) { // unlink leading stale entries Hashtable.Entry head = buckets[i]; - while (head != null && ((AggregateEntry) head).aggregate.getHitCount() == 0) { + while (head != null && ((AggregateEntry) head).getHitCount() == 0) { head = head.next(); size--; } @@ -115,7 +115,7 @@ void expungeStaleAggregates() { Hashtable.Entry prev = head; Hashtable.Entry cur = head.next(); while (cur != null) { - if (((AggregateEntry) cur).aggregate.getHitCount() == 0) { + if (((AggregateEntry) cur).getHitCount() == 0) { Hashtable.Entry skipped = cur.next(); prev.setNext(skipped); size--; diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java index 8fe25288acd..8cbd41e7897 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java @@ -111,9 +111,9 @@ public void accept(InboxItem item) { } } else if (item instanceof SpanSnapshot && !stopped) { SpanSnapshot snapshot = (SpanSnapshot) item; - AggregateMetric aggregate = aggregates.findOrInsert(snapshot); - if (aggregate != null) { - aggregate.recordOneDuration(snapshot.tagAndDuration); + AggregateEntry entry = aggregates.findOrInsert(snapshot); + if (entry != null) { + entry.recordOneDuration(snapshot.tagAndDuration); dirty = true; } else { // table at cap with no stale entry available to evict @@ -134,7 +134,7 @@ private void report(long when, SignalItem signal) { aggregates.forEach( entry -> { writer.add(entry); - entry.aggregate.clear(); + entry.clearAggregate(); }); // note that this may do IO and block writer.finishBucket(); diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ClientStatsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ClientStatsAggregator.java index d08ce611100..d6cd2f2671e 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ClientStatsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ClientStatsAggregator.java @@ -4,8 +4,8 @@ import static datadog.trace.api.DDSpanTypes.RPC; import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_ENDPOINT; import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_METHOD; -import static datadog.trace.common.metrics.AggregateMetric.ERROR_TAG; -import static datadog.trace.common.metrics.AggregateMetric.TOP_LEVEL_TAG; +import static datadog.trace.common.metrics.AggregateEntry.ERROR_TAG; +import static datadog.trace.common.metrics.AggregateEntry.TOP_LEVEL_TAG; import static datadog.trace.common.metrics.SignalItem.ClearSignal.CLEAR; import static datadog.trace.common.metrics.SignalItem.ReportSignal.REPORT; import static datadog.trace.common.metrics.SignalItem.StopSignal.STOP; diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricWriter.java index c31825f6af8..80e3467796a 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricWriter.java @@ -5,7 +5,7 @@ public interface MetricWriter { /** * Serialize one aggregate. The {@link AggregateEntry} carries both the label fields (resource, - * service, span.kind, peer tags, etc.) and the {@link AggregateMetric} counters being reported. + * service, span.kind, peer tags, etc.) and the recording counters/histograms. */ void add(AggregateEntry entry); diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java index ba6ae6c2699..7644ebaf044 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java @@ -143,7 +143,6 @@ public void startBucket(int metricCount, long start, long duration) { @Override public void add(AggregateEntry entry) { - final AggregateMetric aggregate = entry.aggregate; // Calculate dynamic map size based on optional fields final boolean hasHttpMethod = entry.getHttpMethod() != null; final boolean hasHttpEndpoint = entry.getHttpEndpoint() != null; @@ -213,22 +212,22 @@ public void add(AggregateEntry entry) { } writer.writeUTF8(HITS); - writer.writeInt(aggregate.getHitCount()); + writer.writeInt(entry.getHitCount()); writer.writeUTF8(ERRORS); - writer.writeInt(aggregate.getErrorCount()); + writer.writeInt(entry.getErrorCount()); writer.writeUTF8(TOP_LEVEL_HITS); - writer.writeInt(aggregate.getTopLevelCount()); + writer.writeInt(entry.getTopLevelCount()); writer.writeUTF8(DURATION); - writer.writeLong(aggregate.getDuration()); + writer.writeLong(entry.getDuration()); writer.writeUTF8(OK_SUMMARY); - writer.writeBinary(aggregate.getOkLatencies().serialize()); + writer.writeBinary(entry.getOkLatencies().serialize()); writer.writeUTF8(ERROR_SUMMARY); - writer.writeBinary(aggregate.getErrorLatencies().serialize()); + writer.writeBinary(entry.getErrorLatencies().serialize()); } @Override diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java index 5967c1302c7..b84fb26a457 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java @@ -2,8 +2,8 @@ /** * Immutable per-span value posted from the producer to the aggregator thread. Carries the raw - * inputs the aggregator needs to build an {@link AggregateEntry} and update its {@link - * AggregateMetric}. + * inputs the aggregator needs to look up or build an {@link AggregateEntry} and record one + * duration on it. * *

All cache-canonicalization (service-name, span-kind, peer-tag string interning) happens on the * aggregator thread; the producer just shuffles references. diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/AggregateMetricTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/AggregateMetricTest.groovy deleted file mode 100644 index 140149d8324..00000000000 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/AggregateMetricTest.groovy +++ /dev/null @@ -1,105 +0,0 @@ -package datadog.trace.common.metrics - -import datadog.metrics.agent.AgentMeter -import datadog.metrics.impl.DDSketchHistograms -import datadog.metrics.impl.MonitoringImpl -import datadog.metrics.api.statsd.StatsDClient -import datadog.trace.test.util.DDSpecification - -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicLongArray - -import static datadog.trace.common.metrics.AggregateMetric.ERROR_TAG -import static datadog.trace.common.metrics.AggregateMetric.TOP_LEVEL_TAG - -class AggregateMetricTest extends DDSpecification { - - def setupSpec() { - // Initialize AgentMeter with monitoring - this is the standard mechanism used in production - def monitoring = new MonitoringImpl(StatsDClient.NO_OP, 1, TimeUnit.SECONDS) - AgentMeter.registerIfAbsent(StatsDClient.NO_OP, monitoring, DDSketchHistograms.FACTORY) - // Create a timer to trigger DDSketchHistograms loading and Factory registration - // This simulates what happens during CoreTracer initialization (traceWriteTimer) - monitoring.newTimer("test.init") - } - - def "record durations sums up to total"() { - given: - AggregateMetric aggregate = new AggregateMetric() - when: - aggregate.recordDurations(3, new AtomicLongArray(1, 2, 3)) - then: - aggregate.getDuration() == 6 - } - - def "total durations include errors"() { - given: - AggregateMetric aggregate = new AggregateMetric() - when: - aggregate.recordDurations(3, new AtomicLongArray(1, 2, 3)) - then: - aggregate.getDuration() == 6 - } - - def "clear"() { - given: - AggregateMetric aggregate = new AggregateMetric() - .recordDurations(3, new AtomicLongArray(5, ERROR_TAG | 6, TOP_LEVEL_TAG | 7)) - when: - aggregate.clear() - then: - aggregate.getDuration() == 0 - aggregate.getErrorCount() == 0 - aggregate.getTopLevelCount() == 0 - aggregate.getHitCount() == 0 - } - - def "recordOneDuration accumulates ok and error and top-level"() { - given: - AggregateMetric aggregate = new AggregateMetric() - .recordOneDuration(10L) - .recordOneDuration(10L | TOP_LEVEL_TAG) - .recordOneDuration(10L | ERROR_TAG) - - expect: - aggregate.getHitCount() == 3 - aggregate.getDuration() == 30 - aggregate.getErrorCount() == 1 - aggregate.getTopLevelCount() == 1 - } - - def "ignore trailing zeros"() { - given: - AggregateMetric aggregate = new AggregateMetric() - when: - aggregate.recordDurations(3, new AtomicLongArray(1, 2, 3, 0, 0, 0)) - then: - aggregate.getDuration() == 6 - aggregate.getHitCount() == 3 - aggregate.getErrorCount() == 0 - } - - def "hit count includes errors"() { - given: - AggregateMetric aggregate = new AggregateMetric() - when: - aggregate.recordDurations(3, new AtomicLongArray(1, 2, 3 | ERROR_TAG)) - then: - aggregate.getHitCount() == 3 - aggregate.getErrorCount() == 1 - } - - def "ok and error durations tracked separately"() { - given: - AggregateMetric aggregate = new AggregateMetric() - when: - aggregate.recordDurations(10, - new AtomicLongArray(1, 100 | ERROR_TAG, 2, 99 | ERROR_TAG, 3, - 98 | ERROR_TAG, 4, 97 | ERROR_TAG)) - then: - def errorLatencies = aggregate.getErrorLatencies() - def okLatencies = aggregate.getOkLatencies() - errorLatencies.getMaxValue() >= 99 - okLatencies.getMaxValue() <= 5 - } -} diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ClientStatsAggregatorTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ClientStatsAggregatorTest.groovy index 3cccc50c5a4..d8620e370f0 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ClientStatsAggregatorTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ClientStatsAggregatorTest.groovy @@ -134,7 +134,7 @@ class ClientStatsAggregatorTest extends DDSpecification { null, null )) >> { AggregateEntry e -> - e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == 1 && e.aggregate.getDuration() == 100 + e.getHitCount() == 1 && e.getTopLevelCount() == 1 && e.getDuration() == 100 } 1 * writer.finishBucket() >> { latch.countDown() } @@ -180,7 +180,7 @@ class ClientStatsAggregatorTest extends DDSpecification { null, null )) >> { AggregateEntry e -> - e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == 1 && e.aggregate.getDuration() == 100 + e.getHitCount() == 1 && e.getTopLevelCount() == 1 && e.getDuration() == 100 } 1 * writer.finishBucket() >> { latch.countDown() } @@ -232,7 +232,7 @@ class ClientStatsAggregatorTest extends DDSpecification { httpEndpoint, null )) >> { AggregateEntry e -> - e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == 0 && e.aggregate.getDuration() == 100 + e.getHitCount() == 1 && e.getTopLevelCount() == 0 && e.getDuration() == 100 } (statsComputed ? 1 : 0) * writer.finishBucket() >> { latch.countDown() } @@ -297,7 +297,7 @@ class ClientStatsAggregatorTest extends DDSpecification { null, null )) >> { AggregateEntry e -> - e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == 0 && e.aggregate.getDuration() == 100 + e.getHitCount() == 1 && e.getTopLevelCount() == 0 && e.getDuration() == 100 } 1 * writer.add( AggregateEntry.of( @@ -315,7 +315,7 @@ class ClientStatsAggregatorTest extends DDSpecification { null, null )) >> { AggregateEntry e -> - e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == 0 && e.aggregate.getDuration() == 100 + e.getHitCount() == 1 && e.getTopLevelCount() == 0 && e.getDuration() == 100 } 1 * writer.finishBucket() >> { latch.countDown() } @@ -362,7 +362,7 @@ class ClientStatsAggregatorTest extends DDSpecification { null, null )) >> { AggregateEntry e -> - e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == 0 && e.aggregate.getDuration() == 100 + e.getHitCount() == 1 && e.getTopLevelCount() == 0 && e.getDuration() == 100 } 1 * writer.finishBucket() >> { latch.countDown() } @@ -414,7 +414,7 @@ class ClientStatsAggregatorTest extends DDSpecification { null, null )) >> { AggregateEntry e -> - e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == topLevelCount && e.aggregate.getDuration() == 100 + e.getHitCount() == 1 && e.getTopLevelCount() == topLevelCount && e.getDuration() == 100 } 1 * writer.finishBucket() >> { latch.countDown() } @@ -473,7 +473,7 @@ class ClientStatsAggregatorTest extends DDSpecification { null, null )) >> { AggregateEntry e -> - e.aggregate.getHitCount() == count && e.aggregate.getDuration() == count * duration + e.getHitCount() == count && e.getDuration() == count * duration } 1 * writer.add(AggregateEntry.of( "resource2", @@ -490,7 +490,7 @@ class ClientStatsAggregatorTest extends DDSpecification { null, null )) >> { AggregateEntry e -> - e.aggregate.getHitCount() == count && e.aggregate.getDuration() == count * duration * 2 + e.getHitCount() == count && e.getDuration() == count * duration * 2 } cleanup: @@ -544,7 +544,7 @@ class ClientStatsAggregatorTest extends DDSpecification { "/api/users/:id", null )) >> { AggregateEntry e -> - e.aggregate.getHitCount() == count && e.aggregate.getDuration() == count * duration + e.getHitCount() == count && e.getDuration() == count * duration } 1 * writer.finishBucket() >> { latch.countDown() } @@ -585,7 +585,7 @@ class ClientStatsAggregatorTest extends DDSpecification { "/api/users/:id", null )) >> { AggregateEntry e -> - e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration + e.getHitCount() == 1 && e.getDuration() == duration } 1 * writer.add(AggregateEntry.of( "resource", @@ -602,7 +602,7 @@ class ClientStatsAggregatorTest extends DDSpecification { "/api/orders/:id", null )) >> { AggregateEntry e -> - e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration * 2 + e.getHitCount() == 1 && e.getDuration() == duration * 2 } 1 * writer.add(AggregateEntry.of( "resource", @@ -619,7 +619,7 @@ class ClientStatsAggregatorTest extends DDSpecification { "/api/users/:id", null )) >> { AggregateEntry e -> - e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration * 3 + e.getHitCount() == 1 && e.getDuration() == duration * 3 } 1 * writer.finishBucket() >> { latch2.countDown() } @@ -683,7 +683,7 @@ class ClientStatsAggregatorTest extends DDSpecification { "/api/users/:id", null )) >> { AggregateEntry e -> - e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration + e.getHitCount() == 1 && e.getDuration() == duration } 1 * writer.add(AggregateEntry.of( "resource", @@ -700,7 +700,7 @@ class ClientStatsAggregatorTest extends DDSpecification { "/api/users/:id", null )) >> { AggregateEntry e -> - e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration * 2 + e.getHitCount() == 1 && e.getDuration() == duration * 2 } 1 * writer.add(AggregateEntry.of( "resource", @@ -717,7 +717,7 @@ class ClientStatsAggregatorTest extends DDSpecification { "/api/users/:id", null )) >> { AggregateEntry e -> - e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration * 3 + e.getHitCount() == 1 && e.getDuration() == duration * 3 } 1 * writer.add(AggregateEntry.of( "resource", @@ -734,7 +734,7 @@ class ClientStatsAggregatorTest extends DDSpecification { "/api/orders/:id", null )) >> { AggregateEntry e -> - e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration * 4 + e.getHitCount() == 1 && e.getDuration() == duration * 4 } 1 * writer.finishBucket() >> { latch.countDown() } @@ -787,7 +787,7 @@ class ClientStatsAggregatorTest extends DDSpecification { null, null )) >> { AggregateEntry e -> - e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration + e.getHitCount() == 1 && e.getDuration() == duration } 1 * writer.add(AggregateEntry.of( "resource", @@ -804,7 +804,7 @@ class ClientStatsAggregatorTest extends DDSpecification { "/api/users/:id", null )) >> { AggregateEntry e -> - e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration * 2 + e.getHitCount() == 1 && e.getDuration() == duration * 2 } 1 * writer.finishBucket() >> { latch.countDown() } @@ -855,7 +855,7 @@ class ClientStatsAggregatorTest extends DDSpecification { null, null )) >> { AggregateEntry e -> - e.aggregate.getHitCount() == 2 && e.aggregate.getDuration() == 2 * duration + e.getHitCount() == 2 && e.getDuration() == 2 * duration } 1 * writer.add(AggregateEntry.of( "resource", @@ -872,7 +872,7 @@ class ClientStatsAggregatorTest extends DDSpecification { null, null )) >> { AggregateEntry e -> - e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration + e.getHitCount() == 1 && e.getDuration() == duration } 1 * writer.finishBucket() >> { latch.countDown() } @@ -926,7 +926,7 @@ class ClientStatsAggregatorTest extends DDSpecification { null, null )) >> { AggregateEntry e -> - e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration + e.getHitCount() == 1 && e.getDuration() == duration } } 0 * writer.add(AggregateEntry.of( @@ -1073,7 +1073,7 @@ class ClientStatsAggregatorTest extends DDSpecification { null, null )) >> { AggregateEntry e -> - e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration + e.getHitCount() == 1 && e.getDuration() == duration } } 1 * writer.finishBucket() >> { latch.countDown() } @@ -1108,7 +1108,7 @@ class ClientStatsAggregatorTest extends DDSpecification { null, null )) >> { AggregateEntry e -> - e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration + e.getHitCount() == 1 && e.getDuration() == duration } } 0 * writer.add(AggregateEntry.of( @@ -1175,7 +1175,7 @@ class ClientStatsAggregatorTest extends DDSpecification { null, null )) >> { AggregateEntry e -> - e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration + e.getHitCount() == 1 && e.getDuration() == duration } } 1 * writer.finishBucket() >> { latch.countDown() } @@ -1234,7 +1234,7 @@ class ClientStatsAggregatorTest extends DDSpecification { null, null )) >> { AggregateEntry e -> - e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration + e.getHitCount() == 1 && e.getDuration() == duration } } 1 * writer.finishBucket() >> { latch.countDown() } @@ -1401,7 +1401,7 @@ class ClientStatsAggregatorTest extends DDSpecification { null, null )) >> { AggregateEntry e -> - e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == 1 && e.aggregate.getDuration() == 100 + e.getHitCount() == 1 && e.getTopLevelCount() == 1 && e.getDuration() == 100 } 1 * writer.finishBucket() >> { latch.countDown() } @@ -1456,7 +1456,7 @@ class ClientStatsAggregatorTest extends DDSpecification { null, null )) >> { AggregateEntry e -> - e.aggregate.getHitCount() == 3 && e.aggregate.getTopLevelCount() == 3 && e.aggregate.getDuration() == 450 + e.getHitCount() == 3 && e.getTopLevelCount() == 3 && e.getDuration() == 450 } 1 * writer.finishBucket() >> { latch.countDown() } @@ -1511,7 +1511,7 @@ class ClientStatsAggregatorTest extends DDSpecification { "/api/users/:id", null )) >> { AggregateEntry e -> - e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == 1 && e.aggregate.getDuration() == 100 + e.getHitCount() == 1 && e.getTopLevelCount() == 1 && e.getDuration() == 100 } 1 * writer.add( AggregateEntry.of( @@ -1529,7 +1529,7 @@ class ClientStatsAggregatorTest extends DDSpecification { "/api/orders", null )) >> { AggregateEntry e -> - e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == 1 && e.aggregate.getDuration() == 200 + e.getHitCount() == 1 && e.getTopLevelCount() == 1 && e.getDuration() == 200 } 1 * writer.add( AggregateEntry.of( @@ -1547,7 +1547,7 @@ class ClientStatsAggregatorTest extends DDSpecification { null, null )) >> { AggregateEntry e -> - e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == 1 && e.aggregate.getDuration() == 150 + e.getHitCount() == 1 && e.getTopLevelCount() == 1 && e.getDuration() == 150 } 1 * writer.finishBucket() >> { latch.countDown() } diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy index 08f0f7cbb92..5e85c66557d 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy @@ -45,7 +45,7 @@ class SerializingMetricWriterTest extends DDSpecification { resource, service, operationName, serviceSource, type, httpStatusCode, synthetic, traceRoot, spanKind, peerTags, httpMethod, httpEndpoint, grpcStatusCode) - e.aggregate.recordDurations(hitCount, new AtomicLongArray(1L)) + e.recordDurations(hitCount, new AtomicLongArray(1L)) return e } @@ -284,7 +284,6 @@ class SerializingMetricWriterTest extends DDSpecification { int statCount = unpacker.unpackArrayHeader() assert statCount == content.size() for (AggregateEntry entry : content) { - AggregateMetric value = entry.aggregate int metricMapSize = unpacker.unpackMapHeader() // Calculate expected map size based on optional fields boolean hasHttpMethod = entry.getHttpMethod() != null @@ -349,16 +348,16 @@ class SerializingMetricWriterTest extends DDSpecification { ++elementCount } assert unpacker.unpackString() == "Hits" - assert unpacker.unpackInt() == value.getHitCount() + assert unpacker.unpackInt() == entry.getHitCount() ++elementCount assert unpacker.unpackString() == "Errors" - assert unpacker.unpackInt() == value.getErrorCount() + assert unpacker.unpackInt() == entry.getErrorCount() ++elementCount assert unpacker.unpackString() == "TopLevelHits" - assert unpacker.unpackInt() == value.getTopLevelCount() + assert unpacker.unpackInt() == entry.getTopLevelCount() ++elementCount assert unpacker.unpackString() == "Duration" - assert unpacker.unpackLong() == value.getDuration() + assert unpacker.unpackLong() == entry.getDuration() ++elementCount assert unpacker.unpackString() == "OkSummary" validateSketch(unpacker) diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateEntryRecordingTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateEntryRecordingTest.java new file mode 100644 index 00000000000..d9f95d7967b --- /dev/null +++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateEntryRecordingTest.java @@ -0,0 +1,105 @@ +package datadog.trace.common.metrics; + +import static datadog.trace.common.metrics.AggregateEntry.ERROR_TAG; +import static datadog.trace.common.metrics.AggregateEntry.TOP_LEVEL_TAG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import datadog.metrics.agent.AgentMeter; +import datadog.metrics.api.statsd.StatsDClient; +import datadog.metrics.impl.DDSketchHistograms; +import datadog.metrics.impl.MonitoringImpl; +import java.util.Collections; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLongArray; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +class AggregateEntryRecordingTest { + + @BeforeAll + static void initAgentMeter() { + MonitoringImpl monitoring = new MonitoringImpl(StatsDClient.NO_OP, 1, TimeUnit.SECONDS); + AgentMeter.registerIfAbsent(StatsDClient.NO_OP, monitoring, DDSketchHistograms.FACTORY); + monitoring.newTimer("test.init"); + } + + @Test + void recordDurationsSumsToTotal() { + AggregateEntry entry = newEntry(); + entry.recordDurations(3, new AtomicLongArray(new long[] {1, 2, 3})); + assertEquals(6, entry.getDuration()); + } + + @Test + void clearResetsCounters() { + AggregateEntry entry = newEntry(); + entry.recordDurations(3, new AtomicLongArray(new long[] {5, ERROR_TAG | 6, TOP_LEVEL_TAG | 7})); + + entry.clearAggregate(); + + assertEquals(0, entry.getDuration()); + assertEquals(0, entry.getErrorCount()); + assertEquals(0, entry.getTopLevelCount()); + assertEquals(0, entry.getHitCount()); + } + + @Test + void recordOneDurationAccumulatesOkErrorAndTopLevel() { + AggregateEntry entry = newEntry(); + entry.recordOneDuration(10L); + entry.recordOneDuration(10L | TOP_LEVEL_TAG); + entry.recordOneDuration(10L | ERROR_TAG); + + assertEquals(3, entry.getHitCount()); + assertEquals(30, entry.getDuration()); + assertEquals(1, entry.getErrorCount()); + assertEquals(1, entry.getTopLevelCount()); + } + + @Test + void trailingZerosAreIncludedInHitCount() { + AggregateEntry entry = newEntry(); + entry.recordDurations(3, new AtomicLongArray(new long[] {1, 2, 3, 0, 0, 0})); + assertEquals(6, entry.getDuration()); + assertEquals(3, entry.getHitCount()); + assertEquals(0, entry.getErrorCount()); + } + + @Test + void hitCountIncludesErrors() { + AggregateEntry entry = newEntry(); + entry.recordDurations(3, new AtomicLongArray(new long[] {1, 2, 3 | ERROR_TAG})); + assertEquals(3, entry.getHitCount()); + assertEquals(1, entry.getErrorCount()); + } + + @Test + void okAndErrorLatenciesTrackedSeparately() { + AggregateEntry entry = newEntry(); + entry.recordDurations( + 10, + new AtomicLongArray( + new long[] {1, 100 | ERROR_TAG, 2, 99 | ERROR_TAG, 3, 98 | ERROR_TAG, 4, 97 | ERROR_TAG})); + + assertTrue(entry.getErrorLatencies().getMaxValue() >= 99); + assertTrue(entry.getOkLatencies().getMaxValue() <= 5); + } + + private static AggregateEntry newEntry() { + return AggregateEntry.of( + "resource", + "service", + "operation", + null, + "type", + 0, + false, + true, + "client", + Collections.emptyList(), + null, + null, + null); + } +} diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java index 7a4f84c30dd..94e140e01dd 100644 --- a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java +++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java @@ -1,7 +1,7 @@ package datadog.trace.common.metrics; -import static datadog.trace.common.metrics.AggregateMetric.ERROR_TAG; -import static datadog.trace.common.metrics.AggregateMetric.TOP_LEVEL_TAG; +import static datadog.trace.common.metrics.AggregateEntry.ERROR_TAG; +import static datadog.trace.common.metrics.AggregateEntry.TOP_LEVEL_TAG; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotSame; @@ -25,33 +25,32 @@ class AggregateTableTest { @BeforeAll static void initAgentMeter() { - // AggregateMetric.recordOneDuration -> Histogram.accept needs AgentMeter to be initialized. - // Mirror what AggregateMetricTest does. + // AggregateEntry.recordOneDuration -> Histogram.accept needs AgentMeter to be initialized. MonitoringImpl monitoring = new MonitoringImpl(StatsDClient.NO_OP, 1, TimeUnit.SECONDS); AgentMeter.registerIfAbsent(StatsDClient.NO_OP, monitoring, DDSketchHistograms.FACTORY); monitoring.newTimer("test.init"); } @Test - void insertOnMissReturnsNewAggregate() { + void insertOnMissReturnsNewEntry() { AggregateTable table = new AggregateTable(8); SpanSnapshot s = snapshot("svc", "op", "client"); - AggregateMetric agg = table.findOrInsert(s); + AggregateEntry entry = table.findOrInsert(s); - assertNotNull(agg); + assertNotNull(entry); assertEquals(1, table.size()); - assertEquals(0, agg.getHitCount()); + assertEquals(0, entry.getHitCount()); } @Test - void hitReturnsSameAggregateInstance() { + void hitReturnsSameEntryInstance() { AggregateTable table = new AggregateTable(8); SpanSnapshot s1 = snapshot("svc", "op", "client"); SpanSnapshot s2 = snapshot("svc", "op", "client"); - AggregateMetric first = table.findOrInsert(s1); - AggregateMetric second = table.findOrInsert(s2); + AggregateEntry first = table.findOrInsert(s1); + AggregateEntry second = table.findOrInsert(s2); assertSame(first, second); assertEquals(1, table.size()); @@ -61,10 +60,10 @@ void hitReturnsSameAggregateInstance() { void differentKindFieldsAreDistinct() { AggregateTable table = new AggregateTable(8); - AggregateMetric clientAgg = table.findOrInsert(snapshot("svc", "op", "client")); - AggregateMetric serverAgg = table.findOrInsert(snapshot("svc", "op", "server")); + AggregateEntry clientEntry = table.findOrInsert(snapshot("svc", "op", "client")); + AggregateEntry serverEntry = table.findOrInsert(snapshot("svc", "op", "server")); - assertNotSame(clientAgg, serverAgg); + assertNotSame(clientEntry, serverEntry); assertEquals(2, table.size()); } @@ -77,9 +76,9 @@ void peerTagPairsParticipateInIdentity() { builder("svc", "op", "client").peerTags("peer.hostname", "host-b").build(); SpanSnapshot noTags = builder("svc", "op", "client").build(); - AggregateMetric a = table.findOrInsert(withTags); - AggregateMetric b = table.findOrInsert(otherTags); - AggregateMetric c = table.findOrInsert(noTags); + AggregateEntry a = table.findOrInsert(withTags); + AggregateEntry b = table.findOrInsert(otherTags); + AggregateEntry c = table.findOrInsert(noTags); assertNotSame(a, b); assertNotSame(a, c); @@ -97,9 +96,9 @@ void cardinalityBlockedValuesCollapseIntoOneEntry() { AggregateTable table = new AggregateTable(128); for (int i = 0; i < 50; i++) { - AggregateMetric agg = table.findOrInsert(snapshot("svc-" + i, "op", "client")); - assertNotNull(agg); - agg.recordOneDuration(1L); + AggregateEntry entry = table.findOrInsert(snapshot("svc-" + i, "op", "client")); + assertNotNull(entry); + entry.recordOneDuration(1L); } // 32 in-budget services + 1 collapsed "blocked_by_tracer" entry = 33 total. @@ -112,19 +111,19 @@ void cardinalityBlockedValuesCollapseIntoOneEntry() { void capOverrunEvictsStaleEntry() { AggregateTable table = new AggregateTable(2); - AggregateMetric stale = table.findOrInsert(snapshot("svc-a", "op", "client")); + AggregateEntry stale = table.findOrInsert(snapshot("svc-a", "op", "client")); // do not record on stale -> hitCount stays at 0 - AggregateMetric live = table.findOrInsert(snapshot("svc-b", "op", "client")); + AggregateEntry live = table.findOrInsert(snapshot("svc-b", "op", "client")); live.recordOneDuration(10L | TOP_LEVEL_TAG); // hitCount=1, not evictable // table is full (size=2). Inserting a third should evict the stale one and succeed. - AggregateMetric newcomer = table.findOrInsert(snapshot("svc-c", "op", "client")); + AggregateEntry newcomer = table.findOrInsert(snapshot("svc-c", "op", "client")); assertNotNull(newcomer); assertEquals(2, table.size()); // re-inserting the stale snapshot should miss now (it was evicted) and produce a fresh entry - AggregateMetric staleAgain = table.findOrInsert(snapshot("svc-a", "op", "client")); + AggregateEntry staleAgain = table.findOrInsert(snapshot("svc-a", "op", "client")); assertNotSame(stale, staleAgain); } @@ -132,12 +131,12 @@ void capOverrunEvictsStaleEntry() { void capOverrunWithNoStaleReturnsNull() { AggregateTable table = new AggregateTable(2); - AggregateMetric a = table.findOrInsert(snapshot("svc-a", "op", "client")); - AggregateMetric b = table.findOrInsert(snapshot("svc-b", "op", "client")); + AggregateEntry a = table.findOrInsert(snapshot("svc-a", "op", "client")); + AggregateEntry b = table.findOrInsert(snapshot("svc-b", "op", "client")); a.recordOneDuration(10L); b.recordOneDuration(20L); - AggregateMetric c = table.findOrInsert(snapshot("svc-c", "op", "client")); + AggregateEntry c = table.findOrInsert(snapshot("svc-c", "op", "client")); assertNull(c); assertEquals(2, table.size()); } @@ -146,10 +145,10 @@ void capOverrunWithNoStaleReturnsNull() { void expungeStaleAggregatesRemovesZeroHitsOnly() { AggregateTable table = new AggregateTable(16); - AggregateMetric live = table.findOrInsert(snapshot("svc-live", "op", "client")); + AggregateEntry live = table.findOrInsert(snapshot("svc-live", "op", "client")); live.recordOneDuration(10L); - AggregateMetric stale1 = table.findOrInsert(snapshot("svc-stale1", "op", "client")); - AggregateMetric stale2 = table.findOrInsert(snapshot("svc-stale2", "op", "client")); + AggregateEntry stale1 = table.findOrInsert(snapshot("svc-stale1", "op", "client")); + AggregateEntry stale2 = table.findOrInsert(snapshot("svc-stale2", "op", "client")); assertEquals(3, table.size()); assertEquals(0, stale1.getHitCount()); assertEquals(0, stale2.getHitCount()); @@ -169,7 +168,7 @@ void forEachVisitsEveryEntry() { table.findOrInsert(snapshot("c", "op", "client")).recordOneDuration(3L | ERROR_TAG); Map visited = new HashMap<>(); - table.forEach(e -> visited.put(e.getService().toString(), e.aggregate.getDuration())); + table.forEach(e -> visited.put(e.getService().toString(), e.getDuration())); assertEquals(3, visited.size()); assertEquals(1L, visited.get("a")); diff --git a/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy b/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy index 81a476c67c8..4883543cf68 100644 --- a/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy +++ b/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy @@ -39,10 +39,10 @@ class MetricsIntegrationTest extends AbstractTraceAgentTest { ) writer.startBucket(2, System.nanoTime(), SECONDS.toNanos(10)) def entry1 = AggregateEntry.of("resource1", "service1", "operation1", null, "sql", 0, false, true, "xyzzy", [UTF8BytesString.create("grault:quux")], null, null, null) - entry1.aggregate.recordDurations(5, new AtomicLongArray(2, 1, 2, 250, 4, 5)) + entry1.recordDurations(5, new AtomicLongArray(2, 1, 2, 250, 4, 5)) writer.add(entry1) def entry2 = AggregateEntry.of("resource2", "service2", "operation2", null, "web", 200, false, true, "xyzzy", [UTF8BytesString.create("grault:quux")], null, null, null) - entry2.aggregate.recordDurations(10, new AtomicLongArray(1, 1, 200, 2, 3, 4, 5, 6, 7, 8, 9)) + entry2.recordDurations(10, new AtomicLongArray(1, 1, 200, 2, 3, 4, 5, 6, 7, 8, 9)) writer.add(entry2) writer.finishBucket() From fea14769d02b08a0afb424225b596eddba76649a Mon Sep 17 00:00:00 2001 From: Douglas Q Hawkins Date: Fri, 15 May 2026 19:19:40 -0400 Subject: [PATCH 02/11] Store peer tags as UTF8BytesString[] instead of List on AggregateEntry Each entry held a List for peer tags, which meant: - on miss, Canonical.toEntry allocated an ArrayList plus its internal Object[] -- two allocations and ~24 bytes of List wrapper overhead on top of the array itself (or a SingletonList wrapper at N=1); - on every report cycle, SerializingMetricWriter.add did a for-each over the list, allocating an ArrayList$Itr per entry. Switching the field to a flat UTF8BytesString[] eliminates the wrapper and the iterator allocation. For empty entries, a shared EMPTY_PEER_TAGS constant is used. Canonical's scratch buffer also moves from ArrayList to UTF8BytesString[] + count with on-demand growth, so populate / matches / hashOf all operate directly on the array without bounds-check method-call overhead. Benchmark unchanged (consumer-side change; producer-bound benchmark): before: 2.040 us/op after: 2.023 us/op Co-Authored-By: Claude Opus 4.7 (1M context) --- .../trace/common/metrics/AggregateEntry.java | 90 +++++++++++-------- .../metrics/SerializingMetricWriter.java | 10 +-- 2 files changed, 55 insertions(+), 45 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java index ce583282b9e..d6afbd9e29e 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java @@ -5,8 +5,7 @@ import datadog.trace.util.Hashtable; import datadog.trace.util.LongHashingUtils; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import java.util.ArrayList; -import java.util.Collections; +import java.util.Arrays; import java.util.List; import java.util.Objects; import java.util.concurrent.atomic.AtomicLongArray; @@ -46,6 +45,9 @@ final class AggregateEntry extends Hashtable.Entry { /** Second-from-top bit: set when the recorded span was a top-level span. */ static final long TOP_LEVEL_TAG = 0x4000000000000000L; + /** Shared empty array used by entries with no peer tags. */ + private static final UTF8BytesString[] EMPTY_PEER_TAGS = new UTF8BytesString[0]; + // Per-field cardinality limits. Identical to the prior DDCache sizes. static final PropertyCardinalityHandler RESOURCE_HANDLER = new PropertyCardinalityHandler(32); static final PropertyCardinalityHandler SERVICE_HANDLER = new PropertyCardinalityHandler(32); @@ -72,7 +74,7 @@ final class AggregateEntry extends Hashtable.Entry { final short httpStatusCode; final boolean synthetic; final boolean traceRoot; - final List peerTags; + final UTF8BytesString[] peerTags; // Recording state. Mutated only on the aggregator thread. Not thread-safe. private final Histogram okLatencies; @@ -97,7 +99,7 @@ private AggregateEntry( short httpStatusCode, boolean synthetic, boolean traceRoot, - List peerTags) { + UTF8BytesString[] peerTags) { super(keyHash); this.resource = resource; this.service = service; @@ -145,7 +147,10 @@ static AggregateEntry of( UTF8BytesString httpMethodUtf = httpMethod == null ? null : createUtf8(httpMethod); UTF8BytesString httpEndpointUtf = httpEndpoint == null ? null : createUtf8(httpEndpoint); UTF8BytesString grpcUtf = grpcStatusCode == null ? null : createUtf8(grpcStatusCode); - List peerTagsList = peerTags == null ? Collections.emptyList() : peerTags; + UTF8BytesString[] peerTagsArray = + peerTags == null || peerTags.isEmpty() + ? EMPTY_PEER_TAGS + : peerTags.toArray(new UTF8BytesString[0]); long keyHash = hashOf( resourceUtf, @@ -160,7 +165,8 @@ static AggregateEntry of( (short) httpStatusCode, synthetic, traceRoot, - peerTagsList); + peerTagsArray, + peerTagsArray.length); return new AggregateEntry( keyHash, resourceUtf, @@ -175,7 +181,7 @@ static AggregateEntry of( (short) httpStatusCode, synthetic, traceRoot, - peerTagsList); + peerTagsArray); } /** @@ -216,7 +222,8 @@ static long hashOf( short httpStatusCode, boolean synthetic, boolean traceRoot, - List peerTags) { + UTF8BytesString[] peerTags, + int peerTagsLen) { long h = 0; h = LongHashingUtils.addToHash(h, resource); h = LongHashingUtils.addToHash(h, service); @@ -227,10 +234,8 @@ static long hashOf( h = LongHashingUtils.addToHash(h, synthetic); h = LongHashingUtils.addToHash(h, traceRoot); h = LongHashingUtils.addToHash(h, spanKind); - // indexed iteration -- avoids the iterator allocation a for-each over a List would do - int peerTagCount = peerTags.size(); - for (int i = 0; i < peerTagCount; i++) { - h = LongHashingUtils.addToHash(h, peerTags.get(i)); + for (int i = 0; i < peerTagsLen; i++) { + h = LongHashingUtils.addToHash(h, peerTags[i]); } h = LongHashingUtils.addToHash(h, httpMethod); h = LongHashingUtils.addToHash(h, httpEndpoint); @@ -287,7 +292,7 @@ boolean isTraceRoot() { return traceRoot; } - List getPeerTags() { + UTF8BytesString[] getPeerTags() { return peerTags; } @@ -392,7 +397,7 @@ public boolean equals(Object o) { && Objects.equals(serviceSource, that.serviceSource) && Objects.equals(type, that.type) && Objects.equals(spanKind, that.spanKind) - && peerTags.equals(that.peerTags) + && Arrays.equals(peerTags, that.peerTags) && Objects.equals(httpMethod, that.httpMethod) && Objects.equals(httpEndpoint, that.httpEndpoint) && Objects.equals(grpcStatusCode, that.grpcStatusCode); @@ -426,11 +431,14 @@ static final class Canonical { boolean traceRoot; /** - * Reusable buffer of canonicalized peer-tag UTF8 forms. Cleared and refilled in {@link - * #populate}; on miss, {@link #toEntry} copies it into an immutable list for the entry to own. - * Zero allocation on the hit path. + * Reusable buffer of canonicalized peer-tag UTF8 forms. Slots {@code [0..peerTagsCount)} are + * the canonicalized values for the current snapshot; the rest is uninitialized space the + * buffer holds onto across calls so we don't reallocate. {@link #toEntry} copies a + * tightly-sized array out of this buffer for the entry to own. */ - final ArrayList peerTagsBuffer = new ArrayList<>(4); + UTF8BytesString[] peerTagsBuffer = new UTF8BytesString[4]; + + int peerTagsCount; long keyHash; @@ -466,16 +474,17 @@ void populate(SpanSnapshot s) { httpStatusCode, synthetic, traceRoot, - peerTagsBuffer); + peerTagsBuffer, + peerTagsCount); } /** * Fills {@link #peerTagsBuffer} with canonical UTF8 forms, applying {@code schema.handler(i)} - * to each non-null value at the same index. No allocation when the schema/values are absent or - * all values are null (buffer is just cleared). + * to each non-null value at the same index. Reuses the existing buffer; grows it only when + * the snapshot has more non-null peer-tag values than the buffer's current capacity. */ private void populatePeerTags(PeerTagSchema schema, String[] values) { - peerTagsBuffer.clear(); + peerTagsCount = 0; if (schema == null || values == null) { return; } @@ -483,7 +492,10 @@ private void populatePeerTags(PeerTagSchema schema, String[] values) { for (int i = 0; i < n; i++) { String v = values[i]; if (v != null) { - peerTagsBuffer.add(schema.handler(i).register(v)); + if (peerTagsCount == peerTagsBuffer.length) { + peerTagsBuffer = Arrays.copyOf(peerTagsBuffer, peerTagsBuffer.length * 2); + } + peerTagsBuffer[peerTagsCount++] = schema.handler(i).register(v); } } } @@ -503,20 +515,22 @@ boolean matches(AggregateEntry e) { && Objects.equals(serviceSource, e.serviceSource) && Objects.equals(type, e.type) && Objects.equals(spanKind, e.spanKind) - && peerTagsEqual(peerTagsBuffer, e.peerTags) + && peerTagsEqual(peerTagsBuffer, peerTagsCount, e.peerTags) && Objects.equals(httpMethod, e.httpMethod) && Objects.equals(httpEndpoint, e.httpEndpoint) && Objects.equals(grpcStatusCode, e.grpcStatusCode); } - /** Indexed list comparison -- avoids the iterator a {@code List.equals} would allocate. */ - private static boolean peerTagsEqual(List a, List b) { - int n = a.size(); - if (n != b.size()) { + /** + * Compares the first {@code aLen} elements of {@code a} against all of {@code b}. Avoids the + * iterator a {@code List.equals} would allocate. + */ + private static boolean peerTagsEqual(UTF8BytesString[] a, int aLen, UTF8BytesString[] b) { + if (aLen != b.length) { return false; } - for (int i = 0; i < n; i++) { - if (!a.get(i).equals(b.get(i))) { + for (int i = 0; i < aLen; i++) { + if (!a[i].equals(b[i])) { return false; } } @@ -525,18 +539,16 @@ private static boolean peerTagsEqual(List a, List snapshottedPeerTags; - int n = peerTagsBuffer.size(); - if (n == 0) { - snapshottedPeerTags = Collections.emptyList(); - } else if (n == 1) { - snapshottedPeerTags = Collections.singletonList(peerTagsBuffer.get(0)); + UTF8BytesString[] snapshottedPeerTags; + if (peerTagsCount == 0) { + snapshottedPeerTags = EMPTY_PEER_TAGS; } else { - snapshottedPeerTags = new ArrayList<>(peerTagsBuffer); + snapshottedPeerTags = new UTF8BytesString[peerTagsCount]; + System.arraycopy(peerTagsBuffer, 0, snapshottedPeerTags, 0, peerTagsCount); } return new AggregateEntry( keyHash, diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java index 7644ebaf044..ff4944e38a5 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java @@ -13,7 +13,6 @@ import datadog.trace.api.git.GitInfo; import datadog.trace.api.git.GitInfoProvider; import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; -import java.util.List; import java.util.function.Function; public final class SerializingMetricWriter implements MetricWriter { @@ -182,11 +181,10 @@ public void add(AggregateEntry entry) { writer.writeUTF8(entry.getSpanKind()); writer.writeUTF8(PEER_TAGS); - final List peerTags = entry.getPeerTags(); - writer.startArray(peerTags.size()); - - for (UTF8BytesString peerTag : peerTags) { - writer.writeUTF8(peerTag); + final UTF8BytesString[] peerTags = entry.getPeerTags(); + writer.startArray(peerTags.length); + for (int i = 0; i < peerTags.length; i++) { + writer.writeUTF8(peerTags[i]); } if (hasServiceSource) { From 4d291286eb48809ce0fd7308fe851eca67038f76 Mon Sep 17 00:00:00 2001 From: Douglas Q Hawkins Date: Fri, 15 May 2026 19:35:14 -0400 Subject: [PATCH 03/11] Add miss-path JMH benchmark for ClientStatsAggregator The existing hit-path benchmarks publish the same 64 spans every op, so after warmup every lookup is a cache hit and the consumer-side allocations (AggregateEntry construction, peer-tag array, etc.) never fire. That hides the impact of the entry/peer-tag memory work. This variant builds a pool of 4096 single-span traces with unique (service, operation, resource) tuples and publishes one per op, cycling. Steady state exercises canonicalize + miss + insert + the blocked_by_tracer fallback once cardinality budgets fill. Co-Authored-By: Claude Opus 4.7 (1M context) --- ...lientStatsAggregatorMissPathBenchmark.java | 83 +++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ClientStatsAggregatorMissPathBenchmark.java diff --git a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ClientStatsAggregatorMissPathBenchmark.java b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ClientStatsAggregatorMissPathBenchmark.java new file mode 100644 index 00000000000..3e6bb5adccb --- /dev/null +++ b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ClientStatsAggregatorMissPathBenchmark.java @@ -0,0 +1,83 @@ +package datadog.trace.common.metrics; + +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND; +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CLIENT; +import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; + +import datadog.communication.ddagent.DDAgentFeaturesDiscovery; +import datadog.trace.api.WellKnownTags; +import datadog.trace.core.CoreSpan; +import datadog.trace.core.monitor.HealthMetrics; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Miss-path variant of {@link ClientStatsAggregatorBenchmark}. Each op publishes a single-span + * trace from a pre-built pool where every span has a unique (service, operation, resource) tuple. + * After cardinality budgets fill, fields canonicalize to the {@code blocked_by_tracer} sentinel, + * but the producer still allocates a {@link SpanSnapshot} per op and enqueues it for the + * aggregator -- so the steady state exercises the per-op publish allocations + the consumer's + * canonicalize/match work, not the hit-path-only pattern of the other benchmarks. + * + *

Run with {@code -prof gc} to compare allocation rates against master's + * {@code ConflatingMetricsAggregator}. + */ +@State(Scope.Benchmark) +@Warmup(iterations = 1, time = 15, timeUnit = SECONDS) +@Measurement(iterations = 3, time = 15, timeUnit = SECONDS) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(MICROSECONDS) +@Fork(value = 1) +public class ClientStatsAggregatorMissPathBenchmark { + + private static final int POOL_SIZE = 4096; + + private final DDAgentFeaturesDiscovery featuresDiscovery = + new ClientStatsAggregatorBenchmark.FixedAgentFeaturesDiscovery( + Collections.singleton("peer.hostname"), Collections.emptySet()); + private final ClientStatsAggregator aggregator = + new ClientStatsAggregator( + new WellKnownTags("", "", "", "", "", ""), + Collections.emptySet(), + featuresDiscovery, + HealthMetrics.NO_OP, + new ClientStatsAggregatorBenchmark.NullSink(), + 2048, + 2048, + false); + + private final List>> pool = generatePool(POOL_SIZE); + private int cursor; + + static List>> generatePool(int n) { + List>> out = new ArrayList<>(n); + for (int i = 0; i < n; i++) { + SimpleSpan span = + new SimpleSpan( + "svc-" + i, "op-" + i, "res-" + i, "type-" + (i & 7), true, true, false, 0, 10, -1); + span.setTag(SPAN_KIND, SPAN_KIND_CLIENT); + span.setTag("peer.hostname", "host-" + i); + out.add(Collections.singletonList(span)); + } + return out; + } + + @Benchmark + public void benchmark(Blackhole blackhole) { + int idx = cursor; + cursor = (idx + 1) % POOL_SIZE; + blackhole.consume(aggregator.publish(pool.get(idx))); + } +} From 8cf1bf18408bdd12cb70112e59c74f21d64b9dc2 Mon Sep 17 00:00:00 2001 From: Douglas Q Hawkins Date: Fri, 15 May 2026 19:52:50 -0400 Subject: [PATCH 04/11] Back cardinality handlers with flat open-addressed tables Property/TagCardinalityHandler each held a pre-sized HashMap<..., UTF8BytesString>. Every distinct value cost a HashMap$Node (~40 bytes: header + hash + 3 refs + alignment). For the 9 property handlers (caps 8-64) plus N tag handlers (cap 512 each), that's a lot of Node objects surviving each reporting interval. Swap for two parallel arrays (CharSequence/String keys + UTF8BytesString values) sized to the next power of two >= 2 * cardinalityLimit, with linear probing. No per-entry object allocation; reset is just two Arrays.fill calls. Semantics preserved exactly: - size-vs-cap check still runs before lookup, so values registered earlier in the cycle also collapse to the sentinel once the cap is reached; - reset drops every entry (callers must obtain fresh references after reset); - cacheBlocked survives reset, keeping the sentinel instance stable. Memory comparison at full cardinality (property handler, cap 64): HashMap: 48 (obj) + 528 (table[128]) + 64*40 (Nodes) = 3136 B Flat: 40 (obj) + 2*528 (two tables) = 1096 B For a tag handler at cap 512: HashMap: 4160 + 512*40 = 24640 B Flat: 40 + 2*4112 = 8264 B (saves ~16 KB per tag handler) Trade-off: empty handlers are slightly larger (flat keeps both arrays prealloc'd; HashMap only has the bucket table). Crossover is ~13 entries -- typical steady-state for any handler in production. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../metrics/PropertyCardinalityHandler.java | 88 ++++++++++++++----- .../common/metrics/TagCardinalityHandler.java | 75 +++++++++++----- 2 files changed, 118 insertions(+), 45 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/PropertyCardinalityHandler.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/PropertyCardinalityHandler.java index 61560a32a71..4e770ea9ff5 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/PropertyCardinalityHandler.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/PropertyCardinalityHandler.java @@ -1,45 +1,87 @@ package datadog.trace.common.metrics; import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; -import java.util.HashMap; +import java.util.Arrays; +/** + * Interns raw label values into {@link UTF8BytesString}s, capped at {@code cardinalityLimit} + * distinct values per reporting cycle. Once the cap is hit, every {@link #register} call returns a + * shared {@code blocked_by_tracer} sentinel -- including for values that were registered before + * the cap was reached -- so the consumer's hash key collapses to one bucket for all overflow. + * + *

Backed by a flat open-addressed table sized to {@code 2 * cardinalityLimit} (rounded up to a + * power of two). Avoids the {@code HashMap$Node} allocations of the previous implementation; reset + * just nulls the arrays. Not thread-safe -- only the aggregator thread calls these. + */ public final class PropertyCardinalityHandler { - private final int cardinalityLimit; - private final HashMap curUtf8s; + private final int cardinalityLimit; + private final CharSequence[] keys; + private final UTF8BytesString[] values; + private final int mask; - private UTF8BytesString cacheBlocked = null; + private int size; + private UTF8BytesString cacheBlocked; public PropertyCardinalityHandler(int cardinalityLimit) { this.cardinalityLimit = cardinalityLimit; - - // pre-sizing properly to avoid rehashing - this.curUtf8s = new HashMap<>((int) Math.ceil(cardinalityLimit / 0.75) + 1); + int cap = tableSizeFor(cardinalityLimit); + this.keys = new CharSequence[cap]; + this.values = new UTF8BytesString[cap]; + this.mask = cap - 1; } + /** + * Returns the canonical UTF8 form for {@code value}, or the {@code blocked_by_tracer} sentinel + * when the budget is exhausted. The cap check runs before the lookup, so values that were + * registered earlier in this cycle also collapse to the sentinel once the cap is hit. + */ public UTF8BytesString register(CharSequence value) { - if (this.curUtf8s.size() >= this.cardinalityLimit) { - return this.blockedByTracer(); + if (size >= cardinalityLimit) { + return blockedByTracer(); + } + int i = value.hashCode() & mask; + while (true) { + CharSequence k = keys[i]; + if (k == null) { + UTF8BytesString newUtf8 = UTF8BytesString.create(value); + keys[i] = value; + values[i] = newUtf8; + size++; + return newUtf8; + } + if (k.equals(value)) { + return values[i]; + } + i = (i + 1) & mask; } - - UTF8BytesString existingUtf8 = this.curUtf8s.get(value); - if (existingUtf8 != null) return existingUtf8; - - // TODO: maybe use a fallback cache to reduce allocations across reset cycles - UTF8BytesString newUtf8 = UTF8BytesString.create(value); - this.curUtf8s.put(value, newUtf8); - return newUtf8; } private UTF8BytesString blockedByTracer() { - UTF8BytesString cacheBlocked = this.cacheBlocked; - if (cacheBlocked != null) return cacheBlocked; - - this.cacheBlocked = cacheBlocked = UTF8BytesString.create("blocked_by_tracer"); - return cacheBlocked; + UTF8BytesString cached = this.cacheBlocked; + if (cached != null) { + return cached; + } + cached = UTF8BytesString.create("blocked_by_tracer"); + this.cacheBlocked = cached; + return cached; } + /** + * Drops every value registered this cycle, refreshing the budget. The {@code blocked_by_tracer} + * sentinel survives, so previously-issued sentinel references stay equal to fresh ones. + */ public void reset() { - this.curUtf8s.clear(); + if (size > 0) { + Arrays.fill(keys, null); + Arrays.fill(values, null); + size = 0; + } + } + + /** Power-of-two table size with at most ~50% load factor at full cardinality. */ + private static int tableSizeFor(int cardinalityLimit) { + int target = Math.max(cardinalityLimit * 2, 16); + return Integer.highestOneBit(target - 1) << 1; } } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/TagCardinalityHandler.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/TagCardinalityHandler.java index 1fdfed5c7c4..ea11151c1f8 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/TagCardinalityHandler.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/TagCardinalityHandler.java @@ -1,46 +1,77 @@ package datadog.trace.common.metrics; import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; -import java.util.HashMap; +import java.util.Arrays; +/** + * Interns {@code tag:value} pairs into {@link UTF8BytesString}s, capped at {@code + * cardinalityLimit} distinct values per reporting cycle. Once the cap is hit, every {@link + * #register} call returns a shared {@code tag:blocked_by_tracer} sentinel. + * + *

Backed by a flat open-addressed table sized to {@code 2 * cardinalityLimit} (rounded up to a + * power of two). Not thread-safe -- only the aggregator thread calls these. + */ public final class TagCardinalityHandler { + private final String tag; private final int cardinalityLimit; + private final String[] keys; + private final UTF8BytesString[] values; + private final int mask; - private final HashMap curUtf8Pairs; - - private UTF8BytesString cacheBlocked = null; + private int size; + private UTF8BytesString cacheBlocked; public TagCardinalityHandler(String tag, int cardinalityLimit) { this.tag = tag; this.cardinalityLimit = cardinalityLimit; - - // pre-sizing properly to avoid rehashing - this.curUtf8Pairs = new HashMap<>((int) Math.ceil(cardinalityLimit / 0.75) + 1); + int cap = tableSizeFor(cardinalityLimit); + this.keys = new String[cap]; + this.values = new UTF8BytesString[cap]; + this.mask = cap - 1; } public UTF8BytesString register(String value) { - if (this.curUtf8Pairs.size() >= this.cardinalityLimit) { - return this.blockedByTracer(); + if (size >= cardinalityLimit) { + return blockedByTracer(); + } + int i = value.hashCode() & mask; + while (true) { + String k = keys[i]; + if (k == null) { + UTF8BytesString newPair = UTF8BytesString.create(tag + ":" + value); + keys[i] = value; + values[i] = newPair; + size++; + return newPair; + } + if (k.equals(value)) { + return values[i]; + } + i = (i + 1) & mask; } - - UTF8BytesString existing = this.curUtf8Pairs.get(value); - if (existing != null) return existing; - - UTF8BytesString newPair = UTF8BytesString.create(this.tag + ":" + value); - this.curUtf8Pairs.put(value, newPair); - return newPair; } private UTF8BytesString blockedByTracer() { - UTF8BytesString cacheBlocked = this.cacheBlocked; - if (cacheBlocked != null) return cacheBlocked; - - this.cacheBlocked = cacheBlocked = UTF8BytesString.create(this.tag + ":blocked_by_tracer"); - return cacheBlocked; + UTF8BytesString cached = this.cacheBlocked; + if (cached != null) { + return cached; + } + cached = UTF8BytesString.create(tag + ":blocked_by_tracer"); + this.cacheBlocked = cached; + return cached; } public void reset() { - this.curUtf8Pairs.clear(); + if (size > 0) { + Arrays.fill(keys, null); + Arrays.fill(values, null); + size = 0; + } + } + + private static int tableSizeFor(int cardinalityLimit) { + int target = Math.max(cardinalityLimit * 2, 16); + return Integer.highestOneBit(target - 1) << 1; } } From ae91c66233950b56f23b54d3f77a4eace923e92e Mon Sep 17 00:00:00 2001 From: Douglas Q Hawkins Date: Fri, 15 May 2026 20:20:23 -0400 Subject: [PATCH 05/11] Lazy-allocate the error latency histogram on AggregateEntry Each AggregateEntry allocated two DDSketchHistograms in its constructor (ok + error latencies). DDSketchHistogram wraps a DDSketch + lazy store, roughly 60-80 bytes per histogram even when empty. Most spans aren't errors, so most entries' errorLatencies sit empty for life. Now the field starts null. recordOneDuration / recordDurations lazy-allocate on the first error; if no error ever lands on the entry, it stays null and ~80 bytes of empty-histogram overhead are reclaimed. Across a full 2048-entry table that's ~150 KB if 95% of entries never error -- the typical case. For the wire format, SerializingMetricWriter caches the serialized form of an empty histogram (~17 bytes) on first use and writes those cached bytes when an entry's errorLatencies is null. The cache is per-writer (not a global static) so each writer instance picks up the Histograms factory state at the time of its first report, avoiding races with test setup that registers the DDSketch factory at varying points. Trade-off: entries that DO see an error retain the histogram across clearAggregate (just cleared, not nulled), so always-erroring entries allocate exactly once. Same total allocation as before for that case. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../trace/common/metrics/AggregateEntry.java | 46 ++++++++++++++++--- .../metrics/SerializingMetricWriter.java | 27 ++++++++++- 2 files changed, 66 insertions(+), 7 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java index d6afbd9e29e..02bf00f1617 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java @@ -78,7 +78,15 @@ final class AggregateEntry extends Hashtable.Entry { // Recording state. Mutated only on the aggregator thread. Not thread-safe. private final Histogram okLatencies; - private final Histogram errorLatencies; + + /** + * Lazily allocated on the first recorded error. Most entries never see an error, so they keep + * this null forever; {@link #getErrorLatencies()} returns a shared empty histogram in that + * case. Once allocated, it survives {@link #clearAggregate()} (just cleared, not nulled) since + * an entry that errored once tends to error again. + */ + private Histogram errorLatencies; + private int errorCount; private int hitCount; private int topLevelCount; @@ -115,7 +123,6 @@ private AggregateEntry( this.traceRoot = traceRoot; this.peerTags = peerTags; this.okLatencies = Histogram.newHistogram(); - this.errorLatencies = Histogram.newHistogram(); } /** @@ -318,10 +325,25 @@ Histogram getOkLatencies() { return okLatencies; } + /** + * Returns the entry's error latency histogram, or {@code null} if no error has been recorded + * yet. Callers should treat null as "serialize as an empty histogram" (see {@link + * SerializingMetricWriter}). + */ Histogram getErrorLatencies() { return errorLatencies; } + /** Lazy-allocates {@link #errorLatencies} on the first error. */ + private Histogram errorLatenciesForWrite() { + Histogram h = errorLatencies; + if (h == null) { + h = Histogram.newHistogram(); + errorLatencies = h; + } + return h; + } + /** * Records a single hit. {@code tagAndDuration} carries the duration nanos with optional {@link * #ERROR_TAG} / {@link #TOP_LEVEL_TAG} bits OR-ed in. @@ -334,7 +356,7 @@ AggregateEntry recordOneDuration(long tagAndDuration) { } if ((tagAndDuration & ERROR_TAG) == ERROR_TAG) { tagAndDuration ^= ERROR_TAG; - errorLatencies.accept(tagAndDuration); + errorLatenciesForWrite().accept(tagAndDuration); ++errorCount; } else { okLatencies.accept(tagAndDuration); @@ -357,7 +379,7 @@ AggregateEntry recordDurations(int count, AtomicLongArray durations) { } if ((d & ERROR_TAG) == ERROR_TAG) { d ^= ERROR_TAG; - errorLatencies.accept(d); + errorLatenciesForWrite().accept(d); ++errorCount; } else { okLatencies.accept(d); @@ -367,7 +389,11 @@ AggregateEntry recordDurations(int count, AtomicLongArray durations) { return this; } - /** Clears the recording state. Histograms are reused. */ + /** + * Clears the recording state. The OK histogram is reused; the error histogram (if allocated) + * is reused too, but entries that never saw an error keep their {@code errorLatencies} field + * null. + */ @SuppressFBWarnings("AT_NONATOMIC_64BIT_PRIMITIVE") void clearAggregate() { this.errorCount = 0; @@ -375,9 +401,17 @@ void clearAggregate() { this.topLevelCount = 0; this.duration = 0; this.okLatencies.clear(); - this.errorLatencies.clear(); + if (this.errorLatencies != null) { + this.errorLatencies.clear(); + } } + /** + * Holder for the lazy-initialized shared empty error histogram. Class init runs only when + * {@link #getErrorLatencies()} is first called on an entry with no recorded errors, by which + * time the {@code AgentMeter} that {@link Histogram#newHistogram()} depends on is set up. + */ + /** * Equality on the 13 label fields (not on the recording counters). Used only by test mock * matchers; the {@link Hashtable} does its own bucketing via {@link #keyHash} + {@link diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java index ff4944e38a5..ac0a6214313 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java @@ -225,7 +225,32 @@ public void add(AggregateEntry entry) { writer.writeBinary(entry.getOkLatencies().serialize()); writer.writeUTF8(ERROR_SUMMARY); - writer.writeBinary(entry.getErrorLatencies().serialize()); + final datadog.metrics.api.Histogram errorLatencies = entry.getErrorLatencies(); + if (errorLatencies != null) { + writer.writeBinary(errorLatencies.serialize()); + } else { + // Entry never saw an error; emit a cached empty-histogram payload so the wire format is + // unchanged without allocating a histogram per entry. + writer.writeBinary(emptyErrorHistogramBytes()); + } + } + + private byte[] emptyHistogramBytesCache; + + /** + * Returns the cached serialized form of an empty histogram. Computed lazily on first call so the + * {@link datadog.metrics.api.Histograms} factory has been registered by the consumer thread (or + * test setup) before we sample its output. + */ + private byte[] emptyErrorHistogramBytes() { + byte[] cached = emptyHistogramBytesCache; + if (cached == null) { + java.nio.ByteBuffer buf = datadog.metrics.api.Histogram.newHistogram().serialize(); + cached = new byte[buf.remaining()]; + buf.get(cached); + emptyHistogramBytesCache = cached; + } + return cached; } @Override From a8cae22c13557106aa0ad9f0814265913915ef67 Mon Sep 17 00:00:00 2001 From: Douglas Q Hawkins Date: Fri, 15 May 2026 20:29:38 -0400 Subject: [PATCH 06/11] Raise per-field cardinality limits to match real workloads The previous limits (RESOURCE=32, OPERATION=64, TYPE=8, ...) were inherited from the prior DDCache sizes and were chosen for memory conservation, not to cover the cardinality typical APM workloads actually produce. A single REST API with 200 routes exhausted the RESOURCE budget within seconds and collapsed everything onto the blocked_by_tracer sentinel. Recalibrated against realistic distinct-value counts per 10-second reporting window: RESOURCE 32 -> 512 (route + SQL template count for web apps) HTTP_ENDPOINT 32 -> 256 (parameterized routes) OPERATION 64 -> 128 (one per integration kind) SERVICE 32 -> 128 (microservice / peer-service hub) TYPE 8 -> 32 (DDSpanTypes has ~30 known values) HTTP_METHOD 8 -> 16 (9 standard verbs + custom) GRPC_STATUS_CODE 32 -> 24 (17 gRPC codes; tightened) SERVICE_SOURCE 16 -> 16 (unchanged) SPAN_KIND 16 -> 16 (unchanged) Memory cost with the flat handler tables: ~3.8 KB -> ~19.6 KB across all 9 handlers. Negligible relative to the maxAggregates=2048 entry table. Benchmark numbers unchanged (the limits don't show up in producer allocation or the existing hit-path benchmarks): ClientStatsAggregatorBenchmark 3.632 us/op (was 3.555) ClientStatsAggregatorDDSpanBenchmark 2.027 us/op (was 1.967) ClientStatsAggregatorMissPathBenchmark 0.057 us/op, 96 B/op (same) Also updates cardinalityBlockedValuesCollapseIntoOneEntry to push the test beyond the new SERVICE cap (150 services -> 128 in-budget + 1 sentinel = 129 entries). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../trace/common/metrics/AggregateEntry.java | 18 ++++++++++-------- .../common/metrics/AggregateTableTest.java | 16 ++++++++-------- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java index 02bf00f1617..587a92140d2 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java @@ -48,19 +48,21 @@ final class AggregateEntry extends Hashtable.Entry { /** Shared empty array used by entries with no peer tags. */ private static final UTF8BytesString[] EMPTY_PEER_TAGS = new UTF8BytesString[0]; - // Per-field cardinality limits. Identical to the prior DDCache sizes. - static final PropertyCardinalityHandler RESOURCE_HANDLER = new PropertyCardinalityHandler(32); - static final PropertyCardinalityHandler SERVICE_HANDLER = new PropertyCardinalityHandler(32); - static final PropertyCardinalityHandler OPERATION_HANDLER = new PropertyCardinalityHandler(64); + // Per-field cardinality limits. Sized to cover typical real-world cardinality without hitting + // the blocked_by_tracer sentinel: route counts for RESOURCE/HTTP_ENDPOINT, integration counts for + // OPERATION, mesh peers for SERVICE, the known enum cardinality for the small-domain fields. + static final PropertyCardinalityHandler RESOURCE_HANDLER = new PropertyCardinalityHandler(512); + static final PropertyCardinalityHandler SERVICE_HANDLER = new PropertyCardinalityHandler(128); + static final PropertyCardinalityHandler OPERATION_HANDLER = new PropertyCardinalityHandler(128); static final PropertyCardinalityHandler SERVICE_SOURCE_HANDLER = new PropertyCardinalityHandler(16); - static final PropertyCardinalityHandler TYPE_HANDLER = new PropertyCardinalityHandler(8); + static final PropertyCardinalityHandler TYPE_HANDLER = new PropertyCardinalityHandler(32); static final PropertyCardinalityHandler SPAN_KIND_HANDLER = new PropertyCardinalityHandler(16); - static final PropertyCardinalityHandler HTTP_METHOD_HANDLER = new PropertyCardinalityHandler(8); + static final PropertyCardinalityHandler HTTP_METHOD_HANDLER = new PropertyCardinalityHandler(16); static final PropertyCardinalityHandler HTTP_ENDPOINT_HANDLER = - new PropertyCardinalityHandler(32); + new PropertyCardinalityHandler(256); static final PropertyCardinalityHandler GRPC_STATUS_CODE_HANDLER = - new PropertyCardinalityHandler(32); + new PropertyCardinalityHandler(24); final UTF8BytesString resource; final UTF8BytesString service; diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java index 94e140e01dd..65ada22f71a 100644 --- a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java +++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java @@ -88,21 +88,21 @@ void peerTagPairsParticipateInIdentity() { @Test void cardinalityBlockedValuesCollapseIntoOneEntry() { - // SERVICE_HANDLER has a cardinality limit of 32. With 50 distinct service names, services 33+ - // canonicalize to the "blocked_by_tracer" sentinel. Because the table hashes from the canonical - // (post-handler) form, all blocked services land in the same bucket and merge into a single - // entry rather than fragmenting. + // SERVICE_HANDLER has a cardinality limit of 128. With 150 distinct service names, services + // 129+ canonicalize to the "blocked_by_tracer" sentinel. Because the table hashes from the + // canonical (post-handler) form, all blocked services land in the same bucket and merge into + // a single entry rather than fragmenting. AggregateEntry.resetCardinalityHandlers(); - AggregateTable table = new AggregateTable(128); + AggregateTable table = new AggregateTable(256); - for (int i = 0; i < 50; i++) { + for (int i = 0; i < 150; i++) { AggregateEntry entry = table.findOrInsert(snapshot("svc-" + i, "op", "client")); assertNotNull(entry); entry.recordOneDuration(1L); } - // 32 in-budget services + 1 collapsed "blocked_by_tracer" entry = 33 total. - assertEquals(33, table.size()); + // 128 in-budget services + 1 collapsed "blocked_by_tracer" entry = 129 total. + assertEquals(129, table.size()); AggregateEntry.resetCardinalityHandlers(); } From 890075b93ba496a7dc5b0449af071c15aadc9c0f Mon Sep 17 00:00:00 2001 From: Douglas Q Hawkins Date: Fri, 15 May 2026 21:00:29 -0400 Subject: [PATCH 07/11] Add end-to-end JMH benchmark for the trace pipeline Existing JMH benchmarks cover stages of the trace pipeline in isolation -- span creation (CoreTracerBenchmark), scope activation (ScopeLifecycleBenchmark), metrics aggregation (ClientStatsAggregator*Benchmark), wire serialization (TraceMapperBenchmark), propagation -- but nothing exercises a full startSpan -> setTag -> finish -> trace assembly -> metrics publish trip in one op. This benchmark fills that gap. It builds a real CoreTracer with a NoopWriter, reflectively swaps in a real ClientStatsAggregator (default tracer init leaves it as the no-op until agent discovery runs), then per @Benchmark op runs the full single-span lifecycle. Two modes: - "stable" -- identical labels each op (consumer-side hit path) - "varied" -- unique (service, operation, resource) per op (consumer-side miss path until cardinality budgets fill) Co-Authored-By: Claude Opus 4.7 (1M context) --- .../metrics/TracePipelineBenchmark.java | 176 ++++++++++++++++++ 1 file changed, 176 insertions(+) create mode 100644 dd-trace-core/src/jmh/java/datadog/trace/common/metrics/TracePipelineBenchmark.java diff --git a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/TracePipelineBenchmark.java b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/TracePipelineBenchmark.java new file mode 100644 index 00000000000..a1b7217485d --- /dev/null +++ b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/TracePipelineBenchmark.java @@ -0,0 +1,176 @@ +package datadog.trace.common.metrics; + +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND; +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CLIENT; +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_SERVER; +import static java.util.concurrent.TimeUnit.SECONDS; + +import datadog.trace.api.WellKnownTags; +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.common.writer.Writer; +import datadog.trace.core.CoreTracer; +import datadog.trace.core.DDSpan; +import datadog.trace.core.monitor.HealthMetrics; +import java.lang.reflect.Field; +import java.util.Collections; +import java.util.List; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * End-to-end JMH benchmark of a 3-span HTTP-style trace through {@link CoreTracer}: one {@code + * span.kind=server} root + two {@code span.kind=client} children, as if a service handled an + * incoming request that made two outbound HTTP calls. Children inherit the server span as parent + * via implicit scope-based parentage; the root finishes last so {@code PendingTrace.write} -> + * {@code tracer.write(trace)} -> metricsAggregator.publish + writer.write (no-op) runs + * synchronously on the producing thread. + * + *

Runs multi-threaded ({@link Threads} = 8 by default; override with {@code -t N}) so the + * allocation rate {@code -prof gc} reports reflects multiple producers hitting the shared + * metrics aggregator + writer pipeline, and so we can compare total throughput between revisions. + * + *

Reflection is used to swap the tracer's default no-op {@code metricsAggregator} for a real + * {@link ClientStatsAggregator} so the metrics pipeline actually runs. + * + *

Two modes via {@code @Param}: + * + *

+ */ +@State(Scope.Benchmark) +@Warmup(iterations = 2, time = 15, timeUnit = SECONDS) +@Measurement(iterations = 5, time = 15, timeUnit = SECONDS) +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(SECONDS) +@Threads(8) +@Fork(value = 2) +public class TracePipelineBenchmark { + + @Param({"stable", "varied"}) + String mode; + + private CoreTracer tracer; + private ClientStatsAggregator aggregator; + private boolean stable; + + @State(Scope.Thread) + public static class ThreadState { + int cursor; + } + + @Setup + public void setup() throws Exception { + this.stable = "stable".equals(mode); + this.tracer = CoreTracer.builder().writer(new NoopWriter()).strictTraceWrites(false).build(); + this.aggregator = + new ClientStatsAggregator( + new WellKnownTags("", "", "", "", "", ""), + Collections.emptySet(), + new ClientStatsAggregatorBenchmark.FixedAgentFeaturesDiscovery( + Collections.singleton("peer.hostname"), Collections.emptySet()), + HealthMetrics.NO_OP, + new ClientStatsAggregatorBenchmark.NullSink(), + 2048, + 2048, + false); + this.aggregator.start(); + // Replace the no-op aggregator the tracer was constructed with. The field is package-private + // in datadog.trace.core; reflect since this benchmark lives in the metrics package. + Field f = CoreTracer.class.getDeclaredField("metricsAggregator"); + f.setAccessible(true); + f.set(this.tracer, this.aggregator); + } + + @TearDown + public void tearDown() { + aggregator.close(); + tracer.close(); + } + + @Benchmark + public void threeSpanTrace(ThreadState ts, Blackhole blackhole) { + int idx = ts.cursor++; + String service = stable ? "svc" : "svc-" + idx; + String serverOp = stable ? "servlet.request" : "servlet.request-" + idx; + String serverResource = stable ? "GET /widgets/{id}" : "GET /widgets/" + idx; + String clientOp = stable ? "http.request" : "http.request-" + idx; + String clientResource1 = stable ? "GET /downstream-a" : "GET /downstream-a/" + idx; + String clientResource2 = stable ? "GET /downstream-b" : "GET /downstream-b/" + idx; + String hostA = stable ? "host-a" : "host-a-" + idx; + String hostB = stable ? "host-b" : "host-b-" + idx; + + AgentSpan server = tracer.startSpan("servlet", serverOp); + server.setResourceName(serverResource); + server.setServiceName(service); + server.setTag(SPAN_KIND, SPAN_KIND_SERVER); + AgentScope serverScope = tracer.activateSpan(server); + try { + AgentSpan client1 = tracer.startSpan("okhttp", clientOp); + client1.setResourceName(clientResource1); + client1.setServiceName(service); + client1.setTag(SPAN_KIND, SPAN_KIND_CLIENT); + client1.setTag("peer.hostname", hostA); + AgentScope client1Scope = tracer.activateSpan(client1); + try { + // simulated unit of in-call work would go here + } finally { + client1Scope.close(); + } + client1.finish(); + + AgentSpan client2 = tracer.startSpan("okhttp", clientOp); + client2.setResourceName(clientResource2); + client2.setServiceName(service); + client2.setTag(SPAN_KIND, SPAN_KIND_CLIENT); + client2.setTag("peer.hostname", hostB); + AgentScope client2Scope = tracer.activateSpan(client2); + try { + // simulated unit of in-call work would go here + } finally { + client2Scope.close(); + } + client2.finish(); + } finally { + serverScope.close(); + } + // Finishing the root last triggers PendingTrace.write -> tracer.write -> metrics + writer on + // this thread, since all child refs have already decremented to zero. + server.finish(); + blackhole.consume(server); + } + + private static final class NoopWriter implements Writer { + @Override + public void write(List trace) {} + + @Override + public void start() {} + + @Override + public boolean flush() { + return true; + } + + @Override + public void close() {} + + @Override + public void incrementDropCounts(int spanCount) {} + } +} From cf24265f4e5315714bfced1fddc32e8e654f016c Mon Sep 17 00:00:00 2001 From: Douglas Q Hawkins Date: Fri, 15 May 2026 21:48:19 -0400 Subject: [PATCH 08/11] Add adversarial JMH benchmark targeting the metrics subsystem The metrics aggregator is bounded by design at every layer (inbox cap, aggregate-table cap, per-field cardinality cap, fixed-size flat handler tables, bounded histogram dense stores). This benchmark stresses all of those simultaneously to verify the bounds hold and to measure throughput + allocation under attack: - 8 producer threads hammering publish() concurrently. - Unique (service, operation, resource, peer.hostname) per op so handlers saturate to blocked_by_tracer instantly and the aggregate table fills+evicts continuously. - Random durations across a 1ns..1s range so histograms accept many distinct bins. - Random error / topLevel flags so both histograms exercise. Per-fork tearDown prints the CountingHealthMetrics drop counters so we can see how the subsystem absorbed the burst: snapshots dropped at the inbox (backpressure when consumer can't keep up) versus snapshots dropped at the table (cap reached with no stale entry). The expected shape under attack is "lots of inbox drops, zero table drops" -- producer-driven backpressure stays ahead of table saturation. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../metrics/AdversarialMetricsBenchmark.java | 167 ++++++++++++++++++ 1 file changed, 167 insertions(+) create mode 100644 dd-trace-core/src/jmh/java/datadog/trace/common/metrics/AdversarialMetricsBenchmark.java diff --git a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/AdversarialMetricsBenchmark.java b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/AdversarialMetricsBenchmark.java new file mode 100644 index 00000000000..bd67634bae6 --- /dev/null +++ b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/AdversarialMetricsBenchmark.java @@ -0,0 +1,167 @@ +package datadog.trace.common.metrics; + +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND; +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CLIENT; +import static java.util.concurrent.TimeUnit.SECONDS; + +import datadog.trace.api.WellKnownTags; +import datadog.trace.core.CoreSpan; +import datadog.trace.core.monitor.HealthMetrics; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Adversarial JMH benchmark designed to stress every cardinality + capacity dimension of the + * metrics subsystem at once. + * + *

The metrics aggregator is supposed to be bounded by design: + * + *

    + *
  • {@link AggregateTable} caps total entries at {@code tracerMetricsMaxAggregates} (default + * 2048) and rejects further inserts when full. + *
  • Each cardinality handler caps distinct values per reporting cycle; overflow collapses to + * {@code blocked_by_tracer}. + *
  • The producer/consumer inbox is a fixed-size MPSC queue ({@code tracerMetricsMaxPending}, + * default 2048); when full, producer {@code offer} returns false and the snapshot is + * dropped via {@link HealthMetrics#onStatsInboxFull()}. + *
  • Histograms use {@code CollapsingLowestDenseStore(1024)} -- bounded per-histogram memory. + *
  • Cardinality handlers are flat open-addressed tables of fixed capacity -- no allocation + * on the producer thread; allocation only on the consumer (handler reset clears, doesn't + * reallocate). + *
+ * + *

This benchmark hammers all of those bounds simultaneously with 8 producer threads, unique + * labels per op (so handlers cap and the table fills+evicts repeatedly), random durations across + * a wide range (so histograms accept many distinct bins), and random {@code error}/{@code + * topLevel} flags (so both histograms are exercised). After the run, prints the drop counters so + * you can verify the subsystem stayed bounded under attack. + * + *

What "OOM the metrics subsystem" looks like if the bounds break: producer-thread allocation + * would grow unbounded (snapshots faster than inbox can drain produces dropped snapshots, not + * heap growth); aggregator-thread heap would grow if entries weren't capped, if handlers grew + * past their cap, or if histograms grew past their dense-store limit. + */ +@State(Scope.Benchmark) +@Warmup(iterations = 2, time = 15, timeUnit = SECONDS) +@Measurement(iterations = 5, time = 15, timeUnit = SECONDS) +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(SECONDS) +@Threads(8) +@Fork(value = 1) +public class AdversarialMetricsBenchmark { + + private ClientStatsAggregator aggregator; + private CountingHealthMetrics health; + + @State(Scope.Thread) + public static class ThreadState { + int cursor; + } + + @Setup + public void setup() { + this.health = new CountingHealthMetrics(); + this.aggregator = + new ClientStatsAggregator( + new WellKnownTags("", "", "", "", "", ""), + Collections.emptySet(), + new ClientStatsAggregatorBenchmark.FixedAgentFeaturesDiscovery( + Collections.singleton("peer.hostname"), Collections.emptySet()), + this.health, + new ClientStatsAggregatorBenchmark.NullSink(), + 2048, + 2048, + false); + this.aggregator.start(); + } + + @TearDown + public void tearDown() { + aggregator.close(); + System.err.println( + "[ADVERSARIAL] snapshots offered (across all threads, both forks combined for this run):"); + System.err.println( + " onStatsInboxFull = " + + health.inboxFull + + " (snapshots dropped because the MPSC inbox was full)"); + System.err.println( + " onStatsAggregateDropped = " + + health.aggregateDropped + + " (snapshots dropped because the AggregateTable was full with no stale entry)"); + System.err.println( + " onClientStatTraceComputed total = " + + health.traceComputedCalls + + " spans counted = " + + health.totalSpansCounted); + } + + @Benchmark + public void publish(ThreadState ts, Blackhole blackhole) { + int idx = ts.cursor++; + ThreadLocalRandom rng = ThreadLocalRandom.current(); + + // Mix indices so labels don't fall into linear order in the handler tables. Distinct labels + // exceed every cap (RESOURCE=512, OPERATION=128, SERVICE=128, peer.hostname=512), so handlers + // saturate fast and most ops resolve to the blocked-by-tracer sentinel. + int scrambled = idx * 0x9E3779B1; // golden ratio multiplier + String service = "svc-" + (scrambled & 0xFFFF); + String operation = "op-" + ((scrambled >>> 8) & 0x3FFFF); + String resource = "res-" + ((scrambled ^ 0x5A5A5A) & 0xFFFFF); + String hostname = "host-" + ((scrambled >>> 12) & 0x7FFF); + boolean error = (idx & 7) == 0; + boolean topLevel = (idx & 3) == 0; + // Wide duration spread forces histogram bins to populate broadly. + long durationNanos = 1L + (rng.nextLong() & 0x3FFFFFFFL); // 1 ns .. ~1.07 s + + SimpleSpan span = + new SimpleSpan(service, operation, resource, "web", true, topLevel, error, 0, durationNanos, 200); + span.setTag(SPAN_KIND, SPAN_KIND_CLIENT); + span.setTag("peer.hostname", hostname); + + List> trace = Collections.singletonList(span); + blackhole.consume(aggregator.publish(trace)); + } + + /** + * Counts what gets dropped. The aggregator publishes onto these counters from many threads, so + * the fields are {@code volatile long} with non-atomic increments -- precise counts aren't the + * point, order-of-magnitude is. + */ + static final class CountingHealthMetrics extends HealthMetrics { + volatile long inboxFull; + volatile long aggregateDropped; + volatile long traceComputedCalls; + volatile long totalSpansCounted; + + @Override + public void onStatsInboxFull() { + inboxFull++; + } + + @Override + public void onStatsAggregateDropped() { + aggregateDropped++; + } + + @Override + public void onClientStatTraceComputed(int counted, int total, boolean dropped) { + traceComputedCalls++; + totalSpansCounted += counted; + } + } + +} From 823a5d4d110a769d1b5a7af45acabb05736920c2 Mon Sep 17 00:00:00 2001 From: Douglas Q Hawkins Date: Fri, 15 May 2026 22:02:16 -0400 Subject: [PATCH 09/11] Document adversarial-case behavior in client metrics design doc Adds a new "Behavior under adversarial load" section comparing the new ClientStatsAggregator producer/consumer split against master's ConflatingMetricsAggregator under high-cardinality + random-duration attack. Captures the measured numbers from AdversarialMetricsBenchmark: - master degrades 73% by iteration 3 (1.5M -> 410K ops/s) with the GC threads consuming >100% of wall-clock budget; - this design sustains ~5.85M ops/s steady-state with ~10% GC time; - 139M snapshots dropped via the inbox-full backpressure counter, zero dropped at the AggregateTable cap; - worst-case heap ceiling of ~32 MB regardless of attack rate. Explains why master degrades (no producer/consumer split -> all canonicalization allocation on the calling thread -> GC death spiral) and why this design holds (offer-based backpressure at the MPSC inbox boundary turns overflow into a counted drop rather than heap growth). Also updates two stale references in the doc: - AggregateMetric -> "AggregateEntry's per-bucket counters" (fold); - benchmark summary now lists all four JMH benchmarks and includes the miss-path numbers (5.3x faster, 4.2x less producer allocation). Co-Authored-By: Claude Opus 4.7 (1M context) --- docs/client_metrics_design.md | 100 ++++++++++++++++++++++++++++++++-- 1 file changed, 96 insertions(+), 4 deletions(-) diff --git a/docs/client_metrics_design.md b/docs/client_metrics_design.md index 489763fd413..fd5d9d93546 100644 --- a/docs/client_metrics_design.md +++ b/docs/client_metrics_design.md @@ -223,8 +223,8 @@ Two distinct cadences: ## Memory and lifetime -- `AggregateMetric` is **not thread-safe**. It is mutated only by the - aggregator thread. +- `AggregateEntry`'s per-bucket counters + histograms are **not thread-safe**; + they are mutated only by the aggregator thread. - `AggregateTable` is **not thread-safe**. All paths (producer-side `CLEAR`, schedule-driven `REPORT`, drainer-driven inserts) route through the inbox. - `Canonical` and the cardinality handlers are aggregator-thread-only. @@ -258,6 +258,81 @@ The producer reports per-trace stats via `HealthMetrics`: | Cardinality budget exhausted | Overflow values canonicalize to a `blocked_by_tracer` sentinel and merge into one bucket. Total entry count stays bounded by `maxAggregates`. | | Producer throws mid-trace | Caught by the writer's normal error path; `onClientStatTraceComputed` is not called for that trace. | +## Behavior under adversarial load + +A useful stress test (captured as `AdversarialMetricsBenchmark`): 8 producer +threads call `publish` in a tight loop with **unique** `(service, operation, +resource, peer.hostname)` per op, random durations across 30 orders of +magnitude, and random `error` / `topLevel` flags. Every cardinality dimension +saturates within milliseconds. + +### What "OOM the metrics subsystem" would look like + +A successful attack would either grow the aggregator's heap unboundedly, or +back up the producer so a synchronous structure (cache, map) grew with each +unique label combination. The current shape rules both out by construction: + +- **Inbox is a fixed-size MPSC queue.** Overflow returns `false` from + `offer` and the producer drops the snapshot via `onStatsInboxFull`. + The snapshot becomes garbage immediately; no queue growth. +- **`AggregateTable` is a fixed-size bucket array.** Insertion when the + table is full triggers an evict-stale pass (one entry with + `hitCount == 0`); if that fails the snapshot is dropped via + `onStatsAggregateDropped`. The table never resizes. +- **Cardinality handlers are flat open-addressed arrays.** Overflow values + canonicalize to the shared `blocked_by_tracer` sentinel — same hash, + same bucket, merged in. No node allocations, no rehash. +- **Histograms use `CollapsingLowestDenseStore(1024)`.** Bucket array + caps at ~8 KB per histogram. Worst case at full table cap: 2048 entries + × 2 histograms × ~8 KB ≈ 32 MB. That's the headline upper bound. +- **Empty error histograms aren't allocated until first error + recorded.** Entries that never error keep `errorLatencies = null`, + saving the wrapper allocation. + +### Measured behavior (1f × 1wi × 3i × 15s, 8 threads each side) + +| | master (`ConflatingMetricsAggregator`) | this design (`ClientStatsAggregator`) | +|---|---:|---:| +| Iteration 1 throughput | 1,506,007 ops/s | ~5,853,917 ops/s | +| Iteration 2 throughput | 1,255,258 ops/s | ~5,800,000 ops/s | +| Iteration 3 throughput | **410,097 ops/s** (-73%) | ~5,853,917 ops/s (stable) | +| GC time / 15 s wall | iter 1: 8.7 s — iter 2: 9.8 s — iter 3: **18.6 s** (multi-thread GC saturation) | ~150 ms total | +| Producer allocation | ~1,108 B/op | ~823 B/op | +| Aggregator thread state at end | "Skipped metrics reporting because the queue is full" + thread idle waiting for inbox | Continuously draining; ~13M snapshots/sec consumed | +| Inbox-full drops | (no counter on master) | ~139 M dropped over 45 s, all reported via `onStatsInboxFull` | +| Aggregate-table drops | 0 | 0 | + +### Why master degrades + +On master, the producer does **everything** synchronously on the calling +thread: `MetricKey` canonicalization, `DDCache` lookups for each label field, +`LRUCache` insertion. There is no queue between producer and consumer +— there *is* no consumer thread for the storage work, only for the +periodic report. So a 1,108 B/op allocation rate × 8 threads × ~1.5 M ops/s +generates ~13 GB/sec of garbage on the same thread that has to keep up with +incoming spans. The young gen fills, then survivor, then old gen, then full +GC. By iteration 3 the JVM is spending more than its wall-clock budget on +GC (multiple concurrent GC threads, summed > 15 s during a 15 s window) and +throughput collapses 73 %. + +### Why this design holds + +The producer/consumer split converts allocation pressure into **backpressure +at the inbox boundary**. The producer's per-op work is just "allocate one +`SpanSnapshot`, set a few `volatile` refs, `inbox.offer`, return." On +overflow, `offer` returns `false` and the snapshot is dropped on the spot — +no waiting, no allocation amplification. The aggregator thread runs at +its natural rate (~13 M snapshots/sec on the test machine), and the gap +between producer and consumer becomes the drop rate, not heap growth. +`onStatsInboxFull` makes that gap observable so operators can size +`tracerMetricsMaxPending` and `tracerMetricsMaxAggregates` for their +workload. + +Net: under adversarial input the new design absorbs what it can compute +meaningfully and drops what it can't, with both numbers exposed via health +metrics. The bounded-design properties hold to the ~32 MB worst-case +ceiling described above. + ## Why the redesign (history) The pipeline was previously `ConflatingMetricsAggregator` with: @@ -292,8 +367,20 @@ showed the producer dominating CPU time. The major shifts: ### Benchmark summary -`ClientStatsAggregatorDDSpanBenchmark` (64 client-kind DDSpans per op, single -trace, real `CoreTracer` with a no-op writer): +Four JMH benchmarks cover the producer pipeline at different angles: + +- `ClientStatsAggregatorBenchmark` — 64 SimpleSpans per op, identical labels + (consumer-side cache hit path). +- `ClientStatsAggregatorDDSpanBenchmark` — same as above but real `DDSpan` + via `CoreTracer`; exercises the production `isKind` / cached span-kind + ordinal path. +- `ClientStatsAggregatorMissPathBenchmark` — pool of 4096 single-span + traces with unique labels; exercises miss + insert + handler saturation. +- `AdversarialMetricsBenchmark` — 8 threads, unique labels per op, random + durations + error flags; pushes every bound to its limit (see + [Behavior under adversarial load](#behavior-under-adversarial-load)). + +Optimization progression on the DDSpan benchmark: | Variant | µs/op | |---|---| @@ -302,6 +389,11 @@ trace, real `CoreTracer` with a no-op writer): | with peer-tag schema hoist | 2.410 | | with cached span-kind ordinal + isSynthetic fix | 1.995 | +On the producer-bound miss-path benchmark (single-span trace, unique +labels), `ClientStatsAggregatorMissPathBenchmark` measures **0.057 µs/op + +96 B/op** vs master's **0.305 µs/op + 399 B/op** — 5.3× faster, 4.2× less +producer-thread allocation per metrics-eligible span. + The remaining producer-thread hotspots (from JFR sampling) are tag-map lookups for `peer.hostname` / other peer-tag values inside `capturePeerTagValues`. A bulk peer-tag accessor on `DDSpan` would crack that From b1358f85ae10758e0eef1f65d2af1d9fd5aa1982 Mon Sep 17 00:00:00 2001 From: Douglas Q Hawkins Date: Tue, 19 May 2026 22:57:03 -0400 Subject: [PATCH 10/11] Add high-level Overview section to client-metrics design doc The doc previously jumped straight from the one-paragraph header into an implementation-shape diagram with class names and rule labels. Readers wanting to understand "what does this thing do and why" before reading the implementation got dumped into the deep end. Add an Overview section at the top covering: - The problem (aggregate spans rather than ship each one). - What a bucket is (label tuple + accumulator). - Goals (bounded memory, no producer contention, reset correctness, stable wire format). - A one-paragraph architecture description in plain language. - The three design rules stated conceptually (no implementation vocabulary). - The trade-offs the design makes deliberately. The existing "High-level shape" diagram and the implementation-detail sections that follow are unchanged -- the new Overview is additive, not a replacement. Co-Authored-By: Claude Opus 4.7 (1M context) --- docs/client_metrics_design.md | 77 +++++++++++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) diff --git a/docs/client_metrics_design.md b/docs/client_metrics_design.md index 3e3c79d39ff..0bb82d36bdd 100644 --- a/docs/client_metrics_design.md +++ b/docs/client_metrics_design.md @@ -8,6 +8,83 @@ does not have to sample every span to know request rates and latencies. Code lives in package `datadog.trace.common.metrics`. +## Overview + +Tracers emit thousands of spans per second. Reporting every one to the Datadog +Agent — let alone storing them — would be expensive and mostly redundant. The +question "what's my p95 latency for `GET /users/:id` on `web-frontend` over the +last 10s" doesn't need every individual span; it needs an aggregate. + +The client-stats pipeline computes those aggregates on the tracer itself and +ships rolled-up **buckets** to the agent on a fixed cadence. Each bucket is a +tuple of label values (resource, service, operation, span kind, peer tags, http +method/endpoint/status, grpc status, ...) plus a small accumulator: hit count, +error count, top-level count, duration sum, ok-latency histogram, error-latency +histogram. A bucket spans one reporting cycle (default 10 seconds); at the end +of the cycle the buckets are serialized to the agent's `/v0.6/stats` endpoint +and the in-memory accumulators are cleared. + +### Goals + +- **Bounded memory.** The aggregator's footprint must not grow without limit no + matter how many distinct label combinations the workload produces, or how + high the span throughput is. +- **No producer-thread contention.** Application threads that complete a span + shouldn't block on a lock or do meaningful work beyond cheap field + extraction. The tracer is a guest in the application's process; it must not + show up as overhead. +- **Correctness under reset.** Cardinality budgets and histograms are dropped + every reporting cycle. Mid-cycle drops and agent-downgrade clears can't + corrupt the aggregate table or fragment a single logical bucket. +- **Stable wire format.** The bucket payload matches the existing `/v0.6/stats` + schema. This is a re-implementation of an existing protocol, not a protocol + change. + +### Architecture in one paragraph + +A single **aggregator thread** owns every piece of mutable state — the bucket +table, the per-field cardinality budgets, the histograms. Application threads +build a small immutable **span snapshot** per metrics-eligible span and post it +to a bounded MPSC inbox. The aggregator drains the inbox, **canonicalizes** +each snapshot's label values through cardinality-capped UTF8 interners, hashes +the canonical form, and finds-or-inserts a bucket. A scheduled signal flushes +buckets to the agent every reporting interval; the cardinality budgets are +reset in lockstep with the flush. + +### Three rules + +1. **Producer threads never touch shared state.** They build a snapshot and + hand it off through the inbox. The aggregator does all the heavy work + (canonicalization, hashing, lookup, accumulator updates). +2. **Cardinality is capped per field, per cycle.** Each label field has its + own budget; overflow values collapse to a single `blocked_by_tracer` + sentinel so the bucket table can never grow past + `maxAggregates × (sum of per-field budgets)` distinct combinations. +3. **Aggregation happens on canonical UTF8 forms.** Two snapshots that disagree + only on representation (same content delivered once as a `String`, once as + a `UTF8BytesString`) collapse to the same bucket because hashing and + matching happen *after* canonicalization. + +### Trade-offs + +- **One snapshot allocation per metrics-eligible span.** ~100 bytes per + snapshot; cheap individually but a meaningful share of producer allocation + at high span throughput. Snapshots could be pooled or replaced with a + struct-of-arrays inbox; neither is currently worth the complexity. +- **Cap-overrun drops the new key, not LRU.** When the bucket table is at + capacity and no entry is stale enough to evict, an incoming snapshot for a + new label combination is dropped (and reported via + `onStatsAggregateDropped`). This protects the steady-state workload from a + burst of new keys that would otherwise displace established buckets. +- **One aggregator thread.** The whole consumer side is single-threaded by + design — locks, races, and visibility footguns are confined to the producer + → inbox handoff. If the producer rate is sustainedly higher than the + aggregator can drain, the inbox fills and snapshots are dropped + (`onStatsInboxFull`). +- **Fixed bucket table.** The hashtable's bucket array is sized once at + startup from `maxAggregates`. No dynamic resizing; entries beyond the cap + trigger the drop-new-key path above. + ## High-level shape ``` From 17a442ff7ea2d49d5ec32c0856c167e832d7ae6a Mon Sep 17 00:00:00 2001 From: Douglas Q Hawkins Date: Tue, 19 May 2026 23:36:38 -0400 Subject: [PATCH 11/11] Notify on peer-tag cardinality blocks Adds a per-cycle one-shot warn log + HealthMetrics counter (`stats.tag_cardinality_blocked` with `tag:`) when a peer-tag value gets collapsed to the `blocked_by_tracer` sentinel because its cardinality budget is exhausted. Implemented as a `register(int i, String value)` method on `PeerTagSchema` that does the post-block notification work; `TagCardinalityHandler` exposes `blockedSentinel()` so the schema can identity-compare and stays free of logger / health metric coupling. Warn-once gating uses a `Set` of names seen this cycle, cleared by `resetCardinalityHandlers()`. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../trace/common/metrics/AggregateEntry.java | 11 +-- .../common/metrics/ClientStatsAggregator.java | 3 +- .../trace/common/metrics/PeerTagSchema.java | 76 ++++++++++++++----- .../common/metrics/TagCardinalityHandler.java | 9 +++ .../trace/core/monitor/HealthMetrics.java | 6 ++ .../core/monitor/TracerHealthMetrics.java | 5 ++ .../common/metrics/AggregateTableTest.java | 3 +- 7 files changed, 89 insertions(+), 24 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java index 91202db20a3..8f2ae1cc6b3 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java @@ -466,10 +466,11 @@ void populate(SpanSnapshot s) { } /** - * Fills {@link #peerTagsBuffer} with canonical UTF8 forms, applying {@code schema.handler(i)} - * to each value at the same index. Handler returns {@code EMPTY} for null inputs; we elide - * those from the buffer so the wire-format list-of-pairs only contains present peer tags. No - * allocation when the schema/values are absent or all values are null (buffer is just cleared). + * Fills {@link #peerTagsBuffer} with canonical UTF8 forms, applying the schema's per-tag + * handler + warn-once notification at the same index. Returns {@code EMPTY} for null inputs; + * we elide those from the buffer so the wire-format list-of-pairs only contains present peer + * tags. No allocation when the schema/values are absent or all values are null (buffer is just + * cleared). */ private void populatePeerTags(PeerTagSchema schema, String[] values) { peerTagsBuffer.clear(); @@ -478,7 +479,7 @@ private void populatePeerTags(PeerTagSchema schema, String[] values) { } int n = schema.size(); for (int i = 0; i < n; i++) { - UTF8BytesString utf8 = schema.handler(i).register(values[i]); + UTF8BytesString utf8 = schema.register(i, values[i]); if (utf8 != UTF8BytesString.EMPTY) { peerTagsBuffer.add(utf8); } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ClientStatsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ClientStatsAggregator.java index eadef788bb0..1f212c0ed65 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ClientStatsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ClientStatsAggregator.java @@ -374,7 +374,8 @@ private synchronized PeerTagSchema refreshPeerAggSchema(long revision) { } Set names = features.peerTags(); PeerTagSchema schema = - PeerTagSchema.of(names == null ? Collections.emptySet() : names, revision); + PeerTagSchema.of( + names == null ? Collections.emptySet() : names, revision, healthMetrics); cachedPeerAggSchema = schema; return schema; } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/PeerTagSchema.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/PeerTagSchema.java index 0dc6e1c9e23..729d05f2be9 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/PeerTagSchema.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/PeerTagSchema.java @@ -2,41 +2,54 @@ import static datadog.trace.api.DDTags.BASE_SERVICE; +import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; +import datadog.trace.core.monitor.HealthMetrics; +import java.util.HashSet; import java.util.Set; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Parallel arrays of peer-tag names and their {@link TagCardinalityHandler}s, indexed in lockstep. * *

Replaces the previous {@code Map} lookup with positional array * access: the producer captures span tag values into a {@code String[]} parallel to {@link #names}, - * and the consumer applies {@link #handler(int)} at the same index to canonicalize. + * and the consumer calls {@link #register(int, String)} at the same index to canonicalize the + * value through the per-tag cardinality handler. * *

Two schemas exist: * *

    *
  • {@link #INTERNAL} -- a singleton with one entry for {@code base.service}, used for * internal-kind spans where only the base service is aggregated. - *
  • A peer-aggregation schema built via {@link #of(Set, long)} for {@code client}/{@code - * producer}/{@code consumer} spans. {@link ClientStatsAggregator} caches the most recently - * built schema and compares its {@link #peerTagsRevision} against {@code + *
  • A peer-aggregation schema built via {@link #of(Set, long, HealthMetrics)} for {@code + * client}/{@code producer}/{@code consumer} spans. {@link ClientStatsAggregator} caches the + * most recently built schema and compares its {@link #peerTagsRevision} against {@code * DDAgentFeaturesDiscovery.peerTagsRevision()} to decide when to rebuild. *
* + *

Cardinality blocks emit a one-shot warn log per reporting cycle per tag (tracked via {@link + * #warnedCardinality}) and fire {@link HealthMetrics#onTagCardinalityBlocked(String)} for every + * blocked value -- statsd handles aggregation downstream. Both are reset by {@link + * #resetCardinalityHandlers()}. + * *

Each {@link SpanSnapshot} captures its own schema reference so producer and consumer agree on * the indexing even if the current schema is replaced between capture and consumption. * - *

Thread-safety: {@link TagCardinalityHandler}s are not thread-safe and must only be - * exercised on the aggregator thread. {@link #names} and {@link #peerTagsRevision} are final and - * safe to read from any thread. + *

Thread-safety: {@link TagCardinalityHandler}s and the warn-once set are not + * thread-safe and must only be exercised on the aggregator thread. {@link #names} and {@link + * #peerTagsRevision} are final and safe to read from any thread. */ final class PeerTagSchema { + private static final Logger log = LoggerFactory.getLogger(PeerTagSchema.class); + /** Sentinel revision for {@link #INTERNAL} -- it never changes. */ static final long INTERNAL_REVISION = -1L; /** Singleton schema for internal-kind spans -- only {@code base.service}. */ static final PeerTagSchema INTERNAL = - new PeerTagSchema(new String[] {BASE_SERVICE}, INTERNAL_REVISION); + new PeerTagSchema(new String[] {BASE_SERVICE}, INTERNAL_REVISION, HealthMetrics.NO_OP); final String[] names; final TagCardinalityHandler[] handlers; @@ -48,14 +61,24 @@ final class PeerTagSchema { */ final long peerTagsRevision; + private final HealthMetrics healthMetrics; + + /** + * Per-cycle warn-once gating. {@code Set.add(name)} returns true exactly the first time a tag + * gets blocked this cycle, which is the only time we want to emit the warn log. Cleared by + * {@link #resetCardinalityHandlers()}. + */ + private final Set warnedCardinality = new HashSet<>(); + /** Builds a schema for the given peer-tag names. Order is determined by the {@link Set}. */ - static PeerTagSchema of(Set names, long peerTagsRevision) { - return new PeerTagSchema(names.toArray(new String[0]), peerTagsRevision); + static PeerTagSchema of(Set names, long peerTagsRevision, HealthMetrics healthMetrics) { + return new PeerTagSchema(names.toArray(new String[0]), peerTagsRevision, healthMetrics); } - private PeerTagSchema(String[] names, long peerTagsRevision) { + private PeerTagSchema(String[] names, long peerTagsRevision, HealthMetrics healthMetrics) { this.names = names; this.peerTagsRevision = peerTagsRevision; + this.healthMetrics = healthMetrics; this.handlers = new TagCardinalityHandler[names.length]; for (int i = 0; i < names.length; i++) { this.handlers[i] = @@ -64,13 +87,36 @@ private PeerTagSchema(String[] names, long peerTagsRevision) { } /** - * Resets every {@link TagCardinalityHandler}'s working set. Must be called on the aggregator - * thread; handlers are not thread-safe. + * Canonicalizes the peer-tag value at slot {@code i}. Returns {@link UTF8BytesString#EMPTY} for + * null inputs and the handler's {@code ":blocked_by_tracer"} sentinel when the per-tag + * cardinality budget is exhausted. Fires {@link HealthMetrics#onTagCardinalityBlocked(String)} + * on every block; emits a one-shot warn log per cycle per tag. + */ + UTF8BytesString register(int i, String value) { + TagCardinalityHandler handler = handlers[i]; + UTF8BytesString result = handler.register(value); + if (handler.isBlockedResult(result)) { + String name = names[i]; + healthMetrics.onTagCardinalityBlocked(name); + if (warnedCardinality.add(name)) { + log.warn( + "Cardinality limit reached for peer tag '{}'; further values are reported as" + + " 'blocked_by_tracer' until the next reporting cycle", + name); + } + } + return result; + } + + /** + * Resets every {@link TagCardinalityHandler}'s working set and clears the per-cycle warn-once + * tracking. Must be called on the aggregator thread; handlers are not thread-safe. */ void resetCardinalityHandlers() { for (TagCardinalityHandler h : handlers) { h.reset(); } + warnedCardinality.clear(); } int size() { @@ -80,8 +126,4 @@ int size() { String name(int i) { return names[i]; } - - TagCardinalityHandler handler(int i) { - return handlers[i]; - } } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/TagCardinalityHandler.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/TagCardinalityHandler.java index c8a0b8779e3..d96f16f4024 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/TagCardinalityHandler.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/TagCardinalityHandler.java @@ -78,6 +78,15 @@ private int probe(String[] keys, String value) { return idx; } + /** + * Whether {@code result} (returned from a prior {@link #register} call) is this handler's + * blocked sentinel. The size check short-circuits the hot path so the sentinel is never + * materialized before any value has actually been blocked this cycle. + */ + boolean isBlockedResult(UTF8BytesString result) { + return this.curSize >= this.cardinalityLimit && result == blockedByTracer(); + } + private UTF8BytesString blockedByTracer() { UTF8BytesString cacheBlocked = this.cacheBlocked; if (cacheBlocked != null) return cacheBlocked; diff --git a/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java b/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java index d1c7fe126b4..bed5bbb341c 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java @@ -98,6 +98,12 @@ public void onStatsAggregateDropped() {} */ public void onStatsInboxFull() {} + /** + * Reports a single tag value collapsed into the {@code blocked_by_tracer} sentinel because the + * per-tag cardinality budget for the current reporting cycle was exhausted. + */ + public void onTagCardinalityBlocked(String tag) {} + /** * @return Human-readable summary of the current health metrics. */ diff --git a/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java b/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java index db384a7e42e..863cd43c7cc 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java @@ -363,6 +363,11 @@ public void onStatsInboxFull() { statsInboxFull.increment(); } + @Override + public void onTagCardinalityBlocked(String tag) { + statsd.incrementCounter("stats.tag_cardinality_blocked", new String[] {"tag:" + tag}); + } + @Override public void close() { if (null != cancellation) { diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java index 57ac6ddef8b..c90594b1895 100644 --- a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java +++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java @@ -238,7 +238,8 @@ SnapshotBuilder peerTags(String... namesAndValues) { for (int i = 0; i < namesAndValues.length; i += 2) { names.add(namesAndValues[i]); } - this.peerTagSchema = PeerTagSchema.of(names, 0L); + this.peerTagSchema = + PeerTagSchema.of(names, 0L, datadog.trace.core.monitor.HealthMetrics.NO_OP); this.peerTagValues = new String[peerTagSchema.size()]; for (int i = 0; i < namesAndValues.length; i += 2) { for (int j = 0; j < peerTagSchema.size(); j++) {