diff --git a/CLAUDE.md b/CLAUDE.md index 066d2d4..63f63da 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -39,7 +39,7 @@ HTTP :8080 ◄── REST API ◄───────────┘ | Path | Endpoint | Content Types | Notes | |------|----------|---------------|-------| | gRPC | `:4317` | protobuf | Traces, Logs, Metrics via OTLP gRPC | -| HTTP | `/v1/traces`, `/v1/logs`, `/v1/metrics` | `application/x-protobuf`, `application/json` | OTLP HTTP spec compliant, gzip support, 4MB limit | +| HTTP | `/v1/traces`, `/v1/logs`, `/v1/metrics` | `application/x-protobuf`, `application/json` | OTLP HTTP spec compliant, gzip support, 4MB limit. Returns `429 Too Many Requests` + `Retry-After: 1` when the async pipeline queue is full (parity with gRPC `RESOURCE_EXHAUSTED`). | Both paths delegate to the same `Export()` methods — zero business logic duplication. By default `Export()` parses the OTLP request and hands a `Batch` to the async ingest `Pipeline` (`internal/ingest/pipeline.go`); a worker pool persists Trace→Span→Log in order. With `INGEST_ASYNC_ENABLED=false` the pipeline is bypassed and `Export()` writes inline (legacy path). diff --git a/docs/OPERATIONS.md b/docs/OPERATIONS.md index 4a8ec67..9329224 100644 --- a/docs/OPERATIONS.md +++ b/docs/OPERATIONS.md @@ -93,7 +93,7 @@ DB_DSN="host=my-server.postgres.database.azure.com user=my-mi@tenant.onmicrosoft - `VECTOR_INDEX_MAX_ENTRIES=100000` - `SAMPLING_*` (defaults keep 100% + always-on errors) - `GRAPHRAG_WORKER_COUNT=16`, `GRAPHRAG_EVENT_QUEUE_SIZE=100000` — sized for 100–200 services. Lower for tiny deployments; raise further if `graphrag_events_dropped_total` climbs. -- `INGEST_ASYNC_ENABLED=true`, `INGEST_PIPELINE_QUEUE_SIZE=50000`, `INGEST_PIPELINE_WORKERS=8` — async ingest pipeline. Decouples OTLP `Export()` from DB writes. Backpressure is hybrid: silent drop of healthy traces at >=90% queue, gRPC `RESOURCE_EXHAUSTED` at 100%. Disable only to debug the legacy synchronous write path. Watch `otelcontext_ingest_pipeline_dropped_total{signal,reason}` and `otelcontext_ingest_pipeline_queue_depth{signal}`. +- `INGEST_ASYNC_ENABLED=true`, `INGEST_PIPELINE_QUEUE_SIZE=50000`, `INGEST_PIPELINE_WORKERS=8` — async ingest pipeline. Decouples OTLP `Export()` from DB writes. Backpressure is hybrid: silent drop of healthy traces at >=90% queue, gRPC `RESOURCE_EXHAUSTED` (HTTP `429 Too Many Requests` + `Retry-After: 1` on the OTLP HTTP receiver) at 100%. Disable only to debug the legacy synchronous write path. Watch `otelcontext_ingest_pipeline_dropped_total{signal,reason}`, `otelcontext_ingest_pipeline_queue_depth{signal}`, and `otelcontext_http_otlp_throttled_total{signal}`. - `GRPC_MAX_RECV_MB=16`, `GRPC_MAX_CONCURRENT_STREAMS=1000` — OTLP gRPC server caps - `RETENTION_BATCH_SIZE=50000`, `RETENTION_BATCH_SLEEP_MS=1` — purge pacing; raise the sleep for busy production DBs diff --git a/internal/ingest/otlp_http.go b/internal/ingest/otlp_http.go index cc5d189..7d7008f 100644 --- a/internal/ingest/otlp_http.go +++ b/internal/ingest/otlp_http.go @@ -8,16 +8,26 @@ import ( "io" "log/slog" "net/http" + "strconv" "github.com/RandomCodeSpace/otelcontext/internal/storage" collogspb "go.opentelemetry.io/proto/otlp/collector/logs/v1" colmetricspb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1" spb "google.golang.org/genproto/googleapis/rpc/status" + "google.golang.org/grpc/codes" + grpcstatus "google.golang.org/grpc/status" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" ) +// defaultRetryAfterSeconds is the Retry-After value advertised when the +// pipeline queue is full and the HTTP handler returns 429. Mirrors the +// gRPC RESOURCE_EXHAUSTED back-off semantics from Phase 1 — short enough +// that healthy clients recover quickly, long enough to give a stuck +// downstream a chance to drain. +const defaultRetryAfterSeconds = 1 + // withTenantFromHTTP attaches a tenant ID from the X-Tenant-ID header (if any) // to the request context before delegating to the gRPC Export methods. // Uses the shared storage.WithTenantContext helper so ingest and read paths @@ -47,6 +57,11 @@ type HTTPHandler struct { metrics *MetricsServer maxBodyBytes int64 // pre-decompress wire size cap maxDecompressedBytes int64 // post-gzip decompressed size cap (zip-bomb guard) + + // onThrottle is invoked once per signal type (traces|logs|metrics) every + // time the async ingest pipeline returns RESOURCE_EXHAUSTED and the HTTP + // handler maps it to 429. nil-safe. + onThrottle func(signal string) } // NewHTTPHandler creates an HTTP OTLP handler wrapping the existing gRPC servers. @@ -75,6 +90,40 @@ func (h *HTTPHandler) SetMaxDecompressedBytes(n int64) { } } +// SetThrottleCallback wires a per-signal counter that increments every time a +// 429 is returned because the async ingest pipeline is at capacity. Used by +// main.go to feed `otelcontext_http_otlp_throttled_total{signal=…}`. +func (h *HTTPHandler) SetThrottleCallback(fn func(signal string)) { + h.onThrottle = fn +} + +// isQueueFull reports whether the error returned by an Export() method is +// the gRPC RESOURCE_EXHAUSTED status used by the async pipeline to signal +// "queue at capacity". Used by the HTTP handlers to map back to 429. +func isQueueFull(err error) bool { + if err == nil { + return false + } + if errors.Is(err, ErrQueueFull) { + return true + } + if s, ok := grpcstatus.FromError(err); ok && s.Code() == codes.ResourceExhausted { + return true + } + return false +} + +// writeThrottled emits an OTLP-shaped 429 with a Retry-After header. The +// Retry-After value is duplicated in the protobuf Status message so clients +// that don't read headers (some custom OTLP shims) still see it. +func (h *HTTPHandler) writeThrottled(w http.ResponseWriter, signal string) { + if h.onThrottle != nil { + h.onThrottle(signal) + } + w.Header().Set("Retry-After", strconv.Itoa(defaultRetryAfterSeconds)) + writeOTLPError(w, http.StatusTooManyRequests, fmt.Sprintf("ingest pipeline at capacity, retry after %ds", defaultRetryAfterSeconds)) +} + // RegisterRoutes registers the HTTP OTLP endpoints on the given mux. func (h *HTTPHandler) RegisterRoutes(mux *http.ServeMux) { mux.HandleFunc("POST /v1/traces", h.handleTraces) @@ -101,6 +150,10 @@ func (h *HTTPHandler) handleTraces(w http.ResponseWriter, r *http.Request) { resp, err := h.traces.Export(withTenantFromHTTP(r), req) if err != nil { + if isQueueFull(err) { + h.writeThrottled(w, "traces") + return + } slog.Error("HTTP OTLP traces export failed", "error", err) writeOTLPError(w, http.StatusInternalServerError, err.Error()) return @@ -128,6 +181,10 @@ func (h *HTTPHandler) handleLogs(w http.ResponseWriter, r *http.Request) { resp, err := h.logs.Export(withTenantFromHTTP(r), req) if err != nil { + if isQueueFull(err) { + h.writeThrottled(w, "logs") + return + } slog.Error("HTTP OTLP logs export failed", "error", err) writeOTLPError(w, http.StatusInternalServerError, err.Error()) return @@ -155,6 +212,10 @@ func (h *HTTPHandler) handleMetrics(w http.ResponseWriter, r *http.Request) { resp, err := h.metrics.Export(withTenantFromHTTP(r), req) if err != nil { + if isQueueFull(err) { + h.writeThrottled(w, "metrics") + return + } slog.Error("HTTP OTLP metrics export failed", "error", err) writeOTLPError(w, http.StatusInternalServerError, err.Error()) return diff --git a/internal/ingest/otlp_http_backpressure_test.go b/internal/ingest/otlp_http_backpressure_test.go new file mode 100644 index 0000000..3f509b0 --- /dev/null +++ b/internal/ingest/otlp_http_backpressure_test.go @@ -0,0 +1,293 @@ +package ingest + +import ( + "bytes" + "context" + "net/http" + "net/http/httptest" + "strings" + "sync/atomic" + "testing" + + "github.com/RandomCodeSpace/otelcontext/internal/config" + "github.com/RandomCodeSpace/otelcontext/internal/storage" + tracepb "go.opentelemetry.io/proto/otlp/trace/v1" + "google.golang.org/protobuf/proto" +) + +// priorityTracesBody marshals an OTLP trace request whose first span is +// flagged STATUS_CODE_ERROR. The pipeline treats this as a priority batch, +// so it bypasses soft backpressure and requires a literal full-channel +// rejection (ErrQueueFull) — the path the HTTP 429 mapping is meant to cover. +func priorityTracesBody(t *testing.T, service string, count int) []byte { + t.Helper() + req := buildTracesRequest(service, count) + if len(req.ResourceSpans) > 0 && len(req.ResourceSpans[0].ScopeSpans) > 0 && len(req.ResourceSpans[0].ScopeSpans[0].Spans) > 0 { + req.ResourceSpans[0].ScopeSpans[0].Spans[0].Status = &tracepb.Status{Code: tracepb.Status_STATUS_CODE_ERROR} + } + body, err := proto.Marshal(req) + if err != nil { + t.Fatalf("marshal: %v", err) + } + return body +} + +// priorityLogsBody marshals an OTLP logs request flagged ERROR severity so +// it bypasses soft backpressure (LogsServer flags HasError when any record +// is Severity ERROR or FATAL). +func priorityLogsBody(t *testing.T, service string, count int) []byte { + t.Helper() + req := buildLogsRequest(service, count) + for _, rl := range req.ResourceLogs { + for _, sl := range rl.ScopeLogs { + for _, lr := range sl.LogRecords { + lr.SeverityText = "ERROR" + } + } + } + body, err := proto.Marshal(req) + if err != nil { + t.Fatalf("marshal: %v", err) + } + return body +} + +// newHTTPBackpressureHarness wires a TraceServer + LogsServer + MetricsServer +// to a Pipeline whose capacity is exhausted by the first Submit, so any +// follow-up Export returns ErrQueueFull. Used by Phase 4 tests that exercise +// the HTTP 429 + Retry-After path. +type httpBackpressureHarness struct { + repo *storage.Repository + pipeline *Pipeline + handler *HTTPHandler +} + +func newHTTPBackpressureHarness(t *testing.T) *httpBackpressureHarness { + t.Helper() + db, err := storage.NewDatabase("sqlite", ":memory:") + if err != nil { + t.Fatalf("NewDatabase: %v", err) + } + if err := storage.AutoMigrateModels(db, "sqlite"); err != nil { + t.Fatalf("AutoMigrateModels: %v", err) + } + repo := storage.NewRepositoryFromDB(db, "sqlite") + + cfg := &config.Config{ + IngestMinSeverity: "DEBUG", + SamplingLatencyThresholdMs: 500, + } + traces := NewTraceServer(repo, nil, cfg) + logs := NewLogsServer(repo, nil, cfg) + // Metrics server is not needed for backpressure tests — the throttle + // path is exercised via traces and logs. Pass a no-op MetricsServer + // (nil tsdb is safe because we never invoke metrics.Export here). + var metrics *MetricsServer + + // Capacity 1, NO workers — a single submit fills it and the next is + // ErrQueueFull. Capacity 0 is rejected by the pipeline so we use 1 + a + // pre-fill batch. + pl := NewPipeline(repo, nil, PipelineConfig{Capacity: 1, Workers: 0, SoftThreshold: 0.5}) + traces.SetPipeline(pl) + logs.SetPipeline(pl) + + // Pre-fill with a PRIORITY batch (HasError=true) so it bypasses soft + // backpressure and lands in the channel — capacity 1, so the channel is + // now full. Subsequent priority submits hit ErrQueueFull (the channel- + // full path); healthy submits would still be silently soft-dropped. + if err := pl.Submit(&Batch{ + Type: SignalTraces, + Tenant: "default", + Traces: []storage.Trace{{TraceID: "x", ServiceName: "svc"}}, + HasError: true, + }); err != nil { + t.Fatalf("pre-fill Submit: %v", err) + } + + h := NewHTTPHandler(traces, logs, metrics) + t.Cleanup(func() { + pl.Stop() + _ = repo.Close() + }) + return &httpBackpressureHarness{repo: repo, pipeline: pl, handler: h} +} + +// TestHTTPBackpressure_TracesReturns429WithRetryAfter verifies that when the +// async pipeline is at capacity, the HTTP traces endpoint responds with 429, +// a Retry-After header, and an OTLP-shaped Status protobuf body. +func TestHTTPBackpressure_TracesReturns429WithRetryAfter(t *testing.T) { + h := newHTTPBackpressureHarness(t) + body := priorityTracesBody(t, "svc", 1) + + hr := httptest.NewRequest(http.MethodPost, "/v1/traces", bytes.NewReader(body)) + hr.Header.Set("Content-Type", contentTypeProtobuf) + rec := httptest.NewRecorder() + + mux := http.NewServeMux() + h.handler.RegisterRoutes(mux) + mux.ServeHTTP(rec, hr) + + if rec.Code != http.StatusTooManyRequests { + t.Fatalf("want 429, got %d (body=%q)", rec.Code, rec.Body.String()) + } + if got := rec.Header().Get("Retry-After"); got == "" { + t.Fatal("Retry-After header missing on 429 response") + } + if ct := rec.Header().Get("Content-Type"); ct != contentTypeProtobuf { + t.Fatalf("Content-Type want %s, got %s", contentTypeProtobuf, ct) + } +} + +// TestHTTPBackpressure_LogsReturns429 mirrors the traces test for logs. +func TestHTTPBackpressure_LogsReturns429(t *testing.T) { + h := newHTTPBackpressureHarness(t) + body := priorityLogsBody(t, "svc", 1) + + hr := httptest.NewRequest(http.MethodPost, "/v1/logs", bytes.NewReader(body)) + hr.Header.Set("Content-Type", contentTypeProtobuf) + rec := httptest.NewRecorder() + + mux := http.NewServeMux() + h.handler.RegisterRoutes(mux) + mux.ServeHTTP(rec, hr) + + if rec.Code != http.StatusTooManyRequests { + t.Fatalf("want 429, got %d (body=%q)", rec.Code, rec.Body.String()) + } + if rec.Header().Get("Retry-After") == "" { + t.Fatal("Retry-After header missing") + } +} + +// TestHTTPBackpressure_ThrottleCallbackInvoked verifies the per-signal +// callback fires exactly once per 429, with the right signal label. +func TestHTTPBackpressure_ThrottleCallbackInvoked(t *testing.T) { + h := newHTTPBackpressureHarness(t) + + var traceHits, logHits, metricHits atomic.Int64 + h.handler.SetThrottleCallback(func(signal string) { + switch signal { + case "traces": + traceHits.Add(1) + case "logs": + logHits.Add(1) + case "metrics": + metricHits.Add(1) + } + }) + + mux := http.NewServeMux() + h.handler.RegisterRoutes(mux) + + tBody := priorityTracesBody(t, "svc", 1) + hr := httptest.NewRequest(http.MethodPost, "/v1/traces", bytes.NewReader(tBody)) + hr.Header.Set("Content-Type", contentTypeProtobuf) + rec := httptest.NewRecorder() + mux.ServeHTTP(rec, hr) + if rec.Code != http.StatusTooManyRequests { + t.Fatalf("traces: want 429, got %d", rec.Code) + } + + lBody := priorityLogsBody(t, "svc", 1) + hr = httptest.NewRequest(http.MethodPost, "/v1/logs", bytes.NewReader(lBody)) + hr.Header.Set("Content-Type", contentTypeProtobuf) + rec = httptest.NewRecorder() + mux.ServeHTTP(rec, hr) + if rec.Code != http.StatusTooManyRequests { + t.Fatalf("logs: want 429, got %d", rec.Code) + } + + if traceHits.Load() != 1 { + t.Fatalf("traceHits = %d, want 1", traceHits.Load()) + } + if logHits.Load() != 1 { + t.Fatalf("logHits = %d, want 1", logHits.Load()) + } + if metricHits.Load() != 0 { + t.Fatalf("metricHits should be 0 when no metric request was sent; got %d", metricHits.Load()) + } +} + +// TestHTTPBackpressure_NotInvokedOnSuccess verifies that a successful +// (non-throttled) Export does NOT increment the throttle counter. +func TestHTTPBackpressure_NotInvokedOnSuccess(t *testing.T) { + // Use a normal harness with workers so Submit succeeds. + db, err := storage.NewDatabase("sqlite", ":memory:") + if err != nil { + t.Fatalf("NewDatabase: %v", err) + } + if err := storage.AutoMigrateModels(db, "sqlite"); err != nil { + t.Fatalf("AutoMigrateModels: %v", err) + } + repo := storage.NewRepositoryFromDB(db, "sqlite") + defer func() { _ = repo.Close() }() + + cfg := &config.Config{IngestMinSeverity: "DEBUG", SamplingLatencyThresholdMs: 500} + traces := NewTraceServer(repo, nil, cfg) + logs := NewLogsServer(repo, nil, cfg) + var metrics *MetricsServer // not exercised in this test + pl := NewPipeline(repo, nil, PipelineConfig{Capacity: 16, Workers: 1, SoftThreshold: 0.9}) + pl.Start(context.Background()) + defer pl.Stop() + traces.SetPipeline(pl) + logs.SetPipeline(pl) + + handler := NewHTTPHandler(traces, logs, metrics) + + var hits atomic.Int64 + handler.SetThrottleCallback(func(signal string) { hits.Add(1) }) + + mux := http.NewServeMux() + handler.RegisterRoutes(mux) + + body, _ := proto.Marshal(buildTracesRequest("svc", 1)) + hr := httptest.NewRequest(http.MethodPost, "/v1/traces", bytes.NewReader(body)) + hr.Header.Set("Content-Type", contentTypeProtobuf) + rec := httptest.NewRecorder() + mux.ServeHTTP(rec, hr) + + if rec.Code != http.StatusOK { + t.Fatalf("expected 200, got %d body=%q", rec.Code, rec.Body.String()) + } + if hits.Load() != 0 { + t.Fatalf("throttle callback fired on a successful request; hits=%d", hits.Load()) + } +} + +// TestIsQueueFull_ClassifiesCorrectly verifies the helper picks up both +// gRPC RESOURCE_EXHAUSTED status errors AND the local ErrQueueFull sentinel. +func TestIsQueueFull_ClassifiesCorrectly(t *testing.T) { + if !isQueueFull(ErrQueueFull) { + t.Fatal("isQueueFull should match ErrQueueFull sentinel") + } + // Wrap to confirm errors.Is propagation. + wrapped := wrapErr(ErrQueueFull) + if !isQueueFull(wrapped) { + t.Fatal("isQueueFull should match wrapped ErrQueueFull") + } + if isQueueFull(nil) { + t.Fatal("isQueueFull(nil) must be false") + } + if isQueueFull(simpleErr("boom")) { + t.Fatal("isQueueFull should not match unrelated errors") + } +} + +// helpers below are deliberately tiny so they don't accumulate testing +// abstractions inside ingest_test. + +type simpleErr string + +func (e simpleErr) Error() string { return string(e) } + +func wrapErr(err error) error { + return &wrapped{err: err} +} + +type wrapped struct{ err error } + +func (w *wrapped) Error() string { return "wrapped: " + w.err.Error() } +func (w *wrapped) Unwrap() error { return w.err } + +// keep the import set narrow — protojson + JSON content not needed. +var _ = strings.Contains diff --git a/internal/telemetry/metrics.go b/internal/telemetry/metrics.go index 2637c7a..b4e6a6b 100644 --- a/internal/telemetry/metrics.go +++ b/internal/telemetry/metrics.go @@ -101,6 +101,12 @@ type Metrics struct { // reason="queue_full" — batch rejected at 100% capacity (client got 429/RESOURCE_EXHAUSTED). IngestPipelineDroppedTotal *prometheus.CounterVec + // HTTPOTLPThrottledTotal — count of HTTP 429s issued by the OTLP HTTP + // receiver when the async ingest pipeline is full. Mirrors the gRPC + // RESOURCE_EXHAUSTED path so operators see a single throttling signal + // across both transports. Label `signal` is one of traces|logs|metrics. + HTTPOTLPThrottledTotal *prometheus.CounterVec + // --- DB pool (sampled every 5s from sql.DB.Stats) --- DBPoolOpenConnections prometheus.Gauge DBPoolInUse prometheus.Gauge @@ -310,6 +316,10 @@ func New() *Metrics { Name: "otelcontext_ingest_pipeline_queue_depth", Help: "Current depth of the async ingest pipeline queue, by signal type.", }, []string{"signal"}), + HTTPOTLPThrottledTotal: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "otelcontext_http_otlp_throttled_total", + Help: "OTLP HTTP requests rejected with 429 because the async ingest pipeline is at capacity, by signal type.", + }, []string{"signal"}), IngestPipelineDroppedTotal: promauto.NewCounterVec(prometheus.CounterOpts{ Name: "otelcontext_ingest_pipeline_dropped_total", Help: "Batches dropped by the async ingest pipeline. reason=soft_backpressure (>=90% queue, healthy) or queue_full (100% queue, rejected to client).", diff --git a/main.go b/main.go index 8ef1c39..8d0a60c 100644 --- a/main.go +++ b/main.go @@ -607,6 +607,11 @@ func main() { // 7b. Register HTTP OTLP endpoints (before catch-all UI handler) otlpHTTP := ingest.NewHTTPHandler(traceServer, logsServer, metricsServer) + if metrics != nil && metrics.HTTPOTLPThrottledTotal != nil { + otlpHTTP.SetThrottleCallback(func(signal string) { + metrics.HTTPOTLPThrottledTotal.WithLabelValues(signal).Inc() + }) + } // 8. Start HTTP Server mux := http.NewServeMux()