Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion AGENTS.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<claude-mem-context>
# Memory Context

# [otelcontext] recent context, 2026-04-28 1:14am UTC
# [otelcontext] recent context, 2026-04-28 6:43am UTC

No previous sessions found.
</claude-mem-context>
6 changes: 6 additions & 0 deletions internal/ingest/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,8 @@ func (s *MetricsServer) SetMetricCallback(cb func(tsdb.RawMetric)) {

// Export handles incoming OTLP metrics data.
func (s *MetricsServer) Export(ctx context.Context, req *colmetricspb.ExportMetricsServiceRequest) (*colmetricspb.ExportMetricsServiceResponse, error) {
start := time.Now()
defer func() { s.metrics.ObserveIngestDuration("metrics", time.Since(start)) }()
for _, resourceMetrics := range req.ResourceMetrics {
serviceName := getServiceName(resourceMetrics.Resource.Attributes)

Expand Down Expand Up @@ -283,6 +285,8 @@ func (s *MetricsServer) Export(ctx context.Context, req *colmetricspb.ExportMetr

// Export handles incoming OTLP trace data.
func (s *TraceServer) Export(ctx context.Context, req *coltracepb.ExportTraceServiceRequest) (*coltracepb.ExportTraceServiceResponse, error) {
start := time.Now()
defer func() { s.metrics.ObserveIngestDuration("traces", time.Since(start)) }()
slog.Debug("📥 [TRACES] Received Request", "resource_spans", len(req.ResourceSpans))

type batchResult struct {
Expand Down Expand Up @@ -543,6 +547,8 @@ func (s *TraceServer) Export(ctx context.Context, req *coltracepb.ExportTraceSer

// Export handles incoming OTLP log data.
func (s *LogsServer) Export(ctx context.Context, req *collogspb.ExportLogsServiceRequest) (*collogspb.ExportLogsServiceResponse, error) {
start := time.Now()
defer func() { s.metrics.ObserveIngestDuration("logs", time.Since(start)) }()
// slog.Debug("📥 [LOGS] Received Request", "resource_logs", len(req.ResourceLogs))

logResults := make([][]storage.Log, len(req.ResourceLogs))
Expand Down
23 changes: 23 additions & 0 deletions internal/telemetry/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ type Metrics struct {
DBLatency prometheus.Histogram
DLQSize prometheus.Gauge

// IngestDurationSeconds is the per-Export E2E latency observed inside
// the OTLP servers (gRPC + HTTP), labeled by signal {traces,logs,metrics}.
// Drives ingest SLOs: alert on p99 / error budget burn rather than on the
// blunt OtelContext_grpc_request_duration_seconds aggregate.
IngestDurationSeconds *prometheus.HistogramVec

// --- gRPC ---
GRPCRequestsTotal *prometheus.CounterVec
GRPCRequestDuration *prometheus.HistogramVec
Expand Down Expand Up @@ -153,6 +159,12 @@ func New() *Metrics {
Help: "Number of files currently in the Dead Letter Queue.",
}),

IngestDurationSeconds: promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "otelcontext_ingest_duration_seconds",
Help: "End-to-end OTLP Export latency observed in the ingest server, by signal.",
Buckets: []float64{.001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
}, []string{"signal"}),

// gRPC
GRPCRequestsTotal: promauto.NewCounterVec(prometheus.CounterOpts{
Name: "OtelContext_grpc_requests_total",
Expand Down Expand Up @@ -401,6 +413,17 @@ func (m *Metrics) RecordIngestion(count int) {
m.totalIngested.Add(int64(count))
}

// ObserveIngestDuration records an end-to-end OTLP Export latency for the
// given signal. Callers should pass time.Since(start) measured from the very
// start of the Export handler. Nil-safe so the OTLP servers can be wired
// without a Metrics instance during tests.
func (m *Metrics) ObserveIngestDuration(signal string, d time.Duration) {
if m == nil || m.IngestDurationSeconds == nil {
return
}
m.IngestDurationSeconds.WithLabelValues(signal).Observe(d.Seconds())
}

func (m *Metrics) SetActiveConnections(n int) {
m.ActiveConnections.Set(float64(n))
m.activeConns.Store(int64(n))
Expand Down
36 changes: 36 additions & 0 deletions internal/telemetry/metrics_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package telemetry
import (
"database/sql"
"testing"
"time"

_ "github.com/glebarez/go-sqlite" // registers "sqlite" driver used by glebarez/sqlite GORM dialect
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -65,4 +66,39 @@ func TestSampleDBPoolStats(t *testing.T) {
var m2 *Metrics
m2.SampleDBPoolStats(nil)
})

t.Run("ObserveIngestDuration_RecordsByLabel", func(t *testing.T) {
// Observe a duration for each signal and assert the histogram count
// increases for the matching label only — verifies the per-signal
// label split is wired correctly.
for _, signal := range []string{"traces", "logs", "metrics"} {
before := histCountForTest(t, m.IngestDurationSeconds, signal)
m.ObserveIngestDuration(signal, 25*time.Millisecond)
after := histCountForTest(t, m.IngestDurationSeconds, signal)
if after != before+1 {
t.Fatalf("signal=%s: count did not advance: before=%d after=%d", signal, before, after)
}
}
})

t.Run("ObserveIngestDuration_NilSafe", func(t *testing.T) {
// nil receiver must not panic — protects ingest tests that pass nil
// telemetry.Metrics through to the OTLP servers.
var m2 *Metrics
m2.ObserveIngestDuration("traces", time.Millisecond)
})
}

// histCountForTest scrapes the cumulative count of a labeled histogram.
func histCountForTest(t *testing.T, h *prometheus.HistogramVec, label string) uint64 {
t.Helper()
hist, err := h.GetMetricWithLabelValues(label)
if err != nil {
t.Fatalf("GetMetricWithLabelValues(%q): %v", label, err)
}
var dm dto.Metric
if err := hist.(prometheus.Metric).Write(&dm); err != nil {
t.Fatalf("histogram write: %v", err)
}
return dm.GetHistogram().GetSampleCount()
}
Loading