From e8dc0ffcb6d05eb3e51730469c3871ef8ae38b42 Mon Sep 17 00:00:00 2001 From: Amit Kumar Date: Tue, 28 Apr 2026 07:25:17 +0000 Subject: [PATCH 1/2] checkpoint: pre-yolo 2026-04-28T07:25:17 --- AGENTS.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/AGENTS.md b/AGENTS.md index b7a48b1..efa389f 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -1,7 +1,7 @@ # Memory Context -# [otelcontext] recent context, 2026-04-28 1:14am UTC +# [otelcontext] recent context, 2026-04-28 6:43am UTC No previous sessions found. \ No newline at end of file From edfa7d87924548e53fce2f6de0177386bde54726 Mon Sep 17 00:00:00 2001 From: Amit Kumar Date: Tue, 28 Apr 2026 07:30:28 +0000 Subject: [PATCH 2/2] feat(telemetry): add per-signal ingest E2E latency histogram MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the production-readiness gap where there was no way to set or alert on a p99 ingest SLO — IngestionRate was a counter only, and GRPCRequestDuration covered the gRPC layer broadly without a per-signal split. Adds otelcontext_ingest_duration_seconds{signal} histogram observed via defer time.Since(start) in TraceServer/LogsServer/MetricsServer.Export. The HTTP OTLP handler delegates to the same Export methods so both transports record uniformly with a single instrumentation site. - New label "signal" ∈ {traces, logs, metrics} for per-signal SLO alerting - Buckets cover 1ms..10s (typical OTLP ingest range) - Nil-safe ObserveIngestDuration helper protects ingest tests that pass nil telemetry.Metrics - Subtests in metrics_pool_test.go assert per-label count advances and nil-receiver safety Co-Authored-By: Claude Opus 4.7 (1M context) --- internal/ingest/otlp.go | 6 +++++ internal/telemetry/metrics.go | 23 ++++++++++++++++ internal/telemetry/metrics_pool_test.go | 36 +++++++++++++++++++++++++ 3 files changed, 65 insertions(+) diff --git a/internal/ingest/otlp.go b/internal/ingest/otlp.go index 6048c22..f06e99b 100644 --- a/internal/ingest/otlp.go +++ b/internal/ingest/otlp.go @@ -213,6 +213,8 @@ func (s *MetricsServer) SetMetricCallback(cb func(tsdb.RawMetric)) { // Export handles incoming OTLP metrics data. func (s *MetricsServer) Export(ctx context.Context, req *colmetricspb.ExportMetricsServiceRequest) (*colmetricspb.ExportMetricsServiceResponse, error) { + start := time.Now() + defer func() { s.metrics.ObserveIngestDuration("metrics", time.Since(start)) }() for _, resourceMetrics := range req.ResourceMetrics { serviceName := getServiceName(resourceMetrics.Resource.Attributes) @@ -283,6 +285,8 @@ func (s *MetricsServer) Export(ctx context.Context, req *colmetricspb.ExportMetr // Export handles incoming OTLP trace data. func (s *TraceServer) Export(ctx context.Context, req *coltracepb.ExportTraceServiceRequest) (*coltracepb.ExportTraceServiceResponse, error) { + start := time.Now() + defer func() { s.metrics.ObserveIngestDuration("traces", time.Since(start)) }() slog.Debug("📥 [TRACES] Received Request", "resource_spans", len(req.ResourceSpans)) type batchResult struct { @@ -543,6 +547,8 @@ func (s *TraceServer) Export(ctx context.Context, req *coltracepb.ExportTraceSer // Export handles incoming OTLP log data. func (s *LogsServer) Export(ctx context.Context, req *collogspb.ExportLogsServiceRequest) (*collogspb.ExportLogsServiceResponse, error) { + start := time.Now() + defer func() { s.metrics.ObserveIngestDuration("logs", time.Since(start)) }() // slog.Debug("📥 [LOGS] Received Request", "resource_logs", len(req.ResourceLogs)) logResults := make([][]storage.Log, len(req.ResourceLogs)) diff --git a/internal/telemetry/metrics.go b/internal/telemetry/metrics.go index b4e6a6b..44f0afc 100644 --- a/internal/telemetry/metrics.go +++ b/internal/telemetry/metrics.go @@ -28,6 +28,12 @@ type Metrics struct { DBLatency prometheus.Histogram DLQSize prometheus.Gauge + // IngestDurationSeconds is the per-Export E2E latency observed inside + // the OTLP servers (gRPC + HTTP), labeled by signal {traces,logs,metrics}. + // Drives ingest SLOs: alert on p99 / error budget burn rather than on the + // blunt OtelContext_grpc_request_duration_seconds aggregate. + IngestDurationSeconds *prometheus.HistogramVec + // --- gRPC --- GRPCRequestsTotal *prometheus.CounterVec GRPCRequestDuration *prometheus.HistogramVec @@ -153,6 +159,12 @@ func New() *Metrics { Help: "Number of files currently in the Dead Letter Queue.", }), + IngestDurationSeconds: promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: "otelcontext_ingest_duration_seconds", + Help: "End-to-end OTLP Export latency observed in the ingest server, by signal.", + Buckets: []float64{.001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10}, + }, []string{"signal"}), + // gRPC GRPCRequestsTotal: promauto.NewCounterVec(prometheus.CounterOpts{ Name: "OtelContext_grpc_requests_total", @@ -401,6 +413,17 @@ func (m *Metrics) RecordIngestion(count int) { m.totalIngested.Add(int64(count)) } +// ObserveIngestDuration records an end-to-end OTLP Export latency for the +// given signal. Callers should pass time.Since(start) measured from the very +// start of the Export handler. Nil-safe so the OTLP servers can be wired +// without a Metrics instance during tests. +func (m *Metrics) ObserveIngestDuration(signal string, d time.Duration) { + if m == nil || m.IngestDurationSeconds == nil { + return + } + m.IngestDurationSeconds.WithLabelValues(signal).Observe(d.Seconds()) +} + func (m *Metrics) SetActiveConnections(n int) { m.ActiveConnections.Set(float64(n)) m.activeConns.Store(int64(n)) diff --git a/internal/telemetry/metrics_pool_test.go b/internal/telemetry/metrics_pool_test.go index 47079ae..d36939f 100644 --- a/internal/telemetry/metrics_pool_test.go +++ b/internal/telemetry/metrics_pool_test.go @@ -3,6 +3,7 @@ package telemetry import ( "database/sql" "testing" + "time" _ "github.com/glebarez/go-sqlite" // registers "sqlite" driver used by glebarez/sqlite GORM dialect "github.com/prometheus/client_golang/prometheus" @@ -65,4 +66,39 @@ func TestSampleDBPoolStats(t *testing.T) { var m2 *Metrics m2.SampleDBPoolStats(nil) }) + + t.Run("ObserveIngestDuration_RecordsByLabel", func(t *testing.T) { + // Observe a duration for each signal and assert the histogram count + // increases for the matching label only — verifies the per-signal + // label split is wired correctly. + for _, signal := range []string{"traces", "logs", "metrics"} { + before := histCountForTest(t, m.IngestDurationSeconds, signal) + m.ObserveIngestDuration(signal, 25*time.Millisecond) + after := histCountForTest(t, m.IngestDurationSeconds, signal) + if after != before+1 { + t.Fatalf("signal=%s: count did not advance: before=%d after=%d", signal, before, after) + } + } + }) + + t.Run("ObserveIngestDuration_NilSafe", func(t *testing.T) { + // nil receiver must not panic — protects ingest tests that pass nil + // telemetry.Metrics through to the OTLP servers. + var m2 *Metrics + m2.ObserveIngestDuration("traces", time.Millisecond) + }) +} + +// histCountForTest scrapes the cumulative count of a labeled histogram. +func histCountForTest(t *testing.T, h *prometheus.HistogramVec, label string) uint64 { + t.Helper() + hist, err := h.GetMetricWithLabelValues(label) + if err != nil { + t.Fatalf("GetMetricWithLabelValues(%q): %v", label, err) + } + var dm dto.Metric + if err := hist.(prometheus.Metric).Write(&dm); err != nil { + t.Fatalf("histogram write: %v", err) + } + return dm.GetHistogram().GetSampleCount() }