feat(ingest): async ingest pipeline with hybrid backpressure#50
Merged
Introduces the Pipeline type that decouples OTLP Export() from
synchronous DB writes. Builds the foundation for Phase 1 of the
robustness work — wiring into TraceServer/LogsServer.Export comes in
the next commit.
Backpressure policy is hybrid:
- <90% queue fullness → all batches enqueue
- 90%-100% fullness → healthy batches dropped (silent); priority batches
  (errors / slow traces) always enqueue
- 100% (channel full) → ErrQueueFull returned to the caller, even for
  priority batches; callers map this to gRPC RESOURCE_EXHAUSTED / HTTP 429
  so the client backs off rather than retrying in a tighter loop
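
The three tiers above can be sketched as a pure admission function. This is a minimal illustration, not the code in this PR: the `Decision` type, the `admit` function, and the integer threshold arithmetic are hypothetical names and choices, and the real Pipeline presumably derives depth and capacity from its channel.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrQueueFull mirrors the error name used in the commit message.
var ErrQueueFull = errors.New("ingest pipeline queue full")

// Decision is a hypothetical enum for what happens to a submitted batch.
type Decision int

const (
	Enqueue    Decision = iota // batch is accepted
	DropSoft                   // healthy batch silently dropped (90%-100%)
	RejectFull                 // queue at capacity; caller gets ErrQueueFull
)

func (d Decision) String() string {
	return [...]string{"enqueue", "drop_soft", "reject_full"}[d]
}

// admit applies the hybrid policy. Integer math avoids float rounding
// at the 90% boundary.
func admit(depth, capacity int, priority bool) Decision {
	if depth >= capacity {
		return RejectFull // applies even to priority batches
	}
	if depth*100 >= capacity*90 && !priority {
		return DropSoft
	}
	return Enqueue
}

func main() {
	fmt.Println(admit(10, 100, false))  // below the soft threshold
	fmt.Println(admit(95, 100, false))  // healthy batch in the soft zone
	fmt.Println(admit(95, 100, true))   // priority batch in the soft zone
	fmt.Println(admit(100, 100, true))  // hard capacity
}
```

Keeping the decision separate from the channel send makes each tier easy to unit-test without starting workers.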
The unit of work is a Batch — one OTLP Export call's persistable output
packaged together. This preserves the Trace→Span→Log FK ordering the
synchronous path enforces, and lets the worker run the same insert
sequence without rewriting trace upsert logic.
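
A rough sketch of the worker-side insert sequence, assuming a hypothetical `Store` interface and string payloads in place of the real models. It follows the Trace→Span→Log order and the partial-failure rules listed in the test coverage (a failed trace insert still continues to spans, a failed span insert skips logs); the actual repository API in this codebase may look different.

```go
package main

import (
	"errors"
	"fmt"
)

// Batch is a simplified stand-in: one Export call's persistable output.
type Batch struct {
	Traces []string
	Spans  []string
	Logs   []string
}

// Store is a hypothetical persistence interface.
type Store interface {
	InsertTraces([]string) error
	InsertSpans([]string) error
	InsertLogs([]string) error
}

// persist runs the inserts in FK order and collects errors.
func persist(s Store, b *Batch) error {
	if b == nil {
		return nil
	}
	var errs []error
	if len(b.Traces) > 0 {
		if err := s.InsertTraces(b.Traces); err != nil {
			errs = append(errs, err) // failed traces continue to spans
		}
	}
	spansOK := true
	if len(b.Spans) > 0 {
		if err := s.InsertSpans(b.Spans); err != nil {
			errs = append(errs, err)
			spansOK = false // failed spans skip logs
		}
	}
	if spansOK && len(b.Logs) > 0 {
		if err := s.InsertLogs(b.Logs); err != nil {
			errs = append(errs, err)
		}
	}
	return errors.Join(errs...) // nil when errs is empty
}

// recordingStore records call order and can be told to fail.
type recordingStore struct {
	calls      []string
	failTraces bool
	failSpans  bool
}

func (s *recordingStore) InsertTraces([]string) error {
	s.calls = append(s.calls, "traces")
	if s.failTraces {
		return errors.New("trace insert failed")
	}
	return nil
}

func (s *recordingStore) InsertSpans([]string) error {
	s.calls = append(s.calls, "spans")
	if s.failSpans {
		return errors.New("span insert failed")
	}
	return nil
}

func (s *recordingStore) InsertLogs([]string) error {
	s.calls = append(s.calls, "logs")
	return nil
}

func main() {
	st := &recordingStore{failSpans: true}
	b := &Batch{Traces: []string{"t1"}, Spans: []string{"s1"}, Logs: []string{"l1"}}
	err := persist(st, b)
	fmt.Println(st.calls, err)
}
```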
Two new Prometheus instruments surface the new behavior:
- otelcontext_ingest_pipeline_queue_depth{signal} — gauge
- otelcontext_ingest_pipeline_dropped_total{signal,reason} — counter
where reason ∈ {soft_backpressure, queue_full}
Test coverage includes nil/empty batch handling, soft-threshold drop,
priority bypass, hard-capacity rejection, FK ordering, callback
sequencing, partial-failure isolation (failed spans skip logs, failed
traces continue to spans), graceful drain on Stop, idempotent Stop,
race-detector-safe concurrent submit, default-config fallback, and
panic recovery in callbacks. All 14 pipeline tests pass under -race;
full suite (12 packages) green.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Plumbs the Pipeline introduced in the prior commit into TraceServer.Export
and LogsServer.Export. Default behavior is async; INGEST_ASYNC_ENABLED=false
falls back to the legacy synchronous DB-write path bit-for-bit so operators
have a kill switch.
Wiring details:
- TraceServer / LogsServer gain a *Pipeline field and SetPipeline() setter,
matching the existing SetSampler / SetCallback pattern.
- The parser tracks per-batch HasError / HasSlow flags during the existing
per-resource goroutine fan-out; the merge step ORs them across goroutines
so the pipeline's priority lane sees the right protection bit.
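
To illustrate the flag merge described above, here is a small sketch in which each per-resource goroutine writes its own result slot and the merge step ORs the slots together. The `span` and `flags` types and the 500 ms slow threshold are assumptions made for this example, not values taken from the parser.

```go
package main

import (
	"fmt"
	"sync"
)

// span and flags are simplified, hypothetical stand-ins for parser types.
type span struct {
	isError    bool
	durationMs int64
}

type flags struct {
	HasError bool
	HasSlow  bool
}

// slowThresholdMs is an assumed value for illustration only.
const slowThresholdMs = 500

// scan computes the flags for one resource's spans.
func scan(spans []span) flags {
	var f flags
	for _, s := range spans {
		f.HasError = f.HasError || s.isError
		f.HasSlow = f.HasSlow || s.durationMs >= slowThresholdMs
	}
	return f
}

// mergeFlags fans out one goroutine per resource, then ORs the results.
// Each goroutine writes only its own slice slot, so no lock is needed.
func mergeFlags(resources [][]span) flags {
	results := make([]flags, len(resources))
	var wg sync.WaitGroup
	for i, r := range resources {
		wg.Add(1)
		go func(i int, r []span) {
			defer wg.Done()
			results[i] = scan(r)
		}(i, r)
	}
	wg.Wait()

	var out flags
	for _, f := range results {
		out.HasError = out.HasError || f.HasError
		out.HasSlow = out.HasSlow || f.HasSlow
	}
	return out
}

func main() {
	resources := [][]span{
		{{durationMs: 10}, {durationMs: 20}},
		{{isError: true, durationMs: 5}},
	}
	fmt.Printf("%+v\n", mergeFlags(resources))
}
```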
- Intake metrics (GRPCBatchSize, IngestionRate) fire BEFORE the persist
decision so dashboards reflect what was received. Net persisted is
derivable as ingestion_total - ingest_pipeline_dropped_total.
- On ErrQueueFull, Export returns gRPC RESOURCE_EXHAUSTED via google.golang.org/grpc/status,
which OTLP clients map to backoff retries (not tighter loops).
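
The gRPC mapping depends on google.golang.org/grpc, so the sketch below shows the HTTP 429 half of the same idea using only the standard library. The `exportHandler` and `statusFor` names and the injected `submit` function are hypothetical; this PR's HTTP handler may be structured differently.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// ErrQueueFull mirrors the error name used in the commit message.
var ErrQueueFull = errors.New("ingest pipeline queue full")

// errOther stands in for any non-backpressure failure.
var errOther = errors.New("unexpected failure")

// exportHandler maps the result of a (hypothetical) submit function to an
// HTTP status: 429 for a full queue so clients back off.
func exportHandler(submit func() error) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		err := submit()
		switch {
		case err == nil:
			w.WriteHeader(http.StatusOK)
		case errors.Is(err, ErrQueueFull):
			http.Error(w, "ingest queue full", http.StatusTooManyRequests)
		default:
			http.Error(w, "internal error", http.StatusInternalServerError)
		}
	}
}

// statusFor runs the handler once and returns the recorded status code.
func statusFor(err error) int {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodPost, "/v1/traces", nil)
	exportHandler(func() error { return err })(rec, req)
	return rec.Code
}

func main() {
	fmt.Println(statusFor(nil))
	fmt.Println(statusFor(ErrQueueFull))
	fmt.Println(statusFor(fmt.Errorf("submit: %w", ErrQueueFull)))
}
```

Using `errors.Is` rather than `==` keeps the mapping correct if the pipeline error is wrapped on its way up to the handler.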
Three new env vars under "Async ingest pipeline":
- INGEST_ASYNC_ENABLED=true (default)
- INGEST_PIPELINE_QUEUE_SIZE=50000 (default)
- INGEST_PIPELINE_WORKERS=8 (default)
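
As a sketch of how these three variables might be read, assuming a hypothetical `PipelineConfig` struct and a `loadConfig` helper: unset, unparsable, or non-positive values fall back to the defaults listed above. That fallback behavior is an assumption of this example, not something the commit message spells out.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// PipelineConfig is a hypothetical holder for the three tunables.
type PipelineConfig struct {
	AsyncEnabled bool
	QueueSize    int
	Workers      int
}

// envBool returns def when the variable is unset or not a valid bool.
func envBool(getenv func(string) string, key string, def bool) bool {
	v, err := strconv.ParseBool(getenv(key))
	if err != nil {
		return def
	}
	return v
}

// envPositiveInt returns def when the variable is unset, invalid, or <= 0.
func envPositiveInt(getenv func(string) string, key string, def int) int {
	n, err := strconv.Atoi(getenv(key))
	if err != nil || n <= 0 {
		return def
	}
	return n
}

// loadConfig takes getenv as a parameter so tests can inject values.
func loadConfig(getenv func(string) string) PipelineConfig {
	return PipelineConfig{
		AsyncEnabled: envBool(getenv, "INGEST_ASYNC_ENABLED", true),
		QueueSize:    envPositiveInt(getenv, "INGEST_PIPELINE_QUEUE_SIZE", 50000),
		Workers:      envPositiveInt(getenv, "INGEST_PIPELINE_WORKERS", 8),
	}
}

func main() {
	fmt.Printf("%+v\n", loadConfig(os.Getenv))
}
```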
main.go: pipeline is constructed when enabled, started with context.Background
(workers exit only via Stop() drain), and Stop()'d after gRPC GracefulStop but
before DLQ.Stop in the shutdown LIFO so in-flight batches drain to the DB
before the DLQ and DB shut down.
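
The drain-on-Stop behavior can be sketched with a closed channel and a WaitGroup. This is a simplified model with a hypothetical `pool` type, not the Pipeline from this PR: closing the queue lets workers finish whatever is buffered, and `sync.Once` makes a second Stop harmless.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// pool is a hypothetical, minimal worker pool over a bounded queue.
type pool struct {
	queue     chan int
	wg        sync.WaitGroup
	stopOnce  sync.Once
	processed atomic.Int64
}

func newPool(size, workers int) *pool {
	p := &pool{queue: make(chan int, size)}
	for i := 0; i < workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			// Exits only after the channel is closed AND drained.
			for range p.queue {
				p.processed.Add(1)
			}
		}()
	}
	return p
}

// Submit enqueues without blocking; false means the queue is full.
// It must not be called after Stop: sending on a closed channel panics.
func (p *pool) Submit(v int) bool {
	select {
	case p.queue <- v:
		return true
	default:
		return false
	}
}

// Stop closes the queue once and waits for workers to drain it.
func (p *pool) Stop() {
	p.stopOnce.Do(func() { close(p.queue) })
	p.wg.Wait()
}

func main() {
	p := newPool(100, 4)
	for i := 0; i < 50; i++ {
		p.Submit(i)
	}
	p.Stop()
	p.Stop() // idempotent
	fmt.Println(p.processed.Load())
}
```

In this model the caveat on `Submit` is why the ordering matters: producers have to be stopped first (here, gRPC GracefulStop) before the pool is stopped, and the stores the workers write to have to outlive the drain.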
E2E coverage adds 4 tests (pipeline_e2e_test.go) running against an in-memory
SQLite Repository:
- traces persist through the async path
- logs persist through the async path
- hard-capacity overflow returns gRPC codes.ResourceExhausted
- priority batches (error spans) bypass soft backpressure end-to-end
Docs: CLAUDE.md ingestion paths section now mentions the pipeline; the env-var
list covers all three new tunables. OPERATIONS.md adds three new alert rules
covering soft drops, hard rejections, and queue-depth headroom.
All 12 packages pass under -race; pipeline tests now total 19 (15 unit + 4 e2e)
and cover nil/empty batches, soft/hard backpressure, FK ordering, callback
sequencing, partial-failure isolation, drain-on-Stop, idempotent Stop,
concurrent submit, default-config fallback, callback panic recovery, and
gRPC status code mapping.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Summary
Phase 1 of the 150–200 component robustness push. Decouples OTLP Export() from synchronous DB writes via a bounded async pipeline, making the ingest path resilient under burst load.
- internal/ingest/pipeline.go: bounded queue + worker pool with hybrid backpressure (a full queue is reported as gRPC RESOURCE_EXHAUSTED / HTTP 429 so OTLP clients back off cleanly)
- New metrics: otelcontext_ingest_pipeline_queue_depth{signal} and otelcontext_ingest_pipeline_dropped_total{signal,reason}
- New env vars: INGEST_ASYNC_ENABLED (default true), INGEST_PIPELINE_QUEUE_SIZE (default 50000), INGEST_PIPELINE_WORKERS (default 8)
- INGEST_ASYNC_ENABLED=false is the kill switch: it reverts to the legacy synchronous write path inside Export() bit-for-bit

Architecture preserved
- Trace→Span→Log FK ordering (Batch is the unit of work; the worker runs the same insert sequence the sync path used)
- Shutdown order: gRPC GracefulStop → pipeline Stop (drains) → DLQ → retention → DB close

Test plan
- go build ./... clean
- go vet ./... clean
- go test -race ./... passes for all 12 packages
- Pipeline unit tests pass under -race, covering: nil/empty batch, soft-threshold drop, priority bypass, hard-capacity rejection, FK ordering, callback sequencing, failed-spans-skip-logs, failed-traces-continue-to-spans, drain-on-Stop, idempotent Stop, concurrent submit, default-config fallback, callback panic recovery
- E2E tests: traces and logs persist through the async path, hard-capacity overflow returns codes.ResourceExhausted, priority batches bypass soft backpressure end-to-end
- Existing end-to-end tests (including TestOTLPHTTPEndToEnd) continue to pass, which confirms the legacy synchronous fallback is intact

Backwards compatibility
- The legacy synchronous write path remains available via INGEST_ASYNC_ENABLED=false
- Intake metrics (GRPCBatchSize, IngestionRate) fire on receipt rather than on persist, so net-persisted is computed as ingestion_total - ingest_pipeline_dropped_total. Documented in docs/OPERATIONS.md.

Follow-ups (separate PRs per phase plan)
🤖 Generated with Claude Code