-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.go
More file actions
1030 lines (938 loc) · 34.7 KB
/
main.go
File metadata and controls
1030 lines (938 loc) · 34.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"log/slog"
"net"
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"time"
"github.com/RandomCodeSpace/otelcontext/internal/ai"
"github.com/RandomCodeSpace/otelcontext/internal/api"
"github.com/RandomCodeSpace/otelcontext/internal/config"
"github.com/RandomCodeSpace/otelcontext/internal/graph"
"github.com/RandomCodeSpace/otelcontext/internal/graphrag"
"github.com/RandomCodeSpace/otelcontext/internal/ingest"
"github.com/RandomCodeSpace/otelcontext/internal/mcp"
"github.com/RandomCodeSpace/otelcontext/internal/queue"
"github.com/RandomCodeSpace/otelcontext/internal/realtime"
"github.com/RandomCodeSpace/otelcontext/internal/storage"
"github.com/RandomCodeSpace/otelcontext/internal/telemetry"
tlsbootstrap "github.com/RandomCodeSpace/otelcontext/internal/tls"
"github.com/RandomCodeSpace/otelcontext/internal/tsdb"
"github.com/RandomCodeSpace/otelcontext/internal/ui"
"runtime/debug"
"sync"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
sdkresource "go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.25.0"
collogspb "go.opentelemetry.io/proto/otlp/collector/logs/v1"
colmetricspb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
_ "google.golang.org/grpc/encoding/gzip" // Register gzip decompressor
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/reflection"
"google.golang.org/grpc/status"
)
// Version is detected from build info at startup.
// Returns the real tag when installed via `go install`, "local" otherwise.
var Version = detectVersion()
// detectVersion reads runtime/debug.BuildInfo to return the module version
// that go install or go build stamped into the binary. Falls back to "local"
// for go run, raw go build, or any path that does not produce a stamped
// build (e.g. `(devel)` from module-aware development builds).
func detectVersion() string {
if info, ok := debug.ReadBuildInfo(); ok {
if v := info.Main.Version; v != "" && v != "(devel)" {
return v
}
}
return "local"
}
// cleanupStack is an ordered LIFO list of cleanup closures registered during
// startup. fatal() walks it before os.Exit so DBs, DLQs, and tracer providers
// get a chance to flush even on a fatal error. Each fn should be non-blocking
// or have its own bounded timeout.
var (
cleanupMu sync.Mutex
cleanupStack []func()
)
// RegisterCleanup pushes a cleanup closure onto the LIFO stack. Exported so
// future startup helpers outside main can enroll resources; the stack is
// walked by fatal() on failed boot.
func RegisterCleanup(fn func()) {
cleanupMu.Lock()
cleanupStack = append(cleanupStack, fn)
cleanupMu.Unlock()
}
// runCleanups pops and invokes cleanup closures in LIFO order.
func runCleanups() {
cleanupMu.Lock()
fns := cleanupStack
cleanupStack = nil
cleanupMu.Unlock()
for i := len(fns) - 1; i >= 0; i-- {
func() {
defer func() {
if r := recover(); r != nil {
slog.Error("cleanup panic", "panic", r)
}
}()
fns[i]()
}()
}
}
// fatal replaces scattered log.Fatalf calls. It emits a structured error,
// runs any registered cleanups in LIFO order, and exits 1. Extra key/value
// pairs are passed straight through to slog.Error.
func fatal(msg string, err error, kv ...any) {
args := append([]any{slog.Any("error", err)}, kv...)
slog.Error(msg, args...)
runCleanups()
os.Exit(1)
}
func main() {
versionFlag := flag.Bool("version", false, "print version and exit")
flag.Parse()
if *versionFlag {
fmt.Printf("OtelContext version %s\n", Version)
os.Exit(0)
}
// Force UTC timezone globally — prevents system timezone leaking into timestamps
time.Local = time.UTC
printBanner()
// Top-level application context used by boot-time background goroutines
// (e.g. vector-index hydrator) so they can be cancelled before the DB closes.
appCtx, appCancel := context.WithCancel(context.Background())
defer appCancel()
// WaitGroup for boot-time goroutines whose completion must be awaited
// during shutdown (vector index hydrator, DB health poller).
var bootWG sync.WaitGroup
// 0. Load Configuration
cfg, err := config.Load("")
if err != nil {
fatal("failed to load configuration", err)
}
if err := cfg.Validate(); err != nil {
fatal("invalid configuration", err)
}
// Auto-exclude own service when self-instrumentation points to a loopback
// address (otherwise every span emitted re-enters Export and amplifies).
cfg.GuardSelfInstrumentation()
if err := cfg.ValidateDBForEnv(); err != nil {
fatal("DB/Env validation", err)
}
if strings.EqualFold(cfg.DBDriver, "sqlite") {
slog.Warn("SQLite driver in use. Auto-tuned defaults survive ~50-120 services " +
"on a 4 GB host with 7-day retention. Switch to Postgres beyond that band, " +
"or for sustained >50 writes/sec. See README 'Production sizing'.")
}
// Initialize structured logger
var level slog.Level
switch strings.ToUpper(cfg.LogLevel) {
case "DEBUG":
level = slog.LevelDebug
case "WARN":
level = slog.LevelWarn
case "ERROR":
level = slog.LevelError
default:
level = slog.LevelInfo
}
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
Level: level,
}))
slog.SetDefault(logger)
slog.Info("🚀 Starting OtelContext", "version", Version, "env", cfg.Env, "log_level", level)
// 1. Initialize Internal Telemetry (first — everything registers metrics against this)
metrics := telemetry.New()
slog.Info("📊 Internal telemetry initialized")
// 1b. Initialize OTel self-instrumentation (optional)
var shutdownTracer func(context.Context) error
if cfg.OTelExporterEndpoint != "" {
tp, err := initTracerProvider(cfg.OTelExporterEndpoint)
if err != nil {
slog.Error("Failed to initialize OTel tracer provider", "error", err, "endpoint", cfg.OTelExporterEndpoint)
} else {
otel.SetTracerProvider(tp)
shutdownTracer = tp.Shutdown
slog.Info("🔭 OTel self-instrumentation enabled", "endpoint", cfg.OTelExporterEndpoint)
}
}
// 2. Initialize Storage
repo, err := storage.NewRepository(metrics)
if err != nil {
fatal("Failed to initialize repository", err)
}
slog.Info("💾 Storage initialized", "driver", cfg.DBDriver)
// 2a. Retention scheduler: hourly batched purge + daily VACUUM/ANALYZE.
ctxRetention, cancelRetention := context.WithCancel(context.Background())
retention := storage.NewRetentionScheduler(
repo,
cfg.HotRetentionDays,
cfg.RetentionBatchSize,
time.Duration(cfg.RetentionBatchSleepMs)*time.Millisecond,
)
retention.Start(ctxRetention)
slog.Info("🧹 Retention scheduler started", "retention_days", cfg.HotRetentionDays)
// 2b. Partition scheduler: only when DB_POSTGRES_PARTITIONING=daily.
// Maintains lookahead daily partitions and drops expired ones — DROP
// PARTITION is orders of magnitude faster than DELETE for retention.
var partitionScheduler *storage.PartitionScheduler
var cancelPartitions context.CancelFunc = func() {}
if cfg.DBPostgresPartitioning == storage.PartitioningModeDaily {
ctxPart, cancelPart := context.WithCancel(context.Background())
partitionScheduler = storage.NewPartitionScheduler(repo, cfg.HotRetentionDays, cfg.DBPartitionLookaheadDays)
if metrics != nil {
partitionScheduler.SetMetrics(
func(n int) {
if metrics.PartitionsDropped != nil {
metrics.PartitionsDropped.Add(float64(n))
}
},
func(n int) {
if metrics.PartitionsActive != nil {
metrics.PartitionsActive.Set(float64(n))
}
},
)
}
partitionScheduler.Start(ctxPart)
cancelPartitions = cancelPart
slog.Info("📦 Partition scheduler started", "lookahead_days", cfg.DBPartitionLookaheadDays, "retention_days", cfg.HotRetentionDays)
}
// 3. Initialize DLQ (Dead Letter Queue)
replayInterval, err := time.ParseDuration(cfg.DLQReplayInterval)
if err != nil {
replayInterval = 5 * time.Minute
}
dlq, err := queue.NewDLQWithLimits(cfg.DLQPath, replayInterval, func(data []byte) error {
// Replay handler: typed envelope supports logs, spans, traces, and metrics
var envelope struct {
Type string `json:"type"`
Data json.RawMessage `json:"data"`
}
if err := json.Unmarshal(data, &envelope); err != nil {
// Legacy format: try to deserialize as []storage.Log
var logs []storage.Log
if json.Unmarshal(data, &logs) != nil {
return fmt.Errorf("DLQ replay unmarshal failed: %w", err)
}
return repo.BatchCreateLogs(logs)
}
switch envelope.Type {
case "logs":
var logs []storage.Log
if err := json.Unmarshal(envelope.Data, &logs); err != nil {
return fmt.Errorf("DLQ replay logs unmarshal failed: %w", err)
}
return repo.BatchCreateLogs(logs)
case "spans":
var spans []storage.Span
if err := json.Unmarshal(envelope.Data, &spans); err != nil {
return fmt.Errorf("DLQ replay spans unmarshal failed: %w", err)
}
return repo.BatchCreateSpans(spans)
case "traces":
var traces []storage.Trace
if err := json.Unmarshal(envelope.Data, &traces); err != nil {
return fmt.Errorf("DLQ replay traces unmarshal failed: %w", err)
}
return repo.BatchCreateTraces(traces)
case "metrics":
var metrics []storage.MetricBucket
if err := json.Unmarshal(envelope.Data, &metrics); err != nil {
return fmt.Errorf("DLQ replay metrics unmarshal failed: %w", err)
}
return repo.BatchCreateMetrics(metrics)
default:
return fmt.Errorf("DLQ replay: unknown type %q", envelope.Type)
}
}, cfg.DLQMaxFiles, int64(cfg.DLQMaxDiskMB), cfg.DLQMaxRetries)
if err != nil {
fatal("Failed to initialize DLQ", err)
}
dlq.SetMetrics(
func() { metrics.DLQEnqueuedTotal.Inc() },
func() { metrics.DLQReplaySuccess.Inc() },
func() { metrics.DLQReplayFailure.Inc() },
func(b int64) { metrics.DLQDiskBytes.Set(float64(b)) },
)
dlq.SetTelemetryMetrics(metrics)
dlq.SetMaxReplayPerTick(cfg.DLQMaxReplayPerTick)
slog.Info("🔁 DLQ initialized", "path", cfg.DLQPath, "interval", replayInterval,
"max_replay_per_tick", cfg.DLQMaxReplayPerTick)
// 4. Initialize Real-Time WebSocket Hub
hub := realtime.NewHub(func(count int) {
metrics.SetActiveConnections(count)
})
hub.SetDevMode(cfg.DevMode)
hub.SetMaxClients(cfg.WSMaxClients)
hub.SetWSMetrics(
func(msgType string) { metrics.WSMessagesSent.WithLabelValues(msgType).Inc() },
func() { metrics.WSSlowClientsRemoved.Inc() },
)
go hub.Run()
slog.Info("🔌 WebSocket hub started")
// 4b. Initialize Event Notification Hub (for live mode — pushes data snapshots)
eventHub := realtime.NewEventHub(
repo,
metrics.IncrementActiveConns,
metrics.DecrementActiveConns,
)
ctxEvents, cancelEvents := context.WithCancel(context.Background())
go eventHub.Start(ctxEvents, 5*time.Second, 500*time.Millisecond)
slog.Info("⚡ Event notification hub started (5s snapshots, 500ms batches)")
// 4c. Initialize TSDB Aggregator + Ring Buffer
tsdbAgg := tsdb.NewAggregator(repo, 30*time.Second)
if cfg.MetricMaxCardinality > 0 || cfg.MetricMaxCardinalityPerTenant > 0 {
tsdbAgg.SetCardinalityLimit(cfg.MetricMaxCardinality, cfg.MetricMaxCardinalityPerTenant, func(tenantID string) {
// Maintain the legacy unlabeled counter for back-compat dashboards
// AND emit the labeled by-tenant counter for fairness diagnostics.
metrics.TSDBCardinalityOverflow.Inc()
if metrics.TSDBCardinalityOverflowByTenant != nil {
metrics.TSDBCardinalityOverflowByTenant.WithLabelValues(tenantID).Inc()
}
})
slog.Info("📈 TSDB cardinality limits configured",
"global_max", cfg.MetricMaxCardinality,
"per_tenant_max", cfg.MetricMaxCardinalityPerTenant,
)
}
tsdbAgg.SetMetrics(
func() { metrics.TSDBIngestTotal.Inc() },
func() { metrics.TSDBBatchesDropped.Inc() },
)
ringBuf := tsdb.NewRingBuffer(120, 30*time.Second)
tsdbAgg.SetRingBuffer(ringBuf)
slog.Info("📈 TSDB ring buffer attached (120 slots × 30s = 1h retention)")
ctxTSDB, cancelTSDB := context.WithCancel(context.Background())
go tsdbAgg.Start(ctxTSDB)
slog.Info("📈 TSDB Aggregator started (30s window)")
// 4e. Initialize In-Memory Service Graph (rebuilds from spans every 30s)
svcGraph := graph.New(func(since time.Time) ([]graph.SpanRow, error) {
rows, err := repo.GetSpansForGraph(since)
if err != nil {
return nil, err
}
out := make([]graph.SpanRow, len(rows))
for i, r := range rows {
out[i] = graph.SpanRow{
SpanID: r.SpanID,
ParentSpanID: r.ParentSpanID,
ServiceName: r.ServiceName,
OperationName: r.OperationName,
DurationMs: r.DurationMs,
IsError: r.IsError,
Timestamp: r.Timestamp,
}
}
return out, nil
}, 5*time.Minute, 30*time.Second)
ctxGraph, cancelGraph := context.WithCancel(context.Background())
go svcGraph.Start(ctxGraph)
slog.Info("🕸️ In-memory service graph started (5m window, 30s refresh)")
// 4g. Initialize GraphRAG (replaces simple graph for advanced queries)
graphrag.SetPanicMetrics(metrics)
graphRAGCfg := graphrag.DefaultConfig()
graphRAGCfg.WorkerCount = cfg.GraphRAGWorkerCount
graphRAGCfg.ChannelSize = cfg.GraphRAGEventQueueSize
graphRAG := graphrag.New(repo, tsdbAgg, ringBuf, graphRAGCfg)
graphRAG.SetMetrics(metrics)
ctxGraphRAG, cancelGraphRAG := context.WithCancel(context.Background())
go graphRAG.Start(ctxGraphRAG)
slog.Info("GraphRAG started (layered graph with anomaly detection)",
"workers", cfg.GraphRAGWorkerCount,
"event_queue_size", cfg.GraphRAGEventQueueSize,
)
// Auto-migrate GraphRAG models (Investigation, DrainTemplateRow)
if err := graphrag.AutoMigrateGraphRAG(repo.DB()); err != nil {
slog.Error("Failed to migrate GraphRAG models", "error", err)
}
// 5. Initialize AI Service.
// Workers inherit aiCtx so an in-flight LLM call (30s timeout) is
// cancelled the moment shutdown begins — without this, aiService.Stop()
// blocks for up to 30s per in-flight worker waiting on the upstream
// HTTP call to finish.
aiCtx, aiCancel := context.WithCancel(appCtx)
aiService := ai.NewService(repo)
aiService.SetParentContext(aiCtx)
// 6. Initialize API Server
apiServer := api.NewServer(repo, hub, eventHub, metrics)
apiServer.SetGraph(svcGraph)
apiServer.SetGraphRAG(graphRAG)
// 6b. Initialize MCP Server (HTTP Streamable, JSON-RPC 2.0 + SSE)
mcpServer := mcp.New(cfg.DefaultTenant, repo, metrics, svcGraph)
mcpServer.SetGraphRAG(graphRAG)
mcpServer.SetCallLimit(cfg.MCPMaxConcurrent)
mcpServer.SetCallTimeout(time.Duration(cfg.MCPCallTimeoutMs) * time.Millisecond)
mcpServer.SetCacheTTL(time.Duration(cfg.MCPCacheTTLMs) * time.Millisecond)
slog.Info("🤖 MCP server initialized",
"path", cfg.MCPPath,
"enabled", cfg.MCPEnabled,
"default_tenant", cfg.DefaultTenant,
"max_concurrent", cfg.MCPMaxConcurrent,
"call_timeout_ms", cfg.MCPCallTimeoutMs,
"cache_ttl_ms", cfg.MCPCacheTTLMs,
)
// 7. Initialize OTLP Ingestion (gRPC)
traceServer := ingest.NewTraceServer(repo, metrics, cfg)
logsServer := ingest.NewLogsServer(repo, metrics, cfg)
metricsServer := ingest.NewMetricsServer(repo, metrics, tsdbAgg, cfg)
// Wire adaptive sampler (only when rate < 1.0 to avoid unnecessary overhead)
if cfg.SamplingRate > 0 && cfg.SamplingRate < 1.0 {
sampler := ingest.NewSampler(cfg.SamplingRate, cfg.SamplingAlwaysOnErrors, float64(cfg.SamplingLatencyThresholdMs))
traceServer.SetSampler(sampler)
slog.Info("🎯 Adaptive trace sampling enabled",
"rate", cfg.SamplingRate,
"always_errors", cfg.SamplingAlwaysOnErrors,
"latency_threshold_ms", cfg.SamplingLatencyThresholdMs,
)
}
// Wire async ingest pipeline. Decouples OTLP Export() from synchronous
// DB writes — caller returns as soon as the parsed batch is enqueued.
// When disabled (INGEST_ASYNC_ENABLED=false), trace/logs servers fall
// back to the inline-write path bit-for-bit.
var ingestPipeline *ingest.Pipeline
if cfg.IngestAsyncEnabled {
ingestPipeline = ingest.NewPipeline(repo, metrics, ingest.PipelineConfig{
Capacity: cfg.IngestPipelineQueueSize,
Workers: cfg.IngestPipelineWorkers,
})
ingestPipeline.SetPerTenantCap(cfg.IngestPipelinePerTenantCap)
// Second-tier severity gate. Empty STORE_MIN_SEVERITY means "use the
// same threshold as INGEST_MIN_SEVERITY" — i.e. behavior is identical
// to the legacy single-threshold path. Only enable the gate when the
// store threshold is strictly higher than the ingest threshold; equal
// or lower is wasted work since the receiver has already dropped the
// affected logs.
ingestRank := ingest.ParseSeverity(cfg.IngestMinSeverity)
storeRank := ingestRank
if cfg.StoreMinSeverity != "" {
storeRank = ingest.ParseSeverity(cfg.StoreMinSeverity)
}
if storeRank > ingestRank {
ingestPipeline.SetStoreMinSeverity(storeRank)
slog.Info("🪛 Store-severity gate enabled",
"ingest_min", cfg.IngestMinSeverity,
"store_min", cfg.StoreMinSeverity,
"note", "logs below store_min reach in-memory consumers but are not persisted",
)
} else if cfg.StoreMinSeverity != "" && storeRank < ingestRank {
slog.Warn("STORE_MIN_SEVERITY is lower than INGEST_MIN_SEVERITY — has no effect; receiver already filters",
"ingest_min", cfg.IngestMinSeverity,
"store_min", cfg.StoreMinSeverity,
)
}
ingestPipeline.Start(context.Background())
traceServer.SetPipeline(ingestPipeline)
logsServer.SetPipeline(ingestPipeline)
slog.Info("🌊 Async ingest pipeline enabled",
"queue_size", cfg.IngestPipelineQueueSize,
"workers", cfg.IngestPipelineWorkers,
"per_tenant_cap", cfg.IngestPipelinePerTenantCap,
)
} else {
slog.Warn("🐌 Async ingest pipeline disabled (INGEST_ASYNC_ENABLED=false) — Export() blocks on DB writes")
}
// Wire /ready saturation probes. Both probes are nil-tolerant on the
// api server side; we additionally guard against unconfigured caps
// (DLQ unbounded, async pipeline disabled) by returning 0 — i.e.
// "skipped" semantics — rather than dividing by zero.
if dlq != nil && cfg.DLQMaxDiskMB > 0 {
maxBytes := float64(cfg.DLQMaxDiskMB) * 1024 * 1024
apiServer.SetDLQSaturationProbe(func() float64 {
return float64(dlq.DiskBytes()) / maxBytes
})
}
if ingestPipeline != nil {
apiServer.SetPipelineSaturationProbe(func() float64 {
st := ingestPipeline.Stats()
if st.Capacity == 0 {
return 0
}
return float64(st.QueueDepth) / float64(st.Capacity)
})
}
// Wire up live log streaming + AI + DLQ metrics
logHandler := func(l storage.Log) {
start := time.Now()
eventHub.BroadcastLog(realtime.LogEntry{
ID: l.ID,
TraceID: l.TraceID,
SpanID: l.SpanID,
Severity: l.Severity,
Body: l.Body,
ServiceName: l.ServiceName,
AttributesJSON: string(l.AttributesJSON),
AIInsight: string(l.AIInsight),
Timestamp: l.Timestamp,
})
aiService.EnqueueLog(l)
eventHub.NotifyRefresh()
if time.Since(start) > 100*time.Millisecond {
slog.Warn("Slow broadcast/enqueue", "duration", time.Since(start))
}
}
logsServer.SetLogCallback(func(l storage.Log) {
logHandler(l)
graphRAG.OnLogIngested(l)
})
traceServer.SetLogCallback(func(l storage.Log) {
logHandler(l)
graphRAG.OnLogIngested(l)
})
// Wire span callbacks for GraphRAG
traceServer.SetSpanCallback(func(span storage.Span) {
graphRAG.OnSpanIngested(span)
})
metricsServer.SetMetricCallback(func(m tsdb.RawMetric) {
eventHub.BroadcastMetric(realtime.MetricEntry{
Name: m.Name,
ServiceName: m.ServiceName,
Value: m.Value,
Timestamp: m.Timestamp,
Attributes: m.Attributes,
})
graphRAG.OnMetricIngested(m)
})
// Update DLQ size metric periodically. Tied to appCtx so the goroutine
// exits before dlq.Stop() — otherwise it keeps polling Size()/DiskBytes()
// on a stopped DLQ and races with the file-handle close in repo.Close().
bootWG.Add(1)
go func() {
defer bootWG.Done()
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-appCtx.Done():
return
case <-ticker.C:
metrics.SetDLQSize(dlq.Size())
metrics.DLQDiskBytes.Set(float64(dlq.DiskBytes()))
}
}
}()
// Resolve TLS material once: explicit cert-file > self-signed > plaintext.
// Both gRPC and HTTP reuse the same resolved paths below.
const (
tlsModeCertFile = "cert-file"
tlsModeSelfSigned = "self-signed"
)
var (
tlsCertPath string
tlsKeyPath string
tlsMode string // tlsModeCertFile, tlsModeSelfSigned, or "" (plaintext)
)
switch {
case cfg.TLSCertFileMode():
tlsCertPath = cfg.TLSCertFile
tlsKeyPath = cfg.TLSKeyFile
tlsMode = tlsModeCertFile
case cfg.TLSSelfsignedMode():
cp, kp, err := tlsbootstrap.EnsureSelfSignedCert(cfg.TLSCacheDir)
if err != nil {
fatal("Failed to bootstrap self-signed TLS cert", err)
}
tlsCertPath = cp
tlsKeyPath = kp
tlsMode = tlsModeSelfSigned
}
// Start gRPC Server
lis, err := net.Listen("tcp", ":"+cfg.GRPCPort)
if err != nil {
fatal("Failed to listen on gRPC port", err, "port", cfg.GRPCPort)
}
recvBytes := cfg.GRPCMaxRecvMB
if recvBytes <= 0 {
recvBytes = 16
}
streams := cfg.GRPCMaxConcurrentStreams
if streams <= 0 {
streams = 1000
}
grpcOpts := []grpc.ServerOption{
grpc.MaxRecvMsgSize(recvBytes * 1024 * 1024),
grpc.MaxConcurrentStreams(uint32(streams)),
grpc.KeepaliveParams(keepalive.ServerParameters{
Time: 60 * time.Second, // ping idle clients
Timeout: 10 * time.Second, // drop if no pong
MaxConnectionIdle: 10 * time.Minute, // garbage-collect dead NAT entries
MaxConnectionAge: 2 * time.Hour, // force periodic reconnects
MaxConnectionAgeGrace: 30 * time.Second,
}),
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: 5 * time.Second,
PermitWithoutStream: true,
}),
// Recovery FIRST so a panic inside the metrics interceptor is still caught.
grpc.ChainUnaryInterceptor(
recoveryUnaryInterceptor(metrics),
metricsUnaryInterceptor(metrics),
),
}
slog.Info("📡 gRPC server tuned",
"max_recv_mb", recvBytes,
"max_concurrent_streams", streams,
)
switch tlsMode {
case tlsModeCertFile:
creds, err := credentials.NewServerTLSFromFile(tlsCertPath, tlsKeyPath)
if err != nil {
fatal("Failed to load gRPC TLS credentials", err)
}
grpcOpts = append(grpcOpts, grpc.Creds(creds))
slog.Info("🔒 gRPC TLS enabled", "mode", tlsModeCertFile)
case tlsModeSelfSigned:
creds, err := credentials.NewServerTLSFromFile(tlsCertPath, tlsKeyPath)
if err != nil {
fatal("Failed to load gRPC TLS credentials (self-signed)", err)
}
grpcOpts = append(grpcOpts, grpc.Creds(creds))
slog.Info("🔒 gRPC TLS enabled", "mode", tlsModeSelfSigned, "cache_dir", cfg.TLSCacheDir)
default:
slog.Info("🔓 gRPC plaintext — not for production; set TLS_CERT_FILE/TLS_KEY_FILE or TLS_AUTO_SELFSIGNED=true")
}
grpcServer := grpc.NewServer(grpcOpts...)
coltracepb.RegisterTraceServiceServer(grpcServer, traceServer)
collogspb.RegisterLogsServiceServer(grpcServer, logsServer)
colmetricspb.RegisterMetricsServiceServer(grpcServer, metricsServer)
reflection.Register(grpcServer)
go func() {
slog.Info("📡 gRPC OTLP receiver started", "port", cfg.GRPCPort)
if err := grpcServer.Serve(lis); err != nil {
fatal("Failed to serve gRPC", err)
}
}()
// Start runtime metrics sampling (every 15s)
metrics.StartRuntimeMetrics()
slog.Info("📊 Runtime metrics sampling started")
// 7b. Register HTTP OTLP endpoints (before catch-all UI handler)
otlpHTTP := ingest.NewHTTPHandler(traceServer, logsServer, metricsServer)
if metrics != nil && metrics.HTTPOTLPThrottledTotal != nil {
otlpHTTP.SetThrottleCallback(func(signal string) {
metrics.HTTPOTLPThrottledTotal.WithLabelValues(signal).Inc()
})
}
// 8. Start HTTP Server
mux := http.NewServeMux()
otlpHTTP.RegisterRoutes(mux)
apiServer.RegisterRoutes(mux)
// MCP Server routes (conditionally enabled via MCP_ENABLED)
if cfg.MCPEnabled {
mcpPath := cfg.MCPPath
if mcpPath == "" {
mcpPath = "/mcp"
}
mux.Handle(mcpPath, http.StripPrefix(mcpPath, mcpServer.Handler()))
mux.Handle(mcpPath+"/", http.StripPrefix(mcpPath, mcpServer.Handler()))
slog.Info("🤖 MCP endpoint registered", "path", mcpPath)
}
// Embedded UI Server
uiServer := ui.NewServer(repo, metrics, svcGraph)
uiServer.SetMCPConfig(cfg.MCPEnabled, cfg.MCPPath)
if err := uiServer.RegisterRoutes(mux); err != nil {
fatal("Failed to register UI routes", err)
}
var httpHandler http.Handler = mux
// Resolve tenant on /api/* read-side requests (passes through OTLP /v1,
// MCP, UI assets, and health probes untouched).
httpHandler = api.TenantMiddleware(cfg)(httpHandler)
// Wire auth-failure metric hook before installing any auth middleware.
api.AuthFailureHook = func(reason string) {
metrics.APIAuthFailuresTotal.WithLabelValues(reason).Inc()
}
// Authentication. Per-tenant keys (if configured) take precedence over the
// shared API key — they enforce tenant boundaries at the auth layer rather
// than trusting a client-supplied X-Tenant-ID header.
switch {
case cfg.APITenantKeysFile != "":
entries, err := api.LoadTenantKeys(cfg.APITenantKeysFile)
if err != nil {
fatal("load tenant keys file", err, "path", cfg.APITenantKeysFile)
}
tka := api.NewTenantKeyAuth(entries)
httpHandler = tka.Middleware(cfg.MCPPath, httpHandler)
slog.Info("🔑 Per-tenant API key authentication enabled", "tenants", len(entries))
case cfg.APIKey != "":
httpHandler = api.APIKeyGate(cfg.APIKey, cfg.MCPPath, httpHandler)
slog.Info("🔑 API key authentication enabled (shared key)")
default:
slog.Warn("API authentication disabled — set API_KEY or API_TENANT_KEYS_FILE for production")
}
httpHandler = api.MetricsMiddleware(metrics, httpHandler)
if cfg.APIRateLimitRPS > 0 {
rl := api.NewRateLimiter(float64(cfg.APIRateLimitRPS))
// OTLP ingestion paths (/v1/*) are exempt from the per-IP rate limiter.
//
// Why: OTLP collectors batch aggressively and a healthy agent routinely
// exceeds the API_RATE_LIMIT_RPS default (100 RPS/IP). Throttling the
// ingestion path drops legitimate telemetry — the exact data this
// platform exists to capture — so /v1/* bypasses the limiter.
//
// DoS trade-off (acknowledged): the APIKeyGate runs *downstream* of the
// limiter in the middleware chain, which means an unauthenticated
// attacker can push /v1/* requests past the (bypassed) limiter all the
// way to the auth check before getting a 401. This is acceptable
// because APIKeyGate is header-only: it inspects the Authorization
// header and returns 401 without parsing the request body, so the
// per-request CPU cost is bounded and small (no protobuf decode, no
// JSON parse, no DB touch). Layer-4/7 protections (firewall, LB,
// WAF, mTLS) remain the primary defense against volumetric abuse.
//
// TODO: if this trade-off becomes a concern (e.g. abuse observed in
// prod, or CPU pressure from 401 storms), add a separate
// higher-ceiling OTLP-specific limiter scoped to /v1/* — tuned for
// collector-class RPS — rather than lowering the general API limit.
httpHandler = rl.MiddlewareExcept(func(path string) bool {
return strings.HasPrefix(path, "/v1/")
})(httpHandler)
slog.Info("🛡️ API rate limiter enabled",
"rps_per_ip", cfg.APIRateLimitRPS,
"exempt_prefixes", []string{"/v1/"},
)
}
// DB health fast-fail gate: returns 503 for DB-dependent paths when the
// pool is unreachable. Probes, metrics, and UI assets bypass.
var dbHealth *api.DBHealth
if sqlDB, dbErr := repo.DB().DB(); dbErr == nil && sqlDB != nil {
dbHealth = api.NewDBHealth(sqlDB, cfg.DBDriver, metrics)
dbHealth.Start(appCtx)
httpHandler = api.DBHealthMiddleware(dbHealth)(httpHandler)
slog.Info("🩺 DB health middleware enabled", "driver", cfg.DBDriver)
} else {
slog.Warn("DB health middleware disabled (cannot get *sql.DB)", "error", dbErr)
}
// GraphRAG event-buffer depth poller (Fix 6).
bootWG.Add(1)
go func() {
defer bootWG.Done()
tick := time.NewTicker(1 * time.Second)
defer tick.Stop()
for {
select {
case <-appCtx.Done():
return
case <-tick.C:
metrics.GraphRAGEventBufferDepth.Set(float64(graphRAG.EventBufferDepth()))
}
}
}()
// DB pool stats sampler (Task 7 — visibility for DB_MAX_OPEN_CONNS sizing).
// sql.DB.Stats() is cheap (atomic loads on the pool struct), so 5s is fine.
bootWG.Add(1)
go func() {
defer bootWG.Done()
sqlDB, err := repo.DB().DB()
if err != nil || sqlDB == nil {
slog.Warn("DB pool sampler disabled (cannot get *sql.DB)", "error", err)
return
}
// Initial sample so the gauge has a value immediately after startup.
metrics.SampleDBPoolStats(sqlDB)
tick := time.NewTicker(5 * time.Second)
defer tick.Stop()
for {
select {
case <-appCtx.Done():
return
case <-tick.C:
metrics.SampleDBPoolStats(sqlDB)
}
}
}()
// Panic recovery: OUTERMOST middleware below OTel tracing — ensures any
// panic in downstream middleware or handlers is logged + metered and the
// process survives.
httpHandler = api.RecoverMiddleware(metrics, httpHandler)
// OTel HTTP instrumentation (outermost — captures every request).
if shutdownTracer != nil {
httpHandler = otelhttp.NewHandler(httpHandler, "otelcontext.http")
}
srv := &http.Server{
Addr: ":" + cfg.HTTPPort,
Handler: httpHandler,
ReadHeaderTimeout: 10 * time.Second,
}
go func() {
if tlsMode != "" {
slog.Info("🔒 HTTPS server started", "port", cfg.HTTPPort, "mode", tlsMode)
if err := srv.ListenAndServeTLS(tlsCertPath, tlsKeyPath); err != nil && err != http.ErrServerClosed {
fatal("HTTPS server failed", err)
}
} else {
slog.Info("🌐 HTTP server started (plaintext — not for production)", "port", cfg.HTTPPort)
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
fatal("HTTP server failed", err)
}
}
}()
// 9. Graceful Shutdown
stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
<-stop
slog.Info("Shutting down OtelContext V5.4...")
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
// Ordered shutdown: ingestion → HTTP → hubs/events → processing → DLQ → DB
// 1. Stop ingestion paths first (no new data)
grpcServer.GracefulStop()
if err := srv.Shutdown(ctx); err != nil {
slog.Error("HTTP server forced shutdown", "error", err)
}
// 2. Stop real-time hubs and event processing
hub.Stop()
cancelEvents()
// Cancel in-flight LLM calls BEFORE Stop so workers don't burn the
// 30s LLM deadline waiting on a half-dead upstream during shutdown.
aiCancel()
aiService.Stop()
// 3. Stop processing engines (TSDB flush, graph, GraphRAG)
tsdbAgg.Stop()
cancelTSDB()
cancelGraph()
graphRAG.Stop()
cancelGraphRAG()
// 3a. Drain async ingest pipeline. gRPC GracefulStop above guarantees
// no new Submits land; this blocks until workers finish in-flight
// batches so a graceful shutdown doesn't lose buffered ingest.
if ingestPipeline != nil {
ingestPipeline.Stop()
}
// 4. Stop DLQ (may still be replaying)
dlq.Stop()
// 4a. Stop retention + partition schedulers before closing DB (both issue queries).
cancelRetention()
retention.Stop()
cancelPartitions()
if partitionScheduler != nil {
partitionScheduler.Stop()
}
// 4b. Shutdown the OTel tracer provider (flushes pending spans).
if shutdownTracer != nil {
if err := shutdownTracer(ctx); err != nil {
slog.Error("Failed to shutdown tracer provider", "error", err)
}
}
// 4b2. Stop DB health poller before cancelling appCtx so final state is
// written to the gauge before the pool closes.
if dbHealth != nil {
dbHealth.Stop()
}
// 4c. Cancel boot-time goroutines (hydrator, DB health poller) and wait
// with a bounded timeout before closing the DB — otherwise a mid-query
// hydrator would race with the pool closing underneath it.
appCancel()
waitDone := make(chan struct{})
go func() { bootWG.Wait(); close(waitDone) }()
select {
case <-waitDone:
case <-time.After(10 * time.Second):
slog.Warn("hydrator did not finish before shutdown; cancelling")
}
// 5. Close database last (everything above may still write)
if err := repo.Close(); err != nil {
slog.Error("Failed to close database", "error", err)
}
slog.Info("✅ OtelContext V5.4 shutdown complete")
}
// recoveryUnaryInterceptor catches panics inside any unary gRPC handler,
// logs the stack, increments the panics-recovered metric, and maps the panic
// to codes.Internal so the connection stays alive.
func recoveryUnaryInterceptor(m *telemetry.Metrics) grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req any,
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (resp any, err error) {
defer func() {
if rec := recover(); rec != nil {
slog.Error("grpc panic recovered",
"method", info.FullMethod,
"panic", rec,
"stack", string(debug.Stack()),
)
if m != nil && m.PanicsRecoveredTotal != nil {
m.PanicsRecoveredTotal.WithLabelValues("grpc").Inc()
}
err = status.Errorf(codes.Internal, "internal")
}
}()
return handler(ctx, req)
}
}
// metricsUnaryInterceptor records OtelContext_grpc_requests_total and OtelContext_grpc_request_duration_seconds
// for every unary gRPC call.
func metricsUnaryInterceptor(m *telemetry.Metrics) grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req any,
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (any, error) {
start := time.Now()
resp, err := handler(ctx, req)
duration := time.Since(start).Seconds()
status := "ok"
if err != nil {
status = "error"
}
m.GRPCRequestsTotal.WithLabelValues(info.FullMethod, status).Inc()
m.GRPCRequestDuration.WithLabelValues(info.FullMethod).Observe(duration)
return resp, err
}
}
// initTracerProvider builds an OTel tracer provider that exports spans via OTLP
// gRPC to the configured endpoint. The endpoint can be "host:port" (insecure is
// used since the endpoint is typically the platform's own gRPC port or a local
// collector — TLS to an external collector can be added later).
func initTracerProvider(endpoint string) (*sdktrace.TracerProvider, error) {
ctx := context.Background()
client := otlptracegrpc.NewClient(
otlptracegrpc.WithEndpoint(endpoint),
otlptracegrpc.WithInsecure(),
)
exporter, err := otlptrace.New(ctx, client)
if err != nil {
return nil, fmt.Errorf("otlptrace.New: %w", err)
}