Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/RandomCodeSpace/otelcontext

go 1.25.10
go 1.25.11

require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.21.1
Expand Down
14 changes: 1 addition & 13 deletions internal/api/log_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,7 @@ import (

// handleGetLogs handles GET /api/logs with advanced filtering
func (s *Server) handleGetLogs(w http.ResponseWriter, r *http.Request) {
limit := 50
offset := 0

if l := r.URL.Query().Get("limit"); l != "" {
if v, err := strconv.Atoi(l); err == nil {
limit = v
}
}
if o := r.URL.Query().Get("offset"); o != "" {
if v, err := strconv.Atoi(o); err == nil {
offset = v
}
}
limit, offset := parsePaging(r, pagingDefaultLimit)

filter := storage.LogFilter{
ServiceName: r.URL.Query().Get("service_name"),
Expand Down
39 changes: 39 additions & 0 deletions internal/api/log_handlers_cap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,45 @@ func TestHandleGetLogs_NoSearchSkipsCap(t *testing.T) {
}
}

// TestParsePaging_Clamp checks the bounds parsePaging applies to limit and
// offset, so that GORM is never handed a negative Limit (treated as unlimited)
// or a negative offset.
func TestParsePaging_Clamp(t *testing.T) {
	type pagingCase struct {
		query        string // raw query string appended to /api/logs?
		defaultLimit int    // default passed to parsePaging
		wantLimit    int
		wantOffset   int
	}
	table := []pagingCase{
		// Over-limit capped at 1000.
		{query: "limit=9999&offset=0", defaultLimit: 50, wantLimit: 1000, wantOffset: 0},
		// Negative limit floored at 1.
		{query: "limit=-5&offset=0", defaultLimit: 50, wantLimit: 1, wantOffset: 0},
		// Negative offset floored at 0.
		{query: "limit=10&offset=-99", defaultLimit: 50, wantLimit: 10, wantOffset: 0},
		// Both negative.
		{query: "limit=-1&offset=-1", defaultLimit: 50, wantLimit: 1, wantOffset: 0},
		// Default limit also clamped.
		{query: "", defaultLimit: 9999, wantLimit: 1000, wantOffset: 0},
		// Valid values pass through unchanged.
		{query: "limit=100&offset=200", defaultLimit: 50, wantLimit: 100, wantOffset: 200},
		// Exact cap boundary.
		{query: "limit=1000&offset=0", defaultLimit: 50, wantLimit: 1000, wantOffset: 0},
	}
	for _, c := range table {
		// Subtests are named after the query string under test.
		t.Run(c.query, func(t *testing.T) {
			target := "/api/logs?" + c.query
			limit, offset := parsePaging(httptest.NewRequest(http.MethodGet, target, nil), c.defaultLimit)
			if limit != c.wantLimit {
				t.Errorf("limit: got %d, want %d", limit, c.wantLimit)
			}
			if offset != c.wantOffset {
				t.Errorf("offset: got %d, want %d", offset, c.wantOffset)
			}
		})
	}
}

// newAPITestRepoWithoutFTS builds a fresh in-memory repo with FTS5 disabled.
// Used by cap tests since they only care about handler behavior, not the
// search backend.
Expand Down
35 changes: 35 additions & 0 deletions internal/api/middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,41 @@ func TestTenantMiddleware_MissingHeaderUsesDefault(t *testing.T) {
}
}

// TestTenantMiddleware_DoesNotOverwritePinnedTenant is the regression test for
// the middleware-ordering bypass. Once TenantKeyAuth.Middleware has pinned a
// tenant onto the context (the per-tenant API-key path), the TenantMiddleware
// that runs after it must leave that tenant alone and must NOT replace it with
// the client-supplied X-Tenant-ID header.
//
// Chain under test:
//
//	TenantKeyAuth.Middleware(auth "alpha-key" → pins "alpha")
//	  → TenantMiddleware(reads X-Tenant-ID: "beta" → must NOT overwrite)
//	    → handler (must see "alpha")
func TestTenantMiddleware_DoesNotOverwritePinnedTenant(t *testing.T) {
	const (
		apiKey       = "alpha-key" // key presented by the client
		pinnedTenant = "alpha"     // tenant that key is bound to
	)

	// Per-tenant key auth: key "alpha-key" → tenant "alpha".
	keyAuth := NewTenantKeyAuth(map[string]string{apiKey: pinnedTenant})
	capture := &tenantCapture{}
	cfg := &config.Config{DefaultTenant: "default"}

	// Compose: TenantKeyAuth wraps TenantMiddleware wraps handler.
	chain := keyAuth.Middleware("/mcp", TenantMiddleware(cfg)(capture.handler()))

	req := httptest.NewRequest(http.MethodGet, "/api/logs", nil)
	req.Header.Set("Authorization", "Bearer "+apiKey)
	// Attacker's cross-tenant attempt: ask for a tenant the key is not bound to.
	req.Header.Set(TenantHeader, "beta")

	resp := httptest.NewRecorder()
	chain.ServeHTTP(resp, req)

	if resp.Code != http.StatusOK {
		t.Fatalf("want 200, got %d (body=%q)", resp.Code, resp.Body.String())
	}
	if capture.got != pinnedTenant {
		t.Errorf("TenantMiddleware overwrote pinned tenant: got %q, want %q", capture.got, pinnedTenant)
	}
}

// Non-/api/* paths must pass through without tenant resolution — and so should
// report the default (no ctx value).
func TestTenantMiddleware_NonAPIPath_Passthrough(t *testing.T) {
Expand Down
42 changes: 42 additions & 0 deletions internal/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package api

import (
"net/http"
"strconv"
"time"

"github.com/RandomCodeSpace/otelcontext/internal/cache"
Expand Down Expand Up @@ -107,6 +108,47 @@ func (s *Server) RegisterRoutes(mux *http.ServeMux) {
mux.HandleFunc("/ws/events", s.eventHub.HandleWebSocket)
}

// Paging bounds used by parsePaging.
const (
	pagingDefaultLimit = 50   // default page size callers may pass as defaultLimit
	pagingMaxLimit     = 1000 // hard ceiling on the returned limit
)

// parsePaging extracts the "limit" and "offset" query parameters from r and
// clamps them so GORM never sees a negative or unbounded Limit.
//
//   - limit: starts at defaultLimit, is replaced by the "limit" parameter when
//     that parses as an integer, and is then forced into [1, pagingMaxLimit].
//     The clamp runs last, so it covers the default as well.
//   - offset: taken from the "offset" parameter when that parses as a
//     positive integer; otherwise 0.
//
// Missing, empty, or unparseable parameters are ignored rather than reported.
func parsePaging(r *http.Request, defaultLimit int) (limit, offset int) {
	params := r.URL.Query()

	limit = defaultLimit
	if raw := params.Get("limit"); raw != "" {
		if n, err := strconv.Atoi(raw); err == nil {
			limit = n
		}
	}
	// One clamp after the override bounds both the default and the
	// client-supplied value.
	switch {
	case limit < 1:
		limit = 1
	case limit > pagingMaxLimit:
		limit = pagingMaxLimit
	}

	// offset is a named result and starts at 0; only a positive parsed value
	// replaces it, which floors negatives at 0.
	if raw := params.Get("offset"); raw != "" {
		if n, err := strconv.Atoi(raw); err == nil && n > 0 {
			offset = n
		}
	}
	return limit, offset
}

// parseTimeRange parses start and end times from request query parameters
func parseTimeRange(r *http.Request) (time.Time, time.Time, error) {
var start, end time.Time
Expand Down
9 changes: 8 additions & 1 deletion internal/api/tenant_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/RandomCodeSpace/otelcontext/internal/storage"
)


// TenantHeader is the canonical HTTP header carrying the tenant ID on
// read-side (query) requests. Ingest paths resolve tenant separately via gRPC
// metadata / OTLP resource attributes and do not go through this middleware.
Expand All @@ -35,6 +34,14 @@ func TenantMiddleware(cfg *config.Config) func(http.Handler) http.Handler {
next.ServeHTTP(w, r)
return
}
// If an upstream layer (e.g. TenantKeyAuth.Middleware) has already
// pinned a tenant onto the context, do not overwrite it — that would
// allow a client to escape their key-bound tenant by supplying a
// different X-Tenant-ID header.
if storage.HasTenantContext(r.Context()) {
next.ServeHTTP(w, r)
return
}
// SanitizeTenantID returns "" for empty / over-length / control-char
// values so they fall through to the configured default — see
// storage.SanitizeTenantID. Hostile or misconfigured clients cannot
Expand Down
14 changes: 1 addition & 13 deletions internal/api/trace_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,13 @@ import (
"fmt"
"log/slog"
"net/http"
"strconv"

"github.com/RandomCodeSpace/otelcontext/internal/api/views"
)

// handleGetTraces handles GET /api/traces
func (s *Server) handleGetTraces(w http.ResponseWriter, r *http.Request) {
limit := 20
offset := 0
if l := r.URL.Query().Get("limit"); l != "" {
if v, err := strconv.Atoi(l); err == nil {
limit = v
}
}
if o := r.URL.Query().Get("offset"); o != "" {
if v, err := strconv.Atoi(o); err == nil {
offset = v
}
}
limit, offset := parsePaging(r, 20)

start, end, err := parseTimeRange(r)
if err != nil {
Expand Down
11 changes: 8 additions & 3 deletions internal/graphrag/anomaly.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@ func (g *GraphRAG) detectAnomaliesForTenant(ctx context.Context, tenant string,
baselineErrorRate := 0.02 // reasonable baseline
if svc.ErrorRate > baselineErrorRate*2 && svc.ErrorRate > 0.05 {
anomaly := AnomalyNode{
ID: fmt.Sprintf("anom_%s_err_%d", svc.Name, now.UnixNano()),
// Stable ID per (service, type): each detection tick UPSERTS the
// same evolving anomaly node rather than minting a new one. With a
// UnixNano suffix an ongoing spike created a fresh node every 10s
// (and correlateWithRecent then minted O(N²) PRECEDED_BY edges),
// which grew the AnomalyStore to ~1.3 GB over a 15-min soak.
ID: fmt.Sprintf("anom_%s_err", svc.Name),
Type: AnomalyErrorSpike,
Severity: classifyErrorSeverity(svc.ErrorRate),
Service: svc.Name,
Expand All @@ -49,7 +54,7 @@ func (g *GraphRAG) detectAnomaliesForTenant(ctx context.Context, tenant string,
// Latency degradation: p99-like check using avg * 3 as proxy
if svc.AvgLatency > 500 && svc.CallCount > 10 {
anomaly := AnomalyNode{
ID: fmt.Sprintf("anom_%s_lat_%d", svc.Name, now.UnixNano()),
ID: fmt.Sprintf("anom_%s_lat", svc.Name), // stable per (service,type); see error-spike note
Type: AnomalyLatencySpike,
Severity: classifyLatencySeverity(svc.AvgLatency),
Service: svc.Name,
Expand Down Expand Up @@ -79,7 +84,7 @@ func (g *GraphRAG) detectAnomaliesForTenant(ctx context.Context, tenant string,
deviation := (m.RollingAvg - (m.RollingMin + rangeSize/2)) / (rangeSize / 2)
if deviation > 3.0 || deviation < -3.0 {
anomaly := AnomalyNode{
ID: fmt.Sprintf("anom_%s_metric_%d", m.Service, now.UnixNano()),
ID: fmt.Sprintf("anom_%s_metric_%s", m.Service, m.MetricName), // stable per (service,metric)
Type: AnomalyMetricZScore,
Severity: SeverityWarning,
Service: m.Service,
Expand Down
42 changes: 42 additions & 0 deletions internal/graphrag/anomaly_dedup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package graphrag

import (
"testing"
"time"
)

// TestAnomalyDedupBoundsStore guards the root-cause fix for the soak finding:
// because anomaly IDs are stable per (service, type), every detection tick
// UPSERTS one evolving node instead of minting a fresh one. With the previous
// UnixNano-suffixed IDs, 100 ticks over 2 services produced 200 nodes and an
// O(N²) PRECEDED_BY edge explosion (which grew AnomalyStore to ~1.3 GB in a
// 15-min soak). With stable IDs the node and edge counts must stay bounded no
// matter how many ticks fire.
func TestAnomalyDedupBoundsStore(t *testing.T) {
	const (
		ticks    = 100
		interval = 10 * time.Second // spacing between simulated detection ticks
	)
	services := []string{"checkout", "payments"}
	start := time.Unix(1_700_000_000, 0)
	stores := newTenantStores(time.Hour)

	for tick := 0; tick < ticks; tick++ {
		when := start.Add(time.Duration(tick) * interval)
		for _, name := range services {
			node := AnomalyNode{
				ID:        "anom_" + name + "_err", // stable ID, mirrors detectAnomaliesForTenant
				Type:      AnomalyErrorSpike,
				Service:   name,
				Timestamp: when,
			}
			stores.anomalies.AddAnomaly(node)
			correlateWithRecent(stores, node)
		}
	}

	// One node per service, however many ticks ran.
	if nodes := len(stores.anomalies.Anomalies); nodes != 2 {
		t.Fatalf("expected 2 deduped anomaly nodes after %d ticks, got %d", ticks, nodes)
	}
	// TRIGGERED_BY: 2 (one per node). PRECEDED_BY: at most the 2-node mesh.
	// The bound is deliberately loose; the point is that the edge count does
	// NOT scale with the tick count.
	if edges := len(stores.anomalies.Edges); edges > 8 {
		t.Fatalf("edge count must stay bounded under dedup, got %d after %d ticks", edges, ticks)
	}
}
9 changes: 8 additions & 1 deletion internal/ingest/otlp_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,16 @@ const headerContentType = "Content-Type" //nolint:goconst // single literal; Son
// to the request context before delegating to the gRPC Export methods.
// Uses the shared storage.WithTenantContext helper so ingest and read paths
// agree on the context key.
//
// The raw header value is run through storage.SanitizeTenantID — the same
// sanitizer applied on the gRPC metadata path in tenantFromContext — so
// control characters, oversized strings, and empty values are rejected
// identically regardless of transport.
func withTenantFromHTTP(r *http.Request) context.Context {
	// No header: leave the request context untouched so downstream code
	// applies its own tenant fallback.
	raw := r.Header.Get("X-Tenant-ID")
	if raw == "" {
		return r.Context()
	}

	// Never attach the raw header value. It must pass through
	// storage.SanitizeTenantID first; a value the sanitizer rejects (it
	// returns "") is treated exactly like a missing header. Returning the
	// unsanitized value before this check would make the sanitizer
	// unreachable and let hostile header contents reach the ingest path.
	tenant := storage.SanitizeTenantID(raw)
	if tenant == "" {
		return r.Context()
	}
	return storage.WithTenantContext(r.Context(), tenant)
}
Expand Down
Loading