Merged
3 changes: 2 additions & 1 deletion CLAUDE.md
@@ -41,7 +41,7 @@ HTTP :8080 ◄── REST API ◄───────────┘
| gRPC | `:4317` | protobuf | Traces, Logs, Metrics via OTLP gRPC |
| HTTP | `/v1/traces`, `/v1/logs`, `/v1/metrics` | `application/x-protobuf`, `application/json` | OTLP HTTP spec compliant, gzip support, 4MB limit |

Both paths delegate to the same `Export()` methods — zero business logic duplication.
Both paths delegate to the same `Export()` methods — zero business logic duplication. By default `Export()` parses the OTLP request and hands a `Batch` to the async ingest `Pipeline` (`internal/ingest/pipeline.go`); a worker pool persists Trace→Span→Log in order. With `INGEST_ASYNC_ENABLED=false` the pipeline is bypassed and `Export()` writes inline (legacy path).
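
The ordered hand-off described above can be sketched roughly as follows. This is a minimal illustration, not the code in `internal/ingest/pipeline.go`: the `Batch` fields, the `Store` interface, and `runWorkers` are hypothetical stand-ins, and the only behavior taken from the text is that each worker persists traces, then spans, then logs.

```go
package main

import (
	"fmt"
	"sync"
)

// Batch is a hypothetical stand-in for the pipeline's unit of work.
type Batch struct {
	Traces, Spans, Logs []string
}

// Store is an assumed persistence interface, not the project's Repository API.
type Store interface {
	SaveTraces(rows []string) error
	SaveSpans(rows []string) error
	SaveLogs(rows []string) error
}

// persist writes one batch in foreign-key-safe order: traces, spans, logs.
// It stops at the first error so spans are never written without their trace.
func persist(s Store, b Batch) error {
	if len(b.Traces) > 0 {
		if err := s.SaveTraces(b.Traces); err != nil {
			return fmt.Errorf("save traces: %w", err)
		}
	}
	if len(b.Spans) > 0 {
		if err := s.SaveSpans(b.Spans); err != nil {
			return fmt.Errorf("save spans: %w", err)
		}
	}
	if len(b.Logs) > 0 {
		if err := s.SaveLogs(b.Logs); err != nil {
			return fmt.Errorf("save logs: %w", err)
		}
	}
	return nil
}

// runWorkers starts n goroutines that drain queue, and blocks until the
// queue is closed and every queued batch has been handled.
func runWorkers(n int, queue <-chan Batch, s Store) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for b := range queue {
				if err := persist(s, b); err != nil {
					fmt.Println("persist failed:", err)
				}
			}
		}()
	}
	wg.Wait()
}

// memStore records the order of calls so the ordering can be inspected.
type memStore struct {
	mu    sync.Mutex
	calls []string
}

func (m *memStore) record(kind string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.calls = append(m.calls, kind)
	return nil
}

func (m *memStore) SaveTraces([]string) error { return m.record("traces") }
func (m *memStore) SaveSpans([]string) error  { return m.record("spans") }
func (m *memStore) SaveLogs([]string) error   { return m.record("logs") }

func main() {
	queue := make(chan Batch, 4)
	queue <- Batch{Traces: []string{"t1"}, Spans: []string{"s1"}, Logs: []string{"l1"}}
	close(queue)

	st := &memStore{}
	runWorkers(2, queue, st)
	fmt.Println(st.calls)
}
```

Ordering is guaranteed only within a batch here; batches handled by different workers may interleave, which is why each batch carries its own traces alongside its spans.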

### Multi-tenancy

@@ -215,6 +215,7 @@ Key settings in `internal/config/config.go`:
- `VECTOR_INDEX_MAX_ENTRIES` (100000)
- `DLQ_MAX_FILES` (1000), `DLQ_MAX_DISK_MB` (500), `DLQ_MAX_RETRIES` (10)
- `GRAPHRAG_WORKER_COUNT` (16), `GRAPHRAG_EVENT_QUEUE_SIZE` (100000) — sized for 100–200 services; raise further if `otelcontext_graphrag_events_dropped_total` climbs
- `INGEST_ASYNC_ENABLED` (true), `INGEST_PIPELINE_QUEUE_SIZE` (50000), `INGEST_PIPELINE_WORKERS` (8) — async ingest pipeline (`internal/ingest/pipeline.go`). Hybrid backpressure: <90% accept all, 90–100% drop healthy batches (errors/slow always pass), 100% return gRPC `RESOURCE_EXHAUSTED`. Set `INGEST_ASYNC_ENABLED=false` to revert to synchronous DB writes inside `Export()`. Drops surface as `otelcontext_ingest_pipeline_dropped_total{signal,reason}`.
- `GRPC_MAX_RECV_MB` (16), `GRPC_MAX_CONCURRENT_STREAMS` (1000) — OTLP gRPC server caps, validated to 1..256 and 1..1_000_000
- `RETENTION_BATCH_SIZE` (50000), `RETENTION_BATCH_SLEEP_MS` (1) — purge pacing; raise the sleep on busy production DBs
- `APP_ENV` (`"development"`), `OTELCONTEXT_ALLOW_SQLITE_PROD` (false) — SQLite is refused when `APP_ENV=production` unless the allow flag is set
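
The hybrid backpressure rule listed for `INGEST_PIPELINE_QUEUE_SIZE` can be read as a three-way admission decision. The sketch below is one possible reading, not the pipeline's actual `Submit` logic: `admit` and `Decision` are hypothetical names, and it assumes that a completely full queue rejects priority batches too, since there is no slot left for them.

```go
package main

import "fmt"

// Decision is a hypothetical result type for the admission check.
type Decision int

const (
	Accept Decision = iota
	DropSoft
	RejectFull
)

func (d Decision) String() string {
	switch d {
	case Accept:
		return "accept"
	case DropSoft:
		return "drop (soft backpressure)"
	case RejectFull:
		return "reject (queue full)"
	}
	return "unknown"
}

// admit applies the thresholds from the settings list:
//   - below 90% of capacity: accept everything
//   - 90% up to 100%: drop healthy batches, accept priority (error/slow) ones
//   - at 100%: reject, which the server reports as RESOURCE_EXHAUSTED
//
// Integer math (depth*10 >= capacity*9) avoids floating-point rounding.
func admit(depth, capacity int, priority bool) Decision {
	switch {
	case depth >= capacity:
		return RejectFull
	case depth*10 >= capacity*9 && !priority:
		return DropSoft
	default:
		return Accept
	}
}

func main() {
	const capacity = 50000 // default INGEST_PIPELINE_QUEUE_SIZE
	for _, depth := range []int{1000, 45000, 50000} {
		fmt.Printf("depth=%d healthy=%v priority=%v\n",
			depth, admit(depth, capacity, false), admit(depth, capacity, true))
	}
}
```

With the default capacity of 50000, the soft threshold in this sketch starts at a depth of 45000 batches.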
4 changes: 4 additions & 0 deletions docs/OPERATIONS.md
@@ -93,6 +93,7 @@ DB_DSN="host=my-server.postgres.database.azure.com user=my-mi@tenant.onmicrosoft
- `VECTOR_INDEX_MAX_ENTRIES=100000`
- `SAMPLING_*` (defaults keep 100% + always-on errors)
- `GRAPHRAG_WORKER_COUNT=16`, `GRAPHRAG_EVENT_QUEUE_SIZE=100000` — sized for 100–200 services. Lower for tiny deployments; raise further if `otelcontext_graphrag_events_dropped_total` climbs.
- `INGEST_ASYNC_ENABLED=true`, `INGEST_PIPELINE_QUEUE_SIZE=50000`, `INGEST_PIPELINE_WORKERS=8` — async ingest pipeline that decouples OTLP `Export()` from DB writes. Backpressure is hybrid: at >=90% queue depth healthy batches are dropped silently (batches with errors or slow spans still pass), and at 100% the server returns gRPC `RESOURCE_EXHAUSTED`. Set `INGEST_ASYNC_ENABLED=false` only to debug the legacy synchronous write path. Watch `otelcontext_ingest_pipeline_dropped_total{signal,reason}` and `otelcontext_ingest_pipeline_queue_depth{signal}`.
- `GRPC_MAX_RECV_MB=16`, `GRPC_MAX_CONCURRENT_STREAMS=1000` — OTLP gRPC server caps
- `RETENTION_BATCH_SIZE=50000`, `RETENTION_BATCH_SLEEP_MS=1` — purge pacing; raise the sleep for busy production DBs

@@ -197,6 +198,9 @@ Grep structured logs for `acquire entra token`. Common causes: expired managed-i
- `rate(OtelContext_otlp_payload_rejected_total[5m]) > 0`
- `rate(OtelContext_dlq_replay_failure_total[5m]) > rate(OtelContext_dlq_replay_success_total[5m])`
- `rate(otelcontext_graphrag_events_dropped_total[5m]) > 0` — ingestion channel saturated; bump `GRAPHRAG_WORKER_COUNT` or `GRAPHRAG_EVENT_QUEUE_SIZE`
- `rate(otelcontext_ingest_pipeline_dropped_total{reason="queue_full"}[5m]) > 0` — clients are getting `RESOURCE_EXHAUSTED`; raise `INGEST_PIPELINE_QUEUE_SIZE` or `INGEST_PIPELINE_WORKERS`. Sustained drops mean the DB cannot keep up with the ingest rate.
- `rate(otelcontext_ingest_pipeline_dropped_total{reason="soft_backpressure"}[5m]) > 0` — pipeline is actively shedding healthy batches (queue at >=90%); check downstream DB latency or raise `INGEST_PIPELINE_WORKERS` / `INGEST_PIPELINE_QUEUE_SIZE`.
- `otelcontext_ingest_pipeline_queue_depth / INGEST_PIPELINE_QUEUE_SIZE > 0.7` for >5m — queue trending toward soft drop; capacity is becoming a constraint.
- `otelcontext_retention_rows_behind > 1_000_000` — purge is falling behind; tune `RETENTION_BATCH_SIZE` / `RETENTION_BATCH_SLEEP_MS`
- `otelcontext_db_pool_in_use / otelcontext_db_pool_max_open > 0.9` — pool exhausted; raise `DB_MAX_OPEN_CONNS`
- `rate(otelcontext_dlq_evicted_total[5m]) > 0` — DLQ is actively dropping entries at cap; replay target is down or slow
17 changes: 17 additions & 0 deletions internal/config/config.go
@@ -76,6 +76,18 @@ type Config struct {
// GraphRAG event channel buffer size. Defaults to 10000 if unset or <=0.
GraphRAGEventQueueSize int

// Async ingest pipeline (Phase 1 robustness work). Decouples OTLP Export
// from synchronous DB writes. When enabled, Export() returns as soon as
// the parsed batch is enqueued; persistence runs on a worker pool.
//
// Backpressure is hybrid:
// <90% queue — accept all
// 90%-100% queue — drop healthy batches (silent), errors/slow always pass
// 100% queue — return RESOURCE_EXHAUSTED so OTLP clients back off
IngestAsyncEnabled bool // default true; opt out via INGEST_ASYNC_ENABLED=false
IngestPipelineQueueSize int // default 50000 batches; per-deployment tunable
IngestPipelineWorkers int // default 8 worker goroutines

// TLS (HTTP + gRPC). When both paths are set, TLS is enabled on both servers.
// Empty values (default) keep plaintext behavior.
TLSCertFile string
@@ -205,6 +217,11 @@ func Load(customPath string) (*Config, error) {
GraphRAGWorkerCount: getEnvInt("GRAPHRAG_WORKER_COUNT", 16),
GraphRAGEventQueueSize: getEnvInt("GRAPHRAG_EVENT_QUEUE_SIZE", 100000),

// Async ingest pipeline
IngestAsyncEnabled: getEnvBool("INGEST_ASYNC_ENABLED", true),
IngestPipelineQueueSize: getEnvInt("INGEST_PIPELINE_QUEUE_SIZE", 50000),
IngestPipelineWorkers: getEnvInt("INGEST_PIPELINE_WORKERS", 8),

// TLS
TLSCertFile: getEnv("TLS_CERT_FILE", ""),
TLSKeyFile: getEnv("TLS_KEY_FILE", ""),
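
The bodies of `getEnvInt` and `getEnvBool` are not part of this diff. As a rough illustration of how defaults such as `INGEST_PIPELINE_WORKERS=8` could be resolved, here is a minimal sketch under stated assumptions: `intFrom`, `boolFrom`, and `lookupFunc` are hypothetical names, and falling back to the default on an unparsable value is an assumed policy rather than confirmed project behavior.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// lookupFunc matches the signature of os.LookupEnv so the helpers can be
// exercised without touching the real process environment.
type lookupFunc func(key string) (string, bool)

// intFrom returns the integer value of key, or def when the variable is
// unset, empty, or not a valid integer (assumed fallback policy).
func intFrom(lookup lookupFunc, key string, def int) int {
	v, ok := lookup(key)
	if !ok || v == "" {
		return def
	}
	n, err := strconv.Atoi(v)
	if err != nil {
		return def
	}
	return n
}

// boolFrom returns the boolean value of key, or def when the variable is
// unset, empty, or not accepted by strconv.ParseBool.
func boolFrom(lookup lookupFunc, key string, def bool) bool {
	v, ok := lookup(key)
	if !ok || v == "" {
		return def
	}
	b, err := strconv.ParseBool(v)
	if err != nil {
		return def
	}
	return b
}

func main() {
	// Defaults mirror the values shown in Load().
	fmt.Println("async:", boolFrom(os.LookupEnv, "INGEST_ASYNC_ENABLED", true))
	fmt.Println("queue:", intFrom(os.LookupEnv, "INGEST_PIPELINE_QUEUE_SIZE", 50000))
	fmt.Println("workers:", intFrom(os.LookupEnv, "INGEST_PIPELINE_WORKERS", 8))
}
```

Injecting the lookup function keeps the parsing logic testable with an in-memory map; a real loader might also reject zero or negative queue sizes and worker counts.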
151 changes: 129 additions & 22 deletions internal/ingest/otlp.go
@@ -3,6 +3,7 @@ package ingest
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"strings"
@@ -21,7 +22,9 @@ import (
metricspb "go.opentelemetry.io/proto/otlp/metrics/v1"
tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
grpcstatus "google.golang.org/grpc/status"
)

// tenantHeader is the canonical HTTP / gRPC metadata key used to override the
@@ -99,7 +102,9 @@ type TraceServer struct {
minSeverity int
allowedServices map[string]bool
excludedServices map[string]bool
sampler *Sampler // nil = no sampling (keep all)
sampler *Sampler // nil = no sampling (keep all)
pipeline *Pipeline // nil = synchronous DB writes (legacy path)
latencyThresholdMs float64 // spans slower than this are flagged HasSlow for the pipeline
defaultTenant string
trustResourceTenant bool
coltracepb.UnimplementedTraceServiceServer
@@ -112,6 +117,7 @@ type LogsServer struct {
minSeverity int
allowedServices map[string]bool
excludedServices map[string]bool
pipeline *Pipeline // nil = synchronous DB writes (legacy path)
defaultTenant string
trustResourceTenant bool
collogspb.UnimplementedLogsServiceServer
@@ -136,6 +142,7 @@ func NewTraceServer(repo *storage.Repository, metrics *telemetry.Metrics, cfg *c
minSeverity: parseSeverity(cfg.IngestMinSeverity),
allowedServices: parseServiceList(cfg.IngestAllowedServices),
excludedServices: parseServiceList(cfg.IngestExcludedServices),
latencyThresholdMs: float64(cfg.SamplingLatencyThresholdMs),
defaultTenant: cfg.DefaultTenant,
trustResourceTenant: cfg.OTLPTrustResourceTenant,
}
@@ -156,6 +163,20 @@ func (s *TraceServer) SetSampler(sm *Sampler) {
s.sampler = sm
}

// SetPipeline enables the async ingest pipeline. When set, Export()
// returns to the caller as soon as the parsed batch is enqueued (or
// rejected), and persistence runs on the pipeline's worker pool. Pass
// nil to revert to the synchronous DB-write path.
func (s *TraceServer) SetPipeline(p *Pipeline) {
s.pipeline = p
}

// SetPipeline enables the async ingest pipeline for log export. Same
// semantics as TraceServer.SetPipeline.
func (s *LogsServer) SetPipeline(p *Pipeline) {
s.pipeline = p
}

func NewLogsServer(repo *storage.Repository, metrics *telemetry.Metrics, cfg *config.Config) *LogsServer {
return &LogsServer{
repo: repo,
@@ -265,9 +286,11 @@ func (s *TraceServer) Export(ctx context.Context, req *coltracepb.ExportTraceSer
slog.Debug("📥 [TRACES] Received Request", "resource_spans", len(req.ResourceSpans))

type batchResult struct {
spans []storage.Span
traces []storage.Trace
logs []storage.Log
spans []storage.Span
traces []storage.Trace
logs []storage.Log
hasErr bool // any span in this slice had STATUS_CODE_ERROR
hasSlow bool // any span exceeded latencyThresholdMs
}

results := make([]batchResult, len(req.ResourceSpans))
@@ -289,6 +312,7 @@ func (s *TraceServer) Export(ctx context.Context, req *coltracepb.ExportTraceSer
localSpans := make([]storage.Span, 0)
localTraces := make([]storage.Trace, 0)
localLogs := make([]storage.Log, 0)
var localHasErr, localHasSlow bool

for _, scopeSpans := range resourceSpans.ScopeSpans {
for _, span := range scopeSpans.Spans {
@@ -327,6 +351,16 @@ func (s *TraceServer) Export(ctx context.Context, req *coltracepb.ExportTraceSer
}
localSpans = append(localSpans, sModel)

// Flag the batch for the async pipeline's priority lane.
// Errors and slow spans bypass soft-backpressure drops so
// diagnostic data is never silently lost at >=90% queue.
if statusStr == "STATUS_CODE_ERROR" {
localHasErr = true
}
if s.latencyThresholdMs > 0 && float64(duration)/1000.0 >= s.latencyThresholdMs {
localHasSlow = true
}

tModel := storage.Trace{
TenantID: tenantID,
TraceID: fmt.Sprintf("%x", span.TraceId),
@@ -403,7 +437,13 @@ func (s *TraceServer) Export(ctx context.Context, req *coltracepb.ExportTraceSer
}

// Store results in pre-allocated slot (no mutex needed)
results[idx] = batchResult{spans: localSpans, traces: localTraces, logs: localLogs}
results[idx] = batchResult{
spans: localSpans,
traces: localTraces,
logs: localLogs,
hasErr: localHasErr,
hasSlow: localHasSlow,
}

return nil
})
@@ -415,12 +455,55 @@ func (s *TraceServer) Export(ctx context.Context, req *coltracepb.ExportTraceSer
var spansToInsert []storage.Span
var tracesToUpsert []storage.Trace
var synthesizedLogs []storage.Log
var batchHasErr, batchHasSlow bool
for _, r := range results {
spansToInsert = append(spansToInsert, r.spans...)
tracesToUpsert = append(tracesToUpsert, r.traces...)
synthesizedLogs = append(synthesizedLogs, r.logs...)
if r.hasErr {
batchHasErr = true
}
if r.hasSlow {
batchHasSlow = true
}
}

// Intake metrics fire before the persist decision so operators see
// what was received regardless of async drops/rejections. Net
// persisted = ingestion_total - ingest_pipeline_dropped_total.
if s.metrics != nil && len(spansToInsert) > 0 {
s.metrics.GRPCBatchSize.Observe(float64(len(spansToInsert)))
s.metrics.RecordIngestion(len(spansToInsert))
}

// Async path: hand off to the pipeline. ErrQueueFull maps to gRPC
// RESOURCE_EXHAUSTED so the OTLP client backs off instead of retrying
// immediately; any other Submit error is returned unchanged.
// Soft-backpressure drops are silent.
if s.pipeline != nil {
batch := &Batch{
Type: SignalTraces,
Traces: tracesToUpsert,
Spans: spansToInsert,
Logs: synthesizedLogs,
HasError: batchHasErr,
HasSlow: batchHasSlow,
SpanCallback: s.spanCallback,
LogCallback: s.logCallback,
}
if err := s.pipeline.Submit(batch); err != nil {
if errors.Is(err, ErrQueueFull) {
return nil, grpcstatus.Errorf(codes.ResourceExhausted, "ingest pipeline at capacity")
}
return nil, err
}
return &coltracepb.ExportTraceServiceResponse{}, nil
}

// Synchronous fallback (s.pipeline == nil). Persistence order and
// callbacks match the original path when the operator opts out via
// INGEST_ASYNC_ENABLED=false. One difference: the intake metrics above
// now fire before the write, so they also count batches whose insert fails.

// Persist - CRITICAL ORDER: Traces MUST be inserted before Spans due to FK
if len(tracesToUpsert) > 0 {
if err := s.repo.BatchCreateTraces(tracesToUpsert); err != nil {
@@ -430,16 +513,10 @@ func (s *TraceServer) Export(ctx context.Context, req *coltracepb.ExportTraceSer
}

if len(spansToInsert) > 0 {
if s.metrics != nil {
s.metrics.GRPCBatchSize.Observe(float64(len(spansToInsert)))
}
if err := s.repo.BatchCreateSpans(spansToInsert); err != nil {
slog.Error("❌ Failed to insert spans", "error", err)
return nil, err
}
if s.metrics != nil {
s.metrics.RecordIngestion(len(spansToInsert))
}
// Notify GraphRAG of persisted spans
if s.spanCallback != nil {
for _, span := range spansToInsert {
@@ -532,20 +609,50 @@ func (s *LogsServer) Export(ctx context.Context, req *collogspb.ExportLogsServic
logsToInsert = append(logsToInsert, lr...)
}

if len(logsToInsert) > 0 {
if err := s.repo.BatchCreateLogs(logsToInsert); err != nil {
slog.Error("❌ Failed to insert logs", "error", err)
return nil, err
}
if s.metrics != nil {
s.metrics.RecordIngestion(len(logsToInsert))
if len(logsToInsert) == 0 {
return &collogspb.ExportLogsServiceResponse{}, nil
}

// Intake metric fires before the persist decision (see TraceServer.Export
// rationale). Net persisted = ingestion_total - ingest_pipeline_dropped_total.
if s.metrics != nil {
s.metrics.RecordIngestion(len(logsToInsert))
}

// Detect priority logs — ERROR/FATAL must bypass soft backpressure.
var hasErr bool
for _, l := range logsToInsert {
if l.Severity == "ERROR" || l.Severity == "FATAL" {
hasErr = true
break
}
}

// Notify listener
if s.logCallback != nil {
for _, l := range logsToInsert {
s.logCallback(l)
// Async path: hand off to the pipeline.
if s.pipeline != nil {
batch := &Batch{
Type: SignalLogs,
Logs: logsToInsert,
HasError: hasErr,
LogCallback: s.logCallback,
}
if err := s.pipeline.Submit(batch); err != nil {
if errors.Is(err, ErrQueueFull) {
return nil, grpcstatus.Errorf(codes.ResourceExhausted, "ingest pipeline at capacity")
}
return nil, err
}
return &collogspb.ExportLogsServiceResponse{}, nil
}

// Synchronous fallback (preserves original behavior when async is disabled).
if err := s.repo.BatchCreateLogs(logsToInsert); err != nil {
slog.Error("❌ Failed to insert logs", "error", err)
return nil, err
}
if s.logCallback != nil {
for _, l := range logsToInsert {
s.logCallback(l)
}
}
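
The two decisions the log export path makes before and after `Submit` (flagging a batch as priority, and translating a full queue into a client-visible status) can be isolated in a standard-library-only sketch. It is illustrative rather than the project's code: `errQueueFull` stands in for the `ErrQueueFull` sentinel, `logRecord`, `hasPriority`, and `statusFor` are hypothetical names, and status codes are returned as plain strings so the example needs no gRPC dependency.

```go
package main

import (
	"errors"
	"fmt"
)

// errQueueFull is a local stand-in for the pipeline's ErrQueueFull sentinel.
var errQueueFull = errors.New("ingest pipeline queue full")

// errStore is a hypothetical non-capacity failure used for the demo.
var errStore = errors.New("store unavailable")

// logRecord is a minimal stand-in for a stored log row.
type logRecord struct {
	Severity string
}

// hasPriority reports whether any log is ERROR or FATAL. Such batches are
// meant to bypass soft-backpressure drops.
func hasPriority(logs []logRecord) bool {
	for _, l := range logs {
		if l.Severity == "ERROR" || l.Severity == "FATAL" {
			return true
		}
	}
	return false
}

// statusFor names the gRPC status a handler would end up reporting for a
// Submit error. errors.Is also matches wrapped sentinels. A plain Go error
// returned from a gRPC handler is reported to the client as UNKNOWN.
func statusFor(err error) string {
	switch {
	case err == nil:
		return "OK"
	case errors.Is(err, errQueueFull):
		return "RESOURCE_EXHAUSTED"
	default:
		return "UNKNOWN"
	}
}

func main() {
	logs := []logRecord{{Severity: "INFO"}, {Severity: "FATAL"}}
	fmt.Println("priority batch:", hasPriority(logs))

	wrapped := fmt.Errorf("submit: %w", errQueueFull)
	fmt.Println(statusFor(nil), statusFor(wrapped), statusFor(errStore))
}
```

Matching with `errors.Is` rather than `==` keeps the mapping correct if the pipeline later wraps the sentinel with extra context.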
