diff --git a/CLAUDE.md b/CLAUDE.md index 003e2e1..dfd6e38 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -41,7 +41,7 @@ HTTP :8080 ◄── REST API ◄───────────┘ | gRPC | `:4317` | protobuf | Traces, Logs, Metrics via OTLP gRPC | | HTTP | `/v1/traces`, `/v1/logs`, `/v1/metrics` | `application/x-protobuf`, `application/json` | OTLP HTTP spec compliant, gzip support, 4MB limit | -Both paths delegate to the same `Export()` methods — zero business logic duplication. +Both paths delegate to the same `Export()` methods — zero business logic duplication. By default `Export()` parses the OTLP request and hands a `Batch` to the async ingest `Pipeline` (`internal/ingest/pipeline.go`); a worker pool persists Trace→Span→Log in order. With `INGEST_ASYNC_ENABLED=false` the pipeline is bypassed and `Export()` writes inline (legacy path). ### Multi-tenancy @@ -215,6 +215,7 @@ Key settings in `internal/config/config.go`: - `VECTOR_INDEX_MAX_ENTRIES` (100000) - `DLQ_MAX_FILES` (1000), `DLQ_MAX_DISK_MB` (500), `DLQ_MAX_RETRIES` (10) - `GRAPHRAG_WORKER_COUNT` (16), `GRAPHRAG_EVENT_QUEUE_SIZE` (100000) — sized for 100–200 services; raise further if `otelcontext_graphrag_events_dropped_total` climbs +- `INGEST_ASYNC_ENABLED` (true), `INGEST_PIPELINE_QUEUE_SIZE` (50000), `INGEST_PIPELINE_WORKERS` (8) — async ingest pipeline (`internal/ingest/pipeline.go`). Hybrid backpressure: <90% accept all, 90–100% drop healthy batches (errors/slow always pass), 100% return gRPC `RESOURCE_EXHAUSTED`. Set `INGEST_ASYNC_ENABLED=false` to revert to synchronous DB writes inside `Export()`. Drops surface as `otelcontext_ingest_pipeline_dropped_total{signal,reason}`. 
- `GRPC_MAX_RECV_MB` (16), `GRPC_MAX_CONCURRENT_STREAMS` (1000) — OTLP gRPC server caps, validated to 1..256 and 1..1_000_000 - `RETENTION_BATCH_SIZE` (50000), `RETENTION_BATCH_SLEEP_MS` (1) — purge pacing; raise the sleep on busy production DBs - `APP_ENV` (`"development"`), `OTELCONTEXT_ALLOW_SQLITE_PROD` (false) — SQLite is refused when `APP_ENV=production` unless the allow flag is set diff --git a/docs/OPERATIONS.md b/docs/OPERATIONS.md index a04e961..895eb16 100644 --- a/docs/OPERATIONS.md +++ b/docs/OPERATIONS.md @@ -93,6 +93,7 @@ DB_DSN="host=my-server.postgres.database.azure.com user=my-mi@tenant.onmicrosoft - `VECTOR_INDEX_MAX_ENTRIES=100000` - `SAMPLING_*` (defaults keep 100% + always-on errors) - `GRAPHRAG_WORKER_COUNT=16`, `GRAPHRAG_EVENT_QUEUE_SIZE=100000` — sized for 100–200 services. Lower for tiny deployments; raise further if `graphrag_events_dropped_total` climbs. +- `INGEST_ASYNC_ENABLED=true`, `INGEST_PIPELINE_QUEUE_SIZE=50000`, `INGEST_PIPELINE_WORKERS=8` — async ingest pipeline. Decouples OTLP `Export()` from DB writes. Backpressure is hybrid: silent drop of healthy traces at >=90% queue, gRPC `RESOURCE_EXHAUSTED` at 100%. Disable only to debug the legacy synchronous write path. Watch `otelcontext_ingest_pipeline_dropped_total{signal,reason}` and `otelcontext_ingest_pipeline_queue_depth{signal}`. - `GRPC_MAX_RECV_MB=16`, `GRPC_MAX_CONCURRENT_STREAMS=1000` — OTLP gRPC server caps - `RETENTION_BATCH_SIZE=50000`, `RETENTION_BATCH_SLEEP_MS=1` — purge pacing; raise the sleep for busy production DBs @@ -197,6 +198,9 @@ Grep structured logs for `acquire entra token`. 
Common causes: expired managed-i - `rate(OtelContext_otlp_payload_rejected_total[5m]) > 0` - `rate(OtelContext_dlq_replay_failure_total[5m]) > rate(OtelContext_dlq_replay_success_total[5m])` - `rate(otelcontext_graphrag_events_dropped_total[5m]) > 0` — ingestion channel saturated; bump `GRAPHRAG_WORKER_COUNT` or `GRAPHRAG_EVENT_QUEUE_SIZE` + - `rate(otelcontext_ingest_pipeline_dropped_total{reason="queue_full"}[5m]) > 0` — clients are getting `RESOURCE_EXHAUSTED`; raise `INGEST_PIPELINE_QUEUE_SIZE` or `INGEST_PIPELINE_WORKERS`. Sustained drops mean the DB cannot keep up with the ingest rate. + - `rate(otelcontext_ingest_pipeline_dropped_total{reason="soft_backpressure"}[5m]) > 0` — pipeline is actively shedding healthy traces; check downstream DB latency or scale workers/queue. + - `otelcontext_ingest_pipeline_queue_depth / INGEST_PIPELINE_QUEUE_SIZE > 0.7` for >5m — queue trending toward soft drop; capacity is becoming a constraint. - `otelcontext_retention_rows_behind > 1_000_000` — purge is falling behind; tune `RETENTION_BATCH_SIZE` / `RETENTION_BATCH_SLEEP_MS` - `otelcontext_db_pool_in_use / otelcontext_db_pool_max_open > 0.9` — pool exhausted; raise `DB_MAX_OPEN_CONNS` - `rate(otelcontext_dlq_evicted_total[5m]) > 0` — DLQ is actively dropping entries at cap; replay target is down or slow diff --git a/internal/config/config.go b/internal/config/config.go index 6a8f760..f4d5bb7 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -76,6 +76,18 @@ type Config struct { // GraphRAG event channel buffer size. Defaults to 10000 if unset or <=0. GraphRAGEventQueueSize int + // Async ingest pipeline (Phase 1 robustness work). Decouples OTLP Export + // from synchronous DB writes. When enabled, Export() returns as soon as + // the parsed batch is enqueued; persistence runs on a worker pool. 
+ // + // Backpressure is hybrid: + // <90% queue — accept all + // 90%-100% queue — drop healthy batches (silent), errors/slow always pass + // 100% queue — return RESOURCE_EXHAUSTED so OTLP clients back off + IngestAsyncEnabled bool // default true; opt out via INGEST_ASYNC_ENABLED=false + IngestPipelineQueueSize int // default 50000 batches; per-deployment tunable + IngestPipelineWorkers int // default 8 worker goroutines + // TLS (HTTP + gRPC). When both paths are set, TLS is enabled on both servers. // Empty values (default) keep plaintext behavior. TLSCertFile string @@ -205,6 +217,11 @@ func Load(customPath string) (*Config, error) { GraphRAGWorkerCount: getEnvInt("GRAPHRAG_WORKER_COUNT", 16), GraphRAGEventQueueSize: getEnvInt("GRAPHRAG_EVENT_QUEUE_SIZE", 100000), + // Async ingest pipeline + IngestAsyncEnabled: getEnvBool("INGEST_ASYNC_ENABLED", true), + IngestPipelineQueueSize: getEnvInt("INGEST_PIPELINE_QUEUE_SIZE", 50000), + IngestPipelineWorkers: getEnvInt("INGEST_PIPELINE_WORKERS", 8), + // TLS TLSCertFile: getEnv("TLS_CERT_FILE", ""), TLSKeyFile: getEnv("TLS_KEY_FILE", ""), diff --git a/internal/ingest/otlp.go b/internal/ingest/otlp.go index a316e69..6048c22 100644 --- a/internal/ingest/otlp.go +++ b/internal/ingest/otlp.go @@ -3,6 +3,7 @@ package ingest import ( "context" "encoding/json" + "errors" "fmt" "log/slog" "strings" @@ -21,7 +22,9 @@ import ( metricspb "go.opentelemetry.io/proto/otlp/metrics/v1" tracepb "go.opentelemetry.io/proto/otlp/trace/v1" "golang.org/x/sync/errgroup" + "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" + grpcstatus "google.golang.org/grpc/status" ) // tenantHeader is the canonical HTTP / gRPC metadata key used to override the @@ -99,7 +102,9 @@ type TraceServer struct { minSeverity int allowedServices map[string]bool excludedServices map[string]bool - sampler *Sampler // nil = no sampling (keep all) + sampler *Sampler // nil = no sampling (keep all) + pipeline *Pipeline // nil = synchronous DB writes 
(legacy path) + latencyThresholdMs float64 // spans slower than this are flagged HasSlow for the pipeline defaultTenant string trustResourceTenant bool coltracepb.UnimplementedTraceServiceServer @@ -112,6 +117,7 @@ type LogsServer struct { minSeverity int allowedServices map[string]bool excludedServices map[string]bool + pipeline *Pipeline // nil = synchronous DB writes (legacy path) defaultTenant string trustResourceTenant bool collogspb.UnimplementedLogsServiceServer @@ -136,6 +142,7 @@ func NewTraceServer(repo *storage.Repository, metrics *telemetry.Metrics, cfg *c minSeverity: parseSeverity(cfg.IngestMinSeverity), allowedServices: parseServiceList(cfg.IngestAllowedServices), excludedServices: parseServiceList(cfg.IngestExcludedServices), + latencyThresholdMs: float64(cfg.SamplingLatencyThresholdMs), defaultTenant: cfg.DefaultTenant, trustResourceTenant: cfg.OTLPTrustResourceTenant, } @@ -156,6 +163,20 @@ func (s *TraceServer) SetSampler(sm *Sampler) { s.sampler = sm } +// SetPipeline enables the async ingest pipeline. When set, Export() +// returns to the caller as soon as the parsed batch is enqueued (or +// rejected), and persistence runs on the pipeline's worker pool. Pass +// nil to revert to the synchronous DB-write path. +func (s *TraceServer) SetPipeline(p *Pipeline) { + s.pipeline = p +} + +// SetPipeline enables the async ingest pipeline for log export. Same +// semantics as TraceServer.SetPipeline. 
+func (s *LogsServer) SetPipeline(p *Pipeline) { + s.pipeline = p +} + func NewLogsServer(repo *storage.Repository, metrics *telemetry.Metrics, cfg *config.Config) *LogsServer { return &LogsServer{ repo: repo, @@ -265,9 +286,11 @@ func (s *TraceServer) Export(ctx context.Context, req *coltracepb.ExportTraceSer slog.Debug("📥 [TRACES] Received Request", "resource_spans", len(req.ResourceSpans)) type batchResult struct { - spans []storage.Span - traces []storage.Trace - logs []storage.Log + spans []storage.Span + traces []storage.Trace + logs []storage.Log + hasErr bool // any span in this slice had STATUS_CODE_ERROR + hasSlow bool // any span exceeded latencyThresholdMs } results := make([]batchResult, len(req.ResourceSpans)) @@ -289,6 +312,7 @@ func (s *TraceServer) Export(ctx context.Context, req *coltracepb.ExportTraceSer localSpans := make([]storage.Span, 0) localTraces := make([]storage.Trace, 0) localLogs := make([]storage.Log, 0) + var localHasErr, localHasSlow bool for _, scopeSpans := range resourceSpans.ScopeSpans { for _, span := range scopeSpans.Spans { @@ -327,6 +351,16 @@ func (s *TraceServer) Export(ctx context.Context, req *coltracepb.ExportTraceSer } localSpans = append(localSpans, sModel) + // Flag the batch for the async pipeline's priority lane. + // Errors and slow spans bypass soft-backpressure drops so + // diagnostic data is never silently lost at >=90% queue. 
+ if statusStr == "STATUS_CODE_ERROR" { + localHasErr = true + } + if s.latencyThresholdMs > 0 && float64(duration)/1000.0 >= s.latencyThresholdMs { + localHasSlow = true + } + tModel := storage.Trace{ TenantID: tenantID, TraceID: fmt.Sprintf("%x", span.TraceId), @@ -403,7 +437,13 @@ func (s *TraceServer) Export(ctx context.Context, req *coltracepb.ExportTraceSer } // Store results in pre-allocated slot (no mutex needed) - results[idx] = batchResult{spans: localSpans, traces: localTraces, logs: localLogs} + results[idx] = batchResult{ + spans: localSpans, + traces: localTraces, + logs: localLogs, + hasErr: localHasErr, + hasSlow: localHasSlow, + } return nil }) @@ -415,12 +455,55 @@ func (s *TraceServer) Export(ctx context.Context, req *coltracepb.ExportTraceSer var spansToInsert []storage.Span var tracesToUpsert []storage.Trace var synthesizedLogs []storage.Log + var batchHasErr, batchHasSlow bool for _, r := range results { spansToInsert = append(spansToInsert, r.spans...) tracesToUpsert = append(tracesToUpsert, r.traces...) synthesizedLogs = append(synthesizedLogs, r.logs...) + if r.hasErr { + batchHasErr = true + } + if r.hasSlow { + batchHasSlow = true + } } + // Intake metrics fire before the persist decision so operators see + // what was received regardless of async drops/rejections. Net + // persisted = ingestion_total - ingest_pipeline_dropped_total. + if s.metrics != nil && len(spansToInsert) > 0 { + s.metrics.GRPCBatchSize.Observe(float64(len(spansToInsert))) + s.metrics.RecordIngestion(len(spansToInsert)) + } + + // Async path: hand off to the pipeline. ErrQueueFull is the only + // signal we need to surface to the OTLP client — translates to + // gRPC RESOURCE_EXHAUSTED so the client backs off rather than + // retrying tighter. Soft backpressure drops are silent. 
+ if s.pipeline != nil { + batch := &Batch{ + Type: SignalTraces, + Traces: tracesToUpsert, + Spans: spansToInsert, + Logs: synthesizedLogs, + HasError: batchHasErr, + HasSlow: batchHasSlow, + SpanCallback: s.spanCallback, + LogCallback: s.logCallback, + } + if err := s.pipeline.Submit(batch); err != nil { + if errors.Is(err, ErrQueueFull) { + return nil, grpcstatus.Errorf(codes.ResourceExhausted, "ingest pipeline at capacity") + } + return nil, err + } + return &coltracepb.ExportTraceServiceResponse{}, nil + } + + // Synchronous fallback (s.pipeline == nil) for operators who opt out + // via INGEST_ASYNC_ENABLED=false. Writes and callbacks are unchanged, + // but the intake metrics above now also count failed DB writes. + // Persist - CRITICAL ORDER: Traces MUST be inserted before Spans due to FK if len(tracesToUpsert) > 0 { if err := s.repo.BatchCreateTraces(tracesToUpsert); err != nil { @@ -430,16 +513,10 @@ func (s *TraceServer) Export(ctx context.Context, req *coltracepb.ExportTraceSer } if len(spansToInsert) > 0 { - if s.metrics != nil { - s.metrics.GRPCBatchSize.Observe(float64(len(spansToInsert))) - } if err := s.repo.BatchCreateSpans(spansToInsert); err != nil { slog.Error("❌ Failed to insert spans", "error", err) return nil, err } - if s.metrics != nil { - s.metrics.RecordIngestion(len(spansToInsert)) - } // Notify GraphRAG of persisted spans if s.spanCallback != nil { for _, span := range spansToInsert { @@ -532,20 +609,50 @@ func (s *LogsServer) Export(ctx context.Context, req *collogspb.ExportLogsServic logsToInsert = append(logsToInsert, lr...) } - if len(logsToInsert) > 0 { - if err := s.repo.BatchCreateLogs(logsToInsert); err != nil { - slog.Error("❌ Failed to insert logs", "error", err) - return nil, err - } - if s.metrics != nil { - s.metrics.RecordIngestion(len(logsToInsert)) + if len(logsToInsert) == 0 { + return &collogspb.ExportLogsServiceResponse{}, nil + } + + // Intake metric fires before the persist decision (see TraceServer.Export + // rationale). 
Net persisted = ingestion_total - ingest_pipeline_dropped_total. + if s.metrics != nil { + s.metrics.RecordIngestion(len(logsToInsert)) + } + + // Detect priority logs — ERROR/FATAL must bypass soft backpressure. + var hasErr bool + for _, l := range logsToInsert { + if l.Severity == "ERROR" || l.Severity == "FATAL" { + hasErr = true + break } + } - // Notify listener - if s.logCallback != nil { - for _, l := range logsToInsert { - s.logCallback(l) + // Async path: hand off to the pipeline. + if s.pipeline != nil { + batch := &Batch{ + Type: SignalLogs, + Logs: logsToInsert, + HasError: hasErr, + LogCallback: s.logCallback, + } + if err := s.pipeline.Submit(batch); err != nil { + if errors.Is(err, ErrQueueFull) { + return nil, grpcstatus.Errorf(codes.ResourceExhausted, "ingest pipeline at capacity") } + return nil, err + } + return &collogspb.ExportLogsServiceResponse{}, nil + } + + // Synchronous fallback (preserves original behavior when async is disabled). + if err := s.repo.BatchCreateLogs(logsToInsert); err != nil { + slog.Error("❌ Failed to insert logs", "error", err) + return nil, err + } + if s.logCallback != nil { + for _, l := range logsToInsert { + s.logCallback(l) } } diff --git a/internal/ingest/pipeline.go b/internal/ingest/pipeline.go new file mode 100644 index 0000000..f9366a1 --- /dev/null +++ b/internal/ingest/pipeline.go @@ -0,0 +1,344 @@ +package ingest + +import ( + "context" + "errors" + "log/slog" + "runtime/debug" + "sync" + "sync/atomic" + "time" + + "github.com/RandomCodeSpace/otelcontext/internal/storage" + "github.com/RandomCodeSpace/otelcontext/internal/telemetry" +) + +// SignalType identifies the OTLP signal a Batch carries. The label +// is exported on pipeline metrics so operators can attribute drops. +type SignalType uint8 + +const ( + SignalTraces SignalType = iota + SignalLogs +) + +// signalLabel returns the metric-label form of a SignalType. 
+func signalLabel(t SignalType) string { + switch t { + case SignalTraces: + return "traces" + case SignalLogs: + return "logs" + default: + return "unknown" + } +} + +// Batch is the unit of work flowing through the async ingest Pipeline. +// One Batch corresponds to the persistable output of a single OTLP +// Export() call. Trace insertion ordering (Traces → Spans → Logs) is +// honored by the worker that processes the batch — packaging the three +// slices together preserves the FK invariant the synchronous path +// already enforces. +type Batch struct { + Type SignalType + Tenant string + + Traces []storage.Trace + Spans []storage.Span + Logs []storage.Log + + // Priority flags. Errors and slow traces are protected from soft + // backpressure drops — they may still be rejected at hard capacity. + HasError bool + HasSlow bool + + // Optional per-record callbacks invoked after a successful DB write. + // In production these feed GraphRAG ingestion. Nil callbacks are + // skipped silently. + SpanCallback func(storage.Span) + LogCallback func(storage.Log) + + enqueuedAt time.Time +} + +// Priority reports whether the batch is protected from soft-backpressure +// drops. Used by Submit() to decide whether to enqueue at >= 90% fullness. +func (b *Batch) Priority() bool { return b.HasError || b.HasSlow } + +// ErrQueueFull is returned by Submit when the queue is at hard capacity +// (100% full). Callers should map this to gRPC RESOURCE_EXHAUSTED or +// HTTP 429 with a Retry-After hint so OTLP clients back off cleanly. +var ErrQueueFull = errors.New("ingest pipeline at capacity") + +// PipelineConfig holds the tunables for a Pipeline. +type PipelineConfig struct { + Capacity int // total queue depth across all signal types + Workers int // worker goroutines draining the queue + SoftThreshold float64 // fullness fraction above which healthy batches are dropped (0.0–1.0) +} + +// DefaultPipelineConfig returns production-sized defaults. 
+func DefaultPipelineConfig() PipelineConfig { + return PipelineConfig{ + Capacity: 50000, + Workers: 8, + SoftThreshold: 0.9, + } +} + +// pipelineWriter is the slice of *storage.Repository the Pipeline depends +// on. Defining it as an interface keeps the package layering clean and +// lets tests inject fakes without spinning up SQLite. +type pipelineWriter interface { + BatchCreateTraces(traces []storage.Trace) error + BatchCreateSpans(spans []storage.Span) error + BatchCreateLogs(logs []storage.Log) error +} + +// Pipeline decouples OTLP Export() from synchronous DB writes. It holds a +// bounded buffered channel of Batches, a worker pool that drains the +// channel into the Repository, and Prometheus instruments that surface +// queue depth, drop counts, and rejection counts. +// +// Lifecycle: +// +// p := NewPipeline(repo, metrics, cfg) +// p.Start(ctx) +// defer p.Stop() // drains in-flight before returning +// p.Submit(batch) +type Pipeline struct { + writer pipelineWriter + metrics *telemetry.Metrics + + cfg PipelineConfig + queue chan *Batch + + // Stats — exported via accessors for tests and for the /metrics path + // that doesn't already cover pipeline counters. + enqueuedTotal atomic.Int64 + processedTotal atomic.Int64 + droppedHealthy atomic.Int64 + rejectedFull atomic.Int64 + processFailures atomic.Int64 + + stopCh chan struct{} + once sync.Once + wg sync.WaitGroup +} + +// NewPipeline constructs a Pipeline with the given config, falling back +// to DefaultPipelineConfig() values for non-positive fields. The +// Pipeline does NOT start workers — call Start(ctx) when ready. 
+func NewPipeline(writer pipelineWriter, metrics *telemetry.Metrics, cfg PipelineConfig) *Pipeline { + d := DefaultPipelineConfig() + if cfg.Capacity <= 0 { + cfg.Capacity = d.Capacity + } + if cfg.Workers <= 0 { + cfg.Workers = d.Workers + } + if cfg.SoftThreshold <= 0 || cfg.SoftThreshold >= 1.0 { + cfg.SoftThreshold = d.SoftThreshold + } + return &Pipeline{ + writer: writer, + metrics: metrics, + cfg: cfg, + queue: make(chan *Batch, cfg.Capacity), + stopCh: make(chan struct{}), + } +} + +// Start spawns the worker pool. Call it exactly once: nothing guards +// against reuse, so each extra call launches cfg.Workers more workers. +func (p *Pipeline) Start(ctx context.Context) { + for range p.cfg.Workers { + p.wg.Go(func() { + defer func() { + if r := recover(); r != nil { + slog.Error("ingest pipeline worker panic", + "panic", r, + "stack", string(debug.Stack()), + ) + if p.metrics != nil && p.metrics.PanicsRecoveredTotal != nil { + p.metrics.PanicsRecoveredTotal.WithLabelValues("ingest_pipeline").Inc() + } + } + }() + p.worker(ctx) + }) + } +} + +// Submit enqueues a batch for asynchronous persistence. Returns nil when +// the batch is accepted (or silently dropped under soft backpressure) +// and ErrQueueFull when the queue is at hard capacity. Nil batches are +// no-ops. +// +// Soft backpressure: when fullness >= SoftThreshold, healthy batches +// (Priority()==false) are dropped at the door and Submit returns nil so +// the OTLP client sees a successful Export. Errors and slow traces +// always continue to the channel. +// +// Hard backpressure: when the channel send fails (buffer at 100%), +// Submit returns ErrQueueFull regardless of priority. The caller should +// translate this into a backpressure signal so the client retries with +// exponential backoff rather than tighter loops. +func (p *Pipeline) Submit(b *Batch) error { + if b == nil { + return nil + } + if len(b.Traces) == 0 && len(b.Spans) == 0 && len(b.Logs) == 0 { + // Empty batch — nothing to persist. 
Skip the channel entirely. + return nil + } + b.enqueuedAt = time.Now() + + fullness := float64(len(p.queue)) / float64(p.cfg.Capacity) + if fullness >= p.cfg.SoftThreshold && !b.Priority() { + p.droppedHealthy.Add(1) + p.observeDrop(b.Type, "soft_backpressure") + return nil + } + + select { + case p.queue <- b: + p.enqueuedTotal.Add(1) + p.observeQueueDepth(b.Type) + return nil + default: + p.rejectedFull.Add(1) + p.observeDrop(b.Type, "queue_full") + return ErrQueueFull + } +} + +// Stop signals workers to exit and blocks until in-flight batches have +// been drained from the channel. Idempotent. +func (p *Pipeline) Stop() { + p.once.Do(func() { + close(p.stopCh) + }) + p.wg.Wait() +} + +// Stats returns snapshot counters for tests and for telemetry that +// doesn't already use Prometheus instruments. The values are best-effort +// and not synchronized across atomics — sufficient for diagnostics. +func (p *Pipeline) Stats() PipelineStats { + return PipelineStats{ + Enqueued: p.enqueuedTotal.Load(), + Processed: p.processedTotal.Load(), + DroppedHealthy: p.droppedHealthy.Load(), + RejectedFull: p.rejectedFull.Load(), + ProcessFailures: p.processFailures.Load(), + QueueDepth: len(p.queue), + Capacity: p.cfg.Capacity, + } +} + +// PipelineStats is a snapshot of pipeline counters. +type PipelineStats struct { + Enqueued int64 + Processed int64 + DroppedHealthy int64 + RejectedFull int64 + ProcessFailures int64 + QueueDepth int + Capacity int +} + +// worker drains the queue. Exits when stopCh closes (after draining +// remaining batches) or when ctx is canceled (immediate). +func (p *Pipeline) worker(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case b := <-p.queue: + p.process(b) + case <-p.stopCh: + // Drain remaining buffered batches synchronously so a + // graceful shutdown doesn't lose in-flight ingest. 
+ for { + select { + case b := <-p.queue: + p.process(b) + default: + return + } + } + } + } +} + +// process persists a single batch in Trace→Span→Log order, mirroring the +// ordering invariant of the synchronous Export() path. Failures are +// logged and surfaced via processFailures; the batch is then dropped +// (the DLQ tier is the redundancy story for write failures). +func (p *Pipeline) process(b *Batch) { + if b == nil { + return + } + defer func() { + if r := recover(); r != nil { + slog.Error("ingest pipeline process panic", + "panic", r, + "stack", string(debug.Stack()), + ) + p.processFailures.Add(1) + if p.metrics != nil && p.metrics.PanicsRecoveredTotal != nil { + p.metrics.PanicsRecoveredTotal.WithLabelValues("ingest_pipeline").Inc() + } + } + }() + p.processedTotal.Add(1) + + if len(b.Traces) > 0 { + if err := p.writer.BatchCreateTraces(b.Traces); err != nil { + slog.Error("ingest pipeline: BatchCreateTraces failed", "error", err) + p.processFailures.Add(1) + // Continue — spans may still land if their trace exists from + // a prior batch. Mirrors the synchronous path's tolerance. 
+ } + } + if len(b.Spans) > 0 { + if err := p.writer.BatchCreateSpans(b.Spans); err != nil { + slog.Error("ingest pipeline: BatchCreateSpans failed", "error", err) + p.processFailures.Add(1) + return + } + if b.SpanCallback != nil { + for _, s := range b.Spans { + b.SpanCallback(s) + } + } + } + if len(b.Logs) > 0 { + if err := p.writer.BatchCreateLogs(b.Logs); err != nil { + slog.Error("ingest pipeline: BatchCreateLogs failed", "error", err) + p.processFailures.Add(1) + return + } + if b.LogCallback != nil { + for _, l := range b.Logs { + b.LogCallback(l) + } + } + } +} + +func (p *Pipeline) observeQueueDepth(t SignalType) { + if p.metrics == nil || p.metrics.IngestPipelineQueueDepth == nil { + return + } + p.metrics.IngestPipelineQueueDepth.WithLabelValues(signalLabel(t)).Set(float64(len(p.queue))) +} + +func (p *Pipeline) observeDrop(t SignalType, reason string) { + if p.metrics == nil || p.metrics.IngestPipelineDroppedTotal == nil { + return + } + p.metrics.IngestPipelineDroppedTotal.WithLabelValues(signalLabel(t), reason).Inc() +} diff --git a/internal/ingest/pipeline_e2e_test.go b/internal/ingest/pipeline_e2e_test.go new file mode 100644 index 0000000..f8396d7 --- /dev/null +++ b/internal/ingest/pipeline_e2e_test.go @@ -0,0 +1,172 @@ +package ingest + +import ( + "context" + "testing" + "time" + + "github.com/RandomCodeSpace/otelcontext/internal/config" + "github.com/RandomCodeSpace/otelcontext/internal/storage" + tracepb "go.opentelemetry.io/proto/otlp/trace/v1" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// pipelineHarness wires a real TraceServer + LogsServer to an in-memory +// SQLite repository through the async Pipeline. Each test gets its own +// harness so they don't share queue state. 
+type pipelineHarness struct { + repo *storage.Repository + traces *TraceServer + logs *LogsServer + pipeline *Pipeline +} + +func newPipelineHarness(t *testing.T, cap, workers int) *pipelineHarness { + t.Helper() + db, err := storage.NewDatabase("sqlite", ":memory:") + if err != nil { + t.Fatalf("NewDatabase: %v", err) + } + if err := storage.AutoMigrateModels(db, "sqlite"); err != nil { + t.Fatalf("AutoMigrateModels: %v", err) + } + repo := storage.NewRepositoryFromDB(db, "sqlite") + + cfg := &config.Config{ + IngestMinSeverity: "DEBUG", + SamplingLatencyThresholdMs: 500, + } + traces := NewTraceServer(repo, nil, cfg) + logs := NewLogsServer(repo, nil, cfg) + + pl := NewPipeline(repo, nil, PipelineConfig{Capacity: cap, Workers: workers, SoftThreshold: 0.9}) + if workers > 0 { + pl.Start(context.Background()) + } + traces.SetPipeline(pl) + logs.SetPipeline(pl) + + t.Cleanup(func() { + pl.Stop() + _ = repo.Close() + }) + return &pipelineHarness{repo: repo, traces: traces, logs: logs, pipeline: pl} +} + +// TestPipelineE2E_TracesPersistThroughPipeline verifies that a trace +// Export call submitted via the pipeline lands in the DB once workers +// drain the queue. Confirms the Trace→Span→Log insert path holds end- +// to-end through async persistence. +func TestPipelineE2E_TracesPersistThroughPipeline(t *testing.T) { + h := newPipelineHarness(t, 16, 2) + req := buildTracesRequest("svc-a", 5) + + if _, err := h.traces.Export(context.Background(), req); err != nil { + t.Fatalf("Export: %v", err) + } + + if !waitFor(t, 3*time.Second, func() bool { + return countSpans(t, h.repo) >= 5 + }) { + t.Fatalf("spans did not land in DB through pipeline within deadline (got %d, want >=5)", countSpans(t, h.repo)) + } +} + +// TestPipelineE2E_LogsPersistThroughPipeline same as above for logs. 
+func TestPipelineE2E_LogsPersistThroughPipeline(t *testing.T) { + h := newPipelineHarness(t, 16, 2) + req := buildLogsRequest("svc-a", 7) + + if _, err := h.logs.Export(context.Background(), req); err != nil { + t.Fatalf("Export: %v", err) + } + + if !waitFor(t, 3*time.Second, func() bool { + got, err := h.repo.GetRecentLogs(context.Background(), 100) + if err != nil { + t.Fatalf("GetRecentLogs: %v", err) + } + return countByService(got, "svc-a") >= 7 + }) { + t.Fatalf("logs did not land in DB through pipeline within deadline") + } +} + +// TestPipelineE2E_HardCapacityReturnsResourceExhausted validates the +// gRPC error-code mapping. With workers=0 and cap=1 priority traffic, +// the second Export must surface RESOURCE_EXHAUSTED so OTLP clients +// back off rather than retry tighter. +func TestPipelineE2E_HardCapacityReturnsResourceExhausted(t *testing.T) { + // workers=0 → nothing drains → after 1 priority submit, queue is full. + h := newPipelineHarness(t, 1, 0) + + // First Export fills the queue. Build with an error span so it + // bypasses soft backpressure (which kicks in at >=90% but a 1-cap + // queue is degenerate — any non-empty submit is at 100%). + primer := buildTracesRequest("svc-a", 1) + primer.ResourceSpans[0].ScopeSpans[0].Spans[0].Status = errorStatusForTest() + if _, err := h.traces.Export(context.Background(), primer); err != nil { + t.Fatalf("primer Export: %v", err) + } + if got := h.pipeline.Stats().Enqueued; got != 1 { + t.Fatalf("primer not enqueued (got %d, want 1)", got) + } + + // Second Export overflows. Use a priority batch so it can't be + // silently dropped by soft backpressure either. 
+ overflow := buildTracesRequest("svc-a", 1) + overflow.ResourceSpans[0].ScopeSpans[0].Spans[0].Status = errorStatusForTest() + _, err := h.traces.Export(context.Background(), overflow) + if err == nil { + t.Fatalf("Export at hard capacity returned nil error, want RESOURCE_EXHAUSTED") + } + st, ok := status.FromError(err) + if !ok { + t.Fatalf("Export error %v is not a grpc status error", err) + } + if st.Code() != codes.ResourceExhausted { + t.Fatalf("Export error code=%s, want %s", st.Code(), codes.ResourceExhausted) + } + if h.pipeline.Stats().RejectedFull == 0 { + t.Fatalf("RejectedFull counter did not increment after hard-capacity reject") + } +} + +// TestPipelineE2E_PriorityBatchProtected verifies that under sustained +// soft-backpressure conditions, a batch flagged as priority (error span +// or slow span) is never silently dropped — it either enqueues or +// receives an explicit hard-capacity rejection. +func TestPipelineE2E_PriorityBatchProtected(t *testing.T) { + // cap=10 workers=0 → fill to soft threshold first, then submit + // healthy (drop) and priority (must still enqueue) batches. + h := newPipelineHarness(t, 10, 0) + for range 9 { + _ = h.pipeline.Submit(healthyBatch()) + } + + // Healthy at >=90% should drop silently. + healthyReq := buildTracesRequest("svc-h", 1) + if _, err := h.traces.Export(context.Background(), healthyReq); err != nil { + t.Fatalf("healthy Export at soft threshold returned %v, want nil", err) + } + if h.pipeline.Stats().DroppedHealthy < 1 { + t.Fatalf("healthy batch was not soft-dropped — DroppedHealthy=%d", h.pipeline.Stats().DroppedHealthy) + } + + // Priority must enqueue (occupying the 10th slot). 
+ prioReq := buildTracesRequest("svc-p", 1) + prioReq.ResourceSpans[0].ScopeSpans[0].Spans[0].Status = errorStatusForTest() + if _, err := h.traces.Export(context.Background(), prioReq); err != nil { + t.Fatalf("priority Export at soft threshold returned %v, want nil", err) + } + if got := h.pipeline.Stats().Enqueued; got != 10 { + t.Fatalf("priority batch not enqueued (Enqueued=%d, want 10)", got) + } +} + +// errorStatusForTest is a small helper to flag a span as ERROR for +// priority-batch tests. +func errorStatusForTest() *tracepb.Status { + return &tracepb.Status{Code: tracepb.Status_STATUS_CODE_ERROR} +} diff --git a/internal/ingest/pipeline_test.go b/internal/ingest/pipeline_test.go new file mode 100644 index 0000000..7e95450 --- /dev/null +++ b/internal/ingest/pipeline_test.go @@ -0,0 +1,422 @@ +package ingest + +import ( + "context" + "errors" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/RandomCodeSpace/otelcontext/internal/storage" +) + +// fakeWriter is a deterministic, in-memory pipelineWriter for tests. +// Records every call in order and supports configurable failure modes +// without needing SQLite. +type fakeWriter struct { + mu sync.Mutex + + tracesCalls [][]storage.Trace + spansCalls [][]storage.Span + logsCalls [][]storage.Log + order []string // sequence of "traces"/"spans"/"logs" tags + + // Optional failure injectors. When set, the corresponding BatchCreate* + // returns the configured error on its next call. + traceErr error + spanErr error + logErr error + + // When >0, BatchCreateSpans blocks for this duration before returning. + // Used to keep batches "in flight" while we observe queue depth. 
+	spanDelay time.Duration
+}
+
+func (f *fakeWriter) BatchCreateTraces(t []storage.Trace) error { // records the call, then returns traceErr
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	f.tracesCalls = append(f.tracesCalls, t)
+	f.order = append(f.order, "traces")
+	return f.traceErr
+}
+
+func (f *fakeWriter) BatchCreateSpans(s []storage.Span) error { // optional delay, records the call, returns spanErr
+	if f.spanDelay > 0 {
+		time.Sleep(f.spanDelay) // sleeps before taking mu, so the lock is not held during the delay
+	}
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	f.spansCalls = append(f.spansCalls, s)
+	f.order = append(f.order, "spans")
+	return f.spanErr
+}
+
+func (f *fakeWriter) BatchCreateLogs(l []storage.Log) error { // records the call, then returns logErr
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	f.logsCalls = append(f.logsCalls, l)
+	f.order = append(f.order, "logs")
+	return f.logErr
+}
+
+func (f *fakeWriter) snapshotOrder() []string { // call-order tags recorded so far
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	out := make([]string, len(f.order))
+	copy(out, f.order)
+	return out // a copy, so callers never alias f.order
+}
+
+// healthyBatch builds a SignalTraces Batch with one trace, one span and one
+// log and no priority flag set — eligible for soft-backpressure drops.
+func healthyBatch() *Batch {
+	return &Batch{
+		Type:   SignalTraces,
+		Tenant: "t1",
+		Traces: []storage.Trace{{TraceID: "trace-1", ServiceName: "svc"}},
+		Spans:  []storage.Span{{TraceID: "trace-1", SpanID: "span-1", ServiceName: "svc"}},
+		Logs:   []storage.Log{{TraceID: "trace-1", Body: "ok"}},
+	}
+}
+
+// errorBatch is healthyBatch with HasError set to true, which makes it a
+// priority batch that soft backpressure must let through.
+func errorBatch() *Batch {
+	b := healthyBatch()
+	b.HasError = true
+	return b
+}
+
+// waitFor polls pred every 2ms until it returns true or timeout elapses,
+// then reports one last pred() check. Bridges async submit→worker latency.
+func waitFor(t *testing.T, timeout time.Duration, pred func() bool) bool {
+	t.Helper()
+	deadline := time.Now().Add(timeout)
+	for time.Now().Before(deadline) {
+		if pred() {
+			return true
+		}
+		time.Sleep(2 * time.Millisecond)
+	}
+	return pred() // one final check once the deadline has passed
+}
+
+// ===== Submission semantics =====
+
+func TestPipeline_NilBatch_NoOp(t *testing.T) { // Submit(nil) returns nil and enqueues nothing
+	p := NewPipeline(&fakeWriter{}, nil, PipelineConfig{Capacity: 10, Workers: 1})
+	if err := p.Submit(nil); err != nil {
+		t.Fatalf("Submit(nil) returned %v, want nil", err)
+	}
+	if got := p.Stats().Enqueued; got != 0 {
+		t.Fatalf("Submit(nil) enqueued %d batches, want 0", got)
+	}
+}
+
+func TestPipeline_EmptyBatch_NoOp(t *testing.T) { // a Batch with no traces/spans/logs is accepted but not enqueued
+	p := NewPipeline(&fakeWriter{}, nil, PipelineConfig{Capacity: 10, Workers: 1})
+	empty := &Batch{Type: SignalTraces, Tenant: "t"}
+	if err := p.Submit(empty); err != nil {
+		t.Fatalf("Submit(empty) returned %v, want nil", err)
+	}
+	if got := p.Stats().Enqueued; got != 0 {
+		t.Fatalf("empty batch enqueued %d, want 0 — should skip channel entirely", got)
+	}
+}
+
+func TestPipeline_AcceptsBelowSoftThreshold(t *testing.T) {
+	// Capacity 10, soft threshold 0.9 → first 9 healthy submits go through.
+	// Workers=0 and Start is never called, so nothing drains; depth only grows.
+	p := NewPipeline(&fakeWriter{}, nil, PipelineConfig{Capacity: 10, Workers: 0, SoftThreshold: 0.9})
+	for range 9 {
+		if err := p.Submit(healthyBatch()); err != nil {
+			t.Fatalf("submit below soft threshold returned %v, want nil", err)
+		}
+	}
+	stats := p.Stats()
+	if stats.Enqueued != 9 || stats.DroppedHealthy != 0 {
+		t.Fatalf("below soft threshold: enqueued=%d dropped=%d, want 9/0", stats.Enqueued, stats.DroppedHealthy)
+	}
+}
+
+func TestPipeline_DropsHealthyAtSoftThreshold(t *testing.T) {
+	// Capacity 10, soft threshold 0.9. Prime 9 batches (each accepted
+	// below the threshold), then submit one more healthy batch → should
+	// drop. Verify the drop counter and that Enqueued stayed at 9.
+	p := NewPipeline(&fakeWriter{}, nil, PipelineConfig{Capacity: 10, Workers: 0, SoftThreshold: 0.9})
+	for range 9 {
+		if err := p.Submit(healthyBatch()); err != nil {
+			t.Fatalf("priming submit failed: %v", err)
+		}
+	}
+	// Queue now holds 9 of 10 (0.9 fullness) — the next healthy submit is dropped, not rejected.
+	if err := p.Submit(healthyBatch()); err != nil {
+		t.Fatalf("dropped submit returned err %v, want nil (silent drop)", err)
+	}
+	stats := p.Stats()
+	if stats.Enqueued != 9 {
+		t.Fatalf("enqueued=%d after soft-drop, want 9 (drop should not enqueue)", stats.Enqueued)
+	}
+	if stats.DroppedHealthy != 1 {
+		t.Fatalf("DroppedHealthy=%d, want 1", stats.DroppedHealthy)
+	}
+}
+
+func TestPipeline_PriorityBatchesBypassSoftBackpressure(t *testing.T) {
+	// Same priming as TestPipeline_DropsHealthyAtSoftThreshold, but the 10th
+	// submit is error-flagged — it must enqueue because errors are diagnostic-critical.
+	p := NewPipeline(&fakeWriter{}, nil, PipelineConfig{Capacity: 10, Workers: 0, SoftThreshold: 0.9})
+	for range 9 {
+		_ = p.Submit(healthyBatch()) // priming only; the Enqueued==10 check below catches a failed submit
+	}
+	if err := p.Submit(errorBatch()); err != nil {
+		t.Fatalf("priority submit returned %v, want nil (errors must pass soft backpressure)", err)
+	}
+	stats := p.Stats()
+	if stats.Enqueued != 10 {
+		t.Fatalf("priority batch enqueued=%d, want 10", stats.Enqueued)
+	}
+	if stats.DroppedHealthy != 0 {
+		t.Fatalf("DroppedHealthy=%d, want 0 (priority must not be dropped)", stats.DroppedHealthy)
+	}
+}
+
+func TestPipeline_RejectsAtHardCapacity(t *testing.T) {
+	// Fill the queue to 100% with priority traffic (bypasses soft drop),
+	// then submit one more priority batch — must return ErrQueueFull.
+	p := NewPipeline(&fakeWriter{}, nil, PipelineConfig{Capacity: 5, Workers: 0, SoftThreshold: 0.9})
+	for range 5 {
+		if err := p.Submit(errorBatch()); err != nil {
+			t.Fatalf("priming priority submit failed: %v", err)
+		}
+	}
+	err := p.Submit(errorBatch()) // 6th submit into a 5-slot queue
+	if !errors.Is(err, ErrQueueFull) {
+		t.Fatalf("hard-capacity submit returned %v, want ErrQueueFull", err)
+	}
+	stats := p.Stats()
+	if stats.RejectedFull != 1 {
+		t.Fatalf("RejectedFull=%d, want 1", stats.RejectedFull)
+	}
+}
+
+// ===== Worker / processing semantics =====
+
+func TestPipeline_PreservesInsertionOrder(t *testing.T) {
+	// Trace → Span → Log ordering must hold across processing because of
+	// the FK constraint enforced by the synchronous path.
+	w := &fakeWriter{}
+	p := NewPipeline(w, nil, PipelineConfig{Capacity: 4, Workers: 1})
+	ctx, cancel := context.WithCancel(context.Background())
+	t.Cleanup(cancel)
+	p.Start(ctx)
+	t.Cleanup(p.Stop) // cleanups run last-added-first: Stop runs before cancel
+
+	if err := p.Submit(healthyBatch()); err != nil {
+		t.Fatalf("submit: %v", err)
+	}
+	if !waitFor(t, 2*time.Second, func() bool { return p.Stats().Processed == 1 }) {
+		t.Fatalf("worker did not process batch within deadline")
+	}
+
+	got := w.snapshotOrder()
+	want := []string{"traces", "spans", "logs"}
+	if len(got) != len(want) {
+		t.Fatalf("call order length=%d, want %d (%v)", len(got), len(want), got)
+	}
+	for i := range want {
+		if got[i] != want[i] {
+			t.Fatalf("call order[%d]=%q, want %q (full: %v)", i, got[i], want[i], got)
+		}
+	}
+}
+
+func TestPipeline_CallbacksFireAfterPersistence(t *testing.T) {
+	// Contract: callbacks run only after the matching BatchCreate* succeeds.
+	// This test covers the success path only: each callback fires once.
+	w := &fakeWriter{}
+	p := NewPipeline(w, nil, PipelineConfig{Capacity: 2, Workers: 1})
+	ctx, cancel := context.WithCancel(context.Background())
+	t.Cleanup(cancel)
+	p.Start(ctx)
+	t.Cleanup(p.Stop)
+
+	var spanHits, logHits atomic.Int64 // atomic: callbacks run asynchronously to this test goroutine
+	b := healthyBatch()
+	b.SpanCallback = func(_ storage.Span) { spanHits.Add(1) }
+	b.LogCallback = func(_ storage.Log) { logHits.Add(1) }
+
+	if err := p.Submit(b); err != nil {
+		t.Fatalf("submit: %v", err)
+	}
+	if !waitFor(t, 2*time.Second, func() bool { return spanHits.Load() == 1 && logHits.Load() == 1 }) {
+		t.Fatalf("callbacks did not fire (span=%d log=%d, want 1/1)", spanHits.Load(), logHits.Load())
+	}
+}
+
+func TestPipeline_FailedSpansSkipsLogs(t *testing.T) {
+	// When BatchCreateSpans fails, BatchCreateLogs must NOT run for that
+	// batch — preserves the invariant that orphan logs aren't persisted
+	// without their span. Mirrors the synchronous path's behavior of
+	// returning the span error before log insert.
+	w := &fakeWriter{spanErr: errors.New("span db down")}
+	p := NewPipeline(w, nil, PipelineConfig{Capacity: 2, Workers: 1})
+	ctx, cancel := context.WithCancel(context.Background())
+	t.Cleanup(cancel)
+	p.Start(ctx)
+	t.Cleanup(p.Stop)
+
+	if err := p.Submit(healthyBatch()); err != nil {
+		t.Fatalf("submit: %v", err)
+	}
+	if !waitFor(t, 2*time.Second, func() bool { return p.Stats().ProcessFailures > 0 }) {
+		t.Fatalf("expected ProcessFailures > 0, got %d", p.Stats().ProcessFailures)
+	}
+	calls := w.snapshotOrder() // NOTE(review): taken as soon as the counter moves; a later logs call would go unseen — confirm
+	for _, c := range calls {
+		if c == "logs" {
+			t.Fatalf("BatchCreateLogs ran after spans failed — order=%v", calls)
+		}
+	}
+}
+
+func TestPipeline_FailedTracesContinuesToSpans(t *testing.T) {
+	// Trace failures are tolerated — spans may still land if the trace
+	// row exists from a prior batch. Must NOT short-circuit subsequent
+	// span/log persistence.
+	w := &fakeWriter{traceErr: errors.New("transient")}
+	p := NewPipeline(w, nil, PipelineConfig{Capacity: 2, Workers: 1})
+	ctx, cancel := context.WithCancel(context.Background())
+	t.Cleanup(cancel)
+	p.Start(ctx)
+	t.Cleanup(p.Stop)
+
+	if err := p.Submit(healthyBatch()); err != nil {
+		t.Fatalf("submit: %v", err)
+	}
+	if !waitFor(t, 2*time.Second, func() bool {
+		ord := w.snapshotOrder()
+		return len(ord) == 3 && ord[1] == "spans" && ord[2] == "logs" // all three writer calls made despite traceErr
+	}) {
+		t.Fatalf("trace failure should not stop spans/logs — order=%v", w.snapshotOrder())
+	}
+}
+
+func TestPipeline_DrainsOnStop(t *testing.T) {
+	// Stop() must process remaining buffered batches before returning so
+	// graceful shutdown doesn't lose in-flight ingest.
+	w := &fakeWriter{}
+	p := NewPipeline(w, nil, PipelineConfig{Capacity: 50, Workers: 2})
+	for range 20 {
+		_ = p.Submit(healthyBatch()) // 20 of 50 slots; the Processed==20 check below catches any lost submit
+	}
+	// Start AFTER submitting so the queue is pre-loaded — exercises the
+	// drain path in worker().
+	ctx := context.Background()
+	p.Start(ctx)
+	p.Stop()
+
+	if got := p.Stats().Processed; got != 20 {
+		t.Fatalf("after Stop: processed=%d, want 20 (drain path failed)", got)
+	}
+}
+
+func TestPipeline_StopIsIdempotent(t *testing.T) {
+	p := NewPipeline(&fakeWriter{}, nil, PipelineConfig{Capacity: 4, Workers: 1})
+	p.Start(context.Background())
+	// First Stop drains. Second must not panic on closed channel.
+	p.Stop()
+	p.Stop()
+}
+
+func TestPipeline_ConcurrentSubmit(t *testing.T) {
+	// 100 goroutines × 50 submits = 5000 concurrent submits. Pipeline must not
+	// race or lose any: enqueued + dropped + rejected must equal 5000.
+	w := &fakeWriter{}
+	p := NewPipeline(w, nil, PipelineConfig{Capacity: 1024, Workers: 4})
+	ctx, cancel := context.WithCancel(context.Background())
+	t.Cleanup(cancel)
+	p.Start(ctx)
+	t.Cleanup(p.Stop)
+
+	var wg sync.WaitGroup
+	for range 100 {
+		wg.Go(func() { // sync.WaitGroup.Go (Go 1.25+) wraps the func in Add(1)/Done
+			for range 50 {
+				_ = p.Submit(healthyBatch()) // outcome is tallied from Stats below, so the error is ignored here
+			}
+		})
+	}
+	wg.Wait()
+
+	// Wait for queue to drain so Processed catches up with Enqueued.
+	if !waitFor(t, 5*time.Second, func() bool {
+		s := p.Stats()
+		return s.Processed == s.Enqueued && s.QueueDepth == 0
+	}) {
+		t.Fatalf("queue did not drain — stats=%+v", p.Stats())
+	}
+
+	stats := p.Stats()
+	total := stats.Enqueued + stats.DroppedHealthy + stats.RejectedFull
+	if total != 5000 {
+		t.Fatalf("submits accounted=%d, want 5000 — race lost batches; stats=%+v", total, stats)
+	}
+}
+
+func TestPipeline_DefaultsApplied(t *testing.T) {
+	// Zero-value config must fall back to DefaultPipelineConfig().
+	p := NewPipeline(&fakeWriter{}, nil, PipelineConfig{})
+	d := DefaultPipelineConfig()
+	if p.cfg.Capacity != d.Capacity {
+		t.Errorf("Capacity default not applied: got %d want %d", p.cfg.Capacity, d.Capacity)
+	}
+	if p.cfg.Workers != d.Workers {
+		t.Errorf("Workers default not applied: got %d want %d", p.cfg.Workers, d.Workers)
+	}
+	if p.cfg.SoftThreshold != d.SoftThreshold {
+		t.Errorf("SoftThreshold default not applied: got %v want %v", p.cfg.SoftThreshold, d.SoftThreshold)
+	}
+}
+
+func TestPipeline_HardCapacityEvenForPriority(t *testing.T) {
+	// At hard capacity (queue 100% full) even priority batches are rejected.
+	// The caller is responsible for translating into RESOURCE_EXHAUSTED so
+	// the OTLP client retries; better than silently losing errors.
+	p := NewPipeline(&fakeWriter{}, nil, PipelineConfig{Capacity: 2, Workers: 0, SoftThreshold: 0.9})
+	_ = p.Submit(errorBatch()) // fills slot 1 of 2
+	_ = p.Submit(errorBatch()) // fills slot 2 of 2 — queue is now at hard capacity
+	err := p.Submit(errorBatch())
+	if !errors.Is(err, ErrQueueFull) {
+		t.Fatalf("hard cap with priority: got %v, want ErrQueueFull", err)
+	}
+}
+
+func TestPipeline_PanicInCallbackRecovered(t *testing.T) {
+	// A panicking callback must not kill the worker: ProcessFailures goes
+	// up, and both the bad and the good batch still count as Processed.
+	w := &fakeWriter{}
+	p := NewPipeline(w, nil, PipelineConfig{Capacity: 4, Workers: 1})
+	ctx, cancel := context.WithCancel(context.Background())
+	t.Cleanup(cancel)
+	p.Start(ctx)
+	t.Cleanup(p.Stop)
+
+	bad := healthyBatch()
+	bad.SpanCallback = func(_ storage.Span) { panic("boom") }
+	good := healthyBatch()
+
+	if err := p.Submit(bad); err != nil {
+		t.Fatalf("submit bad: %v", err)
+	}
+	if err := p.Submit(good); err != nil {
+		t.Fatalf("submit good: %v", err)
+	}
+	if !waitFor(t, 2*time.Second, func() bool { return p.Stats().Processed >= 2 }) {
+		t.Fatalf("worker did not survive callback panic — Processed=%d", p.Stats().Processed)
+	}
+	if p.Stats().ProcessFailures == 0 {
+		t.Errorf("expected ProcessFailures > 0 after callback panic")
+	}
+}
diff --git a/internal/telemetry/metrics.go b/internal/telemetry/metrics.go
index 4c183ef..ffabb39 100644
--- a/internal/telemetry/metrics.go
+++ b/internal/telemetry/metrics.go
@@ -78,6 +78,15 @@ type Metrics struct {
 	// --- GraphRAG overflow ---
 	GraphRAGEventsDroppedTotal *prometheus.CounterVec
 
+	// --- Async ingest pipeline (Phase 1 robustness work) ---
+	// IngestPipelineQueueDepth — current queue depth, sampled on every Submit.
+	// Labeled by signal so spikes can be attributed to traces vs logs.
+	IngestPipelineQueueDepth *prometheus.GaugeVec
+	// IngestPipelineDroppedTotal — batches that did NOT reach the DB.
+	// reason="soft_backpressure" — healthy batch dropped at >=90% fullness.
+ // reason="queue_full" — batch rejected at 100% capacity (client got 429/RESOURCE_EXHAUSTED). + IngestPipelineDroppedTotal *prometheus.CounterVec + // --- DB pool (sampled every 5s from sql.DB.Stats) --- DBPoolOpenConnections prometheus.Gauge DBPoolInUse prometheus.Gauge @@ -269,6 +278,15 @@ func New() *Metrics { Help: "Events dropped because the GraphRAG event channel was full.", }, []string{"signal"}), + IngestPipelineQueueDepth: promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "otelcontext_ingest_pipeline_queue_depth", + Help: "Current depth of the async ingest pipeline queue, by signal type.", + }, []string{"signal"}), + IngestPipelineDroppedTotal: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "otelcontext_ingest_pipeline_dropped_total", + Help: "Batches dropped by the async ingest pipeline. reason=soft_backpressure (>=90% queue, healthy) or queue_full (100% queue, rejected to client).", + }, []string{"signal", "reason"}), + // DB pool (Task 7 — visibility for DB_MAX_OPEN_CONNS sizing). DBPoolOpenConnections: promauto.NewGauge(prometheus.GaugeOpts{ Name: "otelcontext_db_pool_open_connections", diff --git a/main.go b/main.go index bafd4dc..70eb067 100644 --- a/main.go +++ b/main.go @@ -397,6 +397,27 @@ func main() { ) } + // Wire async ingest pipeline. Decouples OTLP Export() from synchronous + // DB writes — caller returns as soon as the parsed batch is enqueued. + // When disabled (INGEST_ASYNC_ENABLED=false), trace/logs servers fall + // back to the inline-write path bit-for-bit. 
+ var ingestPipeline *ingest.Pipeline + if cfg.IngestAsyncEnabled { + ingestPipeline = ingest.NewPipeline(repo, metrics, ingest.PipelineConfig{ + Capacity: cfg.IngestPipelineQueueSize, + Workers: cfg.IngestPipelineWorkers, + }) + ingestPipeline.Start(context.Background()) + traceServer.SetPipeline(ingestPipeline) + logsServer.SetPipeline(ingestPipeline) + slog.Info("🌊 Async ingest pipeline enabled", + "queue_size", cfg.IngestPipelineQueueSize, + "workers", cfg.IngestPipelineWorkers, + ) + } else { + slog.Warn("🐌 Async ingest pipeline disabled (INGEST_ASYNC_ENABLED=false) — Export() blocks on DB writes") + } + // Wire up live log streaming + AI + DLQ metrics logHandler := func(l storage.Log) { start := time.Now() @@ -749,6 +770,13 @@ func main() { graphRAG.Stop() cancelGraphRAG() + // 3a. Drain async ingest pipeline. gRPC GracefulStop above guarantees + // no new Submits land; this blocks until workers finish in-flight + // batches so a graceful shutdown doesn't lose buffered ingest. + if ingestPipeline != nil { + ingestPipeline.Stop() + } + // 4. Stop DLQ (may still be replaying) dlq.Stop()