Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ HTTP :8080 ◄── REST API ◄───────────┘
| Path | Endpoint | Content Types | Notes |
|------|----------|---------------|-------|
| gRPC | `:4317` | protobuf | Traces, Logs, Metrics via OTLP gRPC |
| HTTP | `/v1/traces`, `/v1/logs`, `/v1/metrics` | `application/x-protobuf`, `application/json` | OTLP HTTP spec compliant, gzip support, 4MB limit |
| HTTP | `/v1/traces`, `/v1/logs`, `/v1/metrics` | `application/x-protobuf`, `application/json` | OTLP HTTP spec compliant, gzip support, 4MB limit. Returns `429 Too Many Requests` + `Retry-After: 1` when the async pipeline queue is full (parity with gRPC `RESOURCE_EXHAUSTED`). |

Both paths delegate to the same `Export()` methods — zero business logic duplication. By default `Export()` parses the OTLP request and hands a `Batch` to the async ingest `Pipeline` (`internal/ingest/pipeline.go`); a worker pool persists Trace→Span→Log in order. With `INGEST_ASYNC_ENABLED=false` the pipeline is bypassed and `Export()` writes inline (legacy path).

Expand Down
2 changes: 1 addition & 1 deletion docs/OPERATIONS.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ DB_DSN="host=my-server.postgres.database.azure.com user=my-mi@tenant.onmicrosoft
- `VECTOR_INDEX_MAX_ENTRIES=100000`
- `SAMPLING_*` (defaults keep 100% + always-on errors)
- `GRAPHRAG_WORKER_COUNT=16`, `GRAPHRAG_EVENT_QUEUE_SIZE=100000` — sized for 100–200 services. Lower for tiny deployments; raise further if `graphrag_events_dropped_total` climbs.
- `INGEST_ASYNC_ENABLED=true`, `INGEST_PIPELINE_QUEUE_SIZE=50000`, `INGEST_PIPELINE_WORKERS=8` — async ingest pipeline. Decouples OTLP `Export()` from DB writes. Backpressure is hybrid: silent drop of healthy traces at >=90% queue, gRPC `RESOURCE_EXHAUSTED` at 100%. Disable only to debug the legacy synchronous write path. Watch `otelcontext_ingest_pipeline_dropped_total{signal,reason}` and `otelcontext_ingest_pipeline_queue_depth{signal}`.
- `INGEST_ASYNC_ENABLED=true`, `INGEST_PIPELINE_QUEUE_SIZE=50000`, `INGEST_PIPELINE_WORKERS=8` — async ingest pipeline. Decouples OTLP `Export()` from DB writes. Backpressure is hybrid: silent drop of healthy traces at >=90% queue, gRPC `RESOURCE_EXHAUSTED` (HTTP `429 Too Many Requests` + `Retry-After: 1` on the OTLP HTTP receiver) at 100%. Disable only to debug the legacy synchronous write path. Watch `otelcontext_ingest_pipeline_dropped_total{signal,reason}`, `otelcontext_ingest_pipeline_queue_depth{signal}`, and `otelcontext_http_otlp_throttled_total{signal}`.
- `GRPC_MAX_RECV_MB=16`, `GRPC_MAX_CONCURRENT_STREAMS=1000` — OTLP gRPC server caps
- `RETENTION_BATCH_SIZE=50000`, `RETENTION_BATCH_SLEEP_MS=1` — purge pacing; raise the sleep for busy production DBs

Expand Down
61 changes: 61 additions & 0 deletions internal/ingest/otlp_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,26 @@ import (
"io"
"log/slog"
"net/http"
"strconv"

"github.com/RandomCodeSpace/otelcontext/internal/storage"
collogspb "go.opentelemetry.io/proto/otlp/collector/logs/v1"
colmetricspb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
)

// defaultRetryAfterSeconds is the back-off, in whole seconds, advertised to
// OTLP HTTP clients when the pipeline queue is full and the HTTP handler
// returns 429. writeThrottled sends it as the Retry-After header value and
// repeats it in the error message text. Mirrors the gRPC RESOURCE_EXHAUSTED
// back-off semantics from Phase 1 — short enough that healthy clients
// recover quickly, long enough to give a stuck downstream a chance to drain.
const defaultRetryAfterSeconds = 1

// withTenantFromHTTP attaches a tenant ID from the X-Tenant-ID header (if any)
// to the request context before delegating to the gRPC Export methods.
// Uses the shared storage.WithTenantContext helper so ingest and read paths
Expand Down Expand Up @@ -47,6 +57,11 @@ type HTTPHandler struct {
metrics *MetricsServer
maxBodyBytes int64 // pre-decompress wire size cap
maxDecompressedBytes int64 // post-gzip decompressed size cap (zip-bomb guard)

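// onThrottle, when non-nil, is called once for every request that the HTTP
// handler answers with 429 because the async ingest pipeline reported it is
// at capacity. Its argument is the signal type of the rejected request
// ("traces", "logs" or "metrics"). A nil callback is simply skipped.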
onThrottle func(signal string)
}

// NewHTTPHandler creates an HTTP OTLP handler wrapping the existing gRPC servers.
Expand Down Expand Up @@ -75,6 +90,40 @@ func (h *HTTPHandler) SetMaxDecompressedBytes(n int64) {
}
}

// SetThrottleCallback installs fn as the hook that runs each time an OTLP
// HTTP request is rejected with 429 because the async ingest pipeline is at
// capacity. fn receives the signal name of the rejected request ("traces",
// "logs" or "metrics"), which makes it a natural feed for a per-signal
// counter such as `otelcontext_http_otlp_throttled_total{signal=…}`.
//
// Passing nil removes the hook. The field is assigned without any
// synchronization here, so install the callback before the handler starts
// serving requests.
func (h *HTTPHandler) SetThrottleCallback(fn func(signal string)) {
	h.onThrottle = fn
}

// isQueueFull reports whether err means "the async ingest pipeline queue is
// at capacity". Two shapes are recognised:
//
//   - err is, or wraps, the ErrQueueFull sentinel;
//   - err carries a gRPC status whose code is RESOURCE_EXHAUSTED.
//
// A nil error is never treated as queue-full. The HTTP handlers use the
// result to answer 429 instead of a generic 500.
func isQueueFull(err error) bool {
	switch {
	case err == nil:
		return false
	case errors.Is(err, ErrQueueFull):
		return true
	}
	// Not the sentinel: fall back to inspecting the gRPC status, if any.
	st, isStatus := grpcstatus.FromError(err)
	return isStatus && st.Code() == codes.ResourceExhausted
}

// writeThrottled answers the request with 429 Too Many Requests and a
// Retry-After header of defaultRetryAfterSeconds. Before writing anything it
// notifies the optional onThrottle hook with the signal name ("traces",
// "logs" or "metrics") so the rejection can be counted.
//
// The retry delay is also spelled out in the error message handed to
// writeOTLPError, so a client that ignores response headers can still see
// it — presumably in the OTLP error body; writeOTLPError is defined outside
// this block, confirm its encoding there.
func (h *HTTPHandler) writeThrottled(w http.ResponseWriter, signal string) {
	if notify := h.onThrottle; notify != nil {
		notify(signal)
	}

	retryAfter := strconv.Itoa(defaultRetryAfterSeconds)
	w.Header().Set("Retry-After", retryAfter)

	msg := fmt.Sprintf("ingest pipeline at capacity, retry after %ds", defaultRetryAfterSeconds)
	writeOTLPError(w, http.StatusTooManyRequests, msg)
}

// RegisterRoutes registers the HTTP OTLP endpoints on the given mux.
func (h *HTTPHandler) RegisterRoutes(mux *http.ServeMux) {
mux.HandleFunc("POST /v1/traces", h.handleTraces)
Expand All @@ -101,6 +150,10 @@ func (h *HTTPHandler) handleTraces(w http.ResponseWriter, r *http.Request) {

resp, err := h.traces.Export(withTenantFromHTTP(r), req)
if err != nil {
if isQueueFull(err) {
h.writeThrottled(w, "traces")
return
}
slog.Error("HTTP OTLP traces export failed", "error", err)
writeOTLPError(w, http.StatusInternalServerError, err.Error())
return
Expand Down Expand Up @@ -128,6 +181,10 @@ func (h *HTTPHandler) handleLogs(w http.ResponseWriter, r *http.Request) {

resp, err := h.logs.Export(withTenantFromHTTP(r), req)
if err != nil {
if isQueueFull(err) {
h.writeThrottled(w, "logs")
return
}
slog.Error("HTTP OTLP logs export failed", "error", err)
writeOTLPError(w, http.StatusInternalServerError, err.Error())
return
Expand Down Expand Up @@ -155,6 +212,10 @@ func (h *HTTPHandler) handleMetrics(w http.ResponseWriter, r *http.Request) {

resp, err := h.metrics.Export(withTenantFromHTTP(r), req)
if err != nil {
if isQueueFull(err) {
h.writeThrottled(w, "metrics")
return
}
slog.Error("HTTP OTLP metrics export failed", "error", err)
writeOTLPError(w, http.StatusInternalServerError, err.Error())
return
Expand Down
Loading
Loading