From 186b930614963cbd0f26989f2c594cf574408bb6 Mon Sep 17 00:00:00 2001 From: Johan Carlin Date: Mon, 8 Jun 2026 21:01:08 +0200 Subject: [PATCH 1/3] Add minimal OpenTelemetry instrumentation --- README.md | 36 + cmd/fake-responses/main.go | 20 +- cmd/fake-responses/main_test.go | 35 + cmd/pyttechat/main.go | 21 +- cmd/pyttechat/main_test.go | 16 + docs/otel-collector.yaml | 25 + go.mod | 29 +- go.sum | 73 ++- internal/chat/service.go | 54 ++ internal/cli/root.go | 33 +- internal/cli/root_test.go | 10 +- internal/llm/openresponses/client.go | 90 ++- internal/llm/openresponses/client_test.go | 39 ++ .../openresponses/fakeprovider/provider.go | 28 +- internal/observability/observability.go | 618 ++++++++++++++++++ internal/observability/observability_test.go | 410 ++++++++++++ internal/web/server.go | 27 +- 17 files changed, 1533 insertions(+), 31 deletions(-) create mode 100644 docs/otel-collector.yaml create mode 100644 internal/observability/observability.go create mode 100644 internal/observability/observability_test.go diff --git a/README.md b/README.md index 75b0ad8..dca6070 100644 --- a/README.md +++ b/README.md @@ -82,6 +82,42 @@ PYTTECHAT_LLM_PROXY_TOKEN=token-value \ Set `--reasoning-effort` when you want to request model reasoning options. Assistant answer text streams to stdout. Reasoning events, when returned, stream separately to stderr. Proxy requests default to a 5-minute timeout; override it with `--proxy-timeout` or `PYTTECHAT_LLM_PROXY_TIMEOUT`. +## OpenTelemetry + +Telemetry is disabled by default. If no OTLP endpoint is configured, `pyttechat` and `fake-responses` run without an OpenTelemetry Collector and use no external telemetry network calls. + +To enable traces and metrics locally, run an OpenTelemetry Collector with the provided development config: + +```sh +otelcol --config docs/otel-collector.yaml +``` + +Then start the app with an OTLP/gRPC endpoint: + +```sh +OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 \ +OTEL_SERVICE_NAME=pyttechat \ + ./bin/pyttechat serve --addr :3000 +``` + +Supported telemetry environment variables: + +- `OTEL_SERVICE_NAME`: service name, default `pyttechat`. +- `OTEL_EXPORTER_OTLP_ENDPOINT`: shared OTLP endpoint for traces and metrics. +- `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT`: traces-only OTLP endpoint. +- `OTEL_EXPORTER_OTLP_METRICS_ENDPOINT`: metrics-only OTLP endpoint. +- `OTEL_RESOURCE_ATTRIBUTES`: additional resource attributes, for example `deployment.environment=local`. +- `OTEL_SDK_DISABLED=true`: force telemetry off. + +Emitted signals: + +- HTTP server spans and HTTP client spans, including trace context propagation to the LLM proxy. +- Manual spans for CLI commands, chat turn start, SSE streaming, abort/cancel, OpenResponses proxy requests, stream consumption, and fake provider streaming. +- Counters for chat turns started/completed/cancelled/failed and streamed LLM/SSE events. +- Histograms for upstream LLM request duration and chat turn duration. + +Do not attach prompt text, user messages, OAuth tokens, API keys, bearer tokens, cookies, session IDs, user IDs, turn IDs, model output, or raw LLM responses to telemetry. + ## Fake OpenResponses Provider The repo includes a deterministic fake OpenAI-compatible Responses API provider for tests and local development. Prefer `internal/llm/openresponses/fakeprovider.NewHandler()` in Go tests instead of hand-written happy-path SSE stubs. Keep custom `httptest` handlers for malformed streams, auth assertions, and narrow parser edge cases. diff --git a/cmd/fake-responses/main.go b/cmd/fake-responses/main.go index 56d8334..39382c4 100644 --- a/cmd/fake-responses/main.go +++ b/cmd/fake-responses/main.go @@ -12,7 +12,9 @@ import ( "syscall" "time" + "example.com/llm-chat-web/internal/buildinfo" "example.com/llm-chat-web/internal/llm/openresponses/fakeprovider" + "example.com/llm-chat-web/internal/observability" ) type fakeResponsesServer interface { @@ -26,7 +28,7 @@ var ( newServer = func(addr string, opts fakeprovider.Options) fakeResponsesServer { return &http.Server{ Addr: addr, - Handler: fakeprovider.NewHandlerWithOptions(opts), + Handler: observability.HTTPMiddleware(fakeprovider.NewHandlerWithOptions(opts)), ReadHeaderTimeout: 5 * time.Second, } } @@ -41,7 +43,7 @@ func main() { exit(runCommand(os.Args[1:], os.Stderr)) } -func run(args []string, stderr io.Writer) int { +func run(args []string, stderr io.Writer) (code int) { flags := flag.NewFlagSet("fake-responses", flag.ContinueOnError) flags.SetOutput(stderr) addr := flags.String("addr", ":8080", "address for the fake Responses API provider") @@ -53,6 +55,20 @@ func run(args []string, stderr io.Writer) int { return 2 } + shutdown, err := observability.Init(context.Background(), observability.FromEnv(buildinfo.Snapshot())) + if err != nil { + fmt.Fprintln(stderr, err) + return 1 + } + defer func() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := shutdown(ctx); err != nil && code == 0 { + fmt.Fprintln(stderr, err) + code = 1 + } + }() + server := newServer(*addr, fakeprovider.Options{StreamDelay: *streamDelay}) errc := make(chan error, 1) go func() { diff --git a/cmd/fake-responses/main_test.go b/cmd/fake-responses/main_test.go index edddd9f..98f10d0 100644 --- a/cmd/fake-responses/main_test.go +++ b/cmd/fake-responses/main_test.go @@ -137,6 +137,7 @@ func TestRunTreatsServerClosedAsSuccess(t *testing.T) { func TestRunPassesStreamDelayOption(t *testing.T) { server := &stubFakeResponsesServer{done: make(chan struct{})} + disableTelemetryEnv(t) var gotAddr string var gotOpts fakeprovider.Options original := newServer @@ -166,6 +167,24 @@ func TestRunPassesStreamDelayOption(t *testing.T) { } } +func TestRunStartsWithTelemetryDisabledByDefault(t *testing.T) { + server := &stubFakeResponsesServer{shutdownErr: nil} + withFakeResponseServer(t, server) + sigc := make(chan os.Signal, 1) + sigc <- os.Interrupt + withSignalChannel(t, sigc) + var stderr bytes.Buffer + + code := run([]string{"--addr", "127.0.0.1:0"}, &stderr) + + if code != 0 { + t.Fatalf("exit code = %d, want 0; stderr = %q", code, stderr.String()) + } + if server.shutdowns != 1 { + t.Fatalf("shutdown count = %d, want 1", server.shutdowns) + } +} + func TestDefaultFactories(t *testing.T) { server := newServer("127.0.0.1:0", fakeprovider.Options{}) httpServer, ok := server.(*http.Server) @@ -185,6 +204,7 @@ func TestDefaultFactories(t *testing.T) { func withFakeResponseServer(t *testing.T, server *stubFakeResponsesServer) { t.Helper() + disableTelemetryEnv(t) if server.done == nil { server.done = make(chan struct{}) @@ -198,6 +218,21 @@ func withFakeResponseServer(t *testing.T, server *stubFakeResponsesServer) { }) } +func disableTelemetryEnv(t *testing.T) { + t.Helper() + + for _, name := range []string{ + "OTEL_SERVICE_NAME", + "OTEL_EXPORTER_OTLP_ENDPOINT", + "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", + "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT", + "OTEL_RESOURCE_ATTRIBUTES", + "OTEL_SDK_DISABLED", + } { + t.Setenv(name, "") + } +} + func withSignalChannel(t *testing.T, sigc chan os.Signal) { t.Helper() diff --git a/cmd/pyttechat/main.go b/cmd/pyttechat/main.go index fd0c150..45b383c 100644 --- a/cmd/pyttechat/main.go +++ b/cmd/pyttechat/main.go @@ -2,9 +2,13 @@ package main import ( "context" + "fmt" "os" + "time" + "example.com/llm-chat-web/internal/buildinfo" "example.com/llm-chat-web/internal/cli" + "example.com/llm-chat-web/internal/observability" ) var ( @@ -13,5 +17,20 @@ var ( ) func main() { - exit(execute(context.Background(), os.Args[1:], os.Stdin, os.Stdout, os.Stderr)) + ctx := context.Background() + shutdown, err := observability.Init(ctx, observability.FromEnv(buildinfo.Snapshot())) + if err != nil { + fmt.Fprintln(os.Stderr, err) + exit(1) + return + } + + code := execute(ctx, os.Args[1:], os.Stdin, os.Stdout, os.Stderr) + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + if err := shutdown(shutdownCtx); err != nil && code == 0 { + fmt.Fprintln(os.Stderr, err) + code = 1 + } + cancel() + exit(code) } diff --git a/cmd/pyttechat/main_test.go b/cmd/pyttechat/main_test.go index e971aab..4d7ab9d 100644 --- a/cmd/pyttechat/main_test.go +++ b/cmd/pyttechat/main_test.go @@ -9,6 +9,7 @@ import ( ) func TestMainDelegatesToCLIExitStatus(t *testing.T) { + disableTelemetryEnv(t) originalArgs := os.Args originalExit := exit originalExecute := execute @@ -41,3 +42,18 @@ func TestMainDelegatesToCLIExitStatus(t *testing.T) { t.Fatalf("args = %#v, want version", gotArgs) } } + +func disableTelemetryEnv(t *testing.T) { + t.Helper() + + for _, name := range []string{ + "OTEL_SERVICE_NAME", + "OTEL_EXPORTER_OTLP_ENDPOINT", + "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", + "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT", + "OTEL_RESOURCE_ATTRIBUTES", + "OTEL_SDK_DISABLED", + } { + t.Setenv(name, "") + } +} diff --git a/docs/otel-collector.yaml b/docs/otel-collector.yaml new file mode 100644 index 0000000..29dfa0b --- /dev/null +++ b/docs/otel-collector.yaml @@ -0,0 +1,25 @@ +receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + http: + endpoint: 0.0.0.0:4318 + +processors: + batch: + +exporters: + debug: + verbosity: detailed + +service: + pipelines: + traces: + receivers: [otlp] + processors: [batch] + exporters: [debug] + metrics: + receivers: [otlp] + processors: [batch] + exporters: [debug] diff --git a/go.mod b/go.mod index 5fefc15..546dd1b 100644 --- a/go.mod +++ b/go.mod @@ -8,24 +8,45 @@ require ( github.com/spf13/cobra v1.10.2 github.com/yuin/goldmark v1.8.2 github.com/yuin/goldmark-highlighting/v2 v2.0.0-20230729083705-37449abec8cc - golang.org/x/crypto v0.50.0 + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.69.0 + go.opentelemetry.io/otel v1.44.0 + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.44.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.44.0 + go.opentelemetry.io/otel/metric v1.44.0 + go.opentelemetry.io/otel/sdk v1.44.0 + go.opentelemetry.io/otel/sdk/metric v1.44.0 + go.opentelemetry.io/otel/trace v1.44.0 + golang.org/x/crypto v0.51.0 modernc.org/sqlite v1.50.1 ) require ( github.com/aymerick/douceur v0.2.0 // indirect + github.com/cenkalti/backoff/v5 v5.0.3 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/dlclark/regexp2 v1.7.0 // indirect github.com/dustin/go-humanize v1.0.1 // indirect + github.com/felixge/httpsnoop v1.0.4 // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/google/uuid v1.6.0 // indirect github.com/gorilla/css v1.0.1 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.29.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/ncruces/go-strftime v1.0.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/spf13/pflag v1.0.9 // indirect - golang.org/x/net v0.53.0 // indirect - golang.org/x/sys v0.43.0 // indirect - golang.org/x/tools v0.44.0 // indirect + go.opentelemetry.io/auto/sdk v1.2.1 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.44.0 // indirect + go.opentelemetry.io/proto/otlp v1.10.0 // indirect + golang.org/x/net v0.55.0 // indirect + golang.org/x/sys v0.45.0 // indirect + golang.org/x/text v0.37.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20260526163538-3dc84a4a5aaa // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20260526163538-3dc84a4a5aaa // indirect + google.golang.org/grpc v1.81.1 // indirect + google.golang.org/protobuf v1.36.11 // indirect modernc.org/libc v1.72.3 // indirect modernc.org/mathutil v1.7.1 // indirect modernc.org/memory v1.11.0 // indirect diff --git a/go.sum b/go.sum index 2d0e418..dddbd83 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,10 @@ github.com/alecthomas/repr v0.0.0-20220113201626-b1b626ac65ae h1:zzGwJfFlFGD94Cy github.com/alecthomas/repr v0.0.0-20220113201626-b1b626ac65ae/go.mod h1:2kn6fqh/zIyPLmm3ugklbEi5hg5wS435eygvNfaDQL8= github.com/aymerick/douceur v0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuPk= github.com/aymerick/douceur v0.2.0/go.mod h1:wlT5vV2O3h55X9m7iVYN0TBM0NH/MmbLnd30/FjWUq4= +github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= +github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -13,12 +17,25 @@ github.com/dlclark/regexp2 v1.7.0 h1:7lJfhqlPssTb1WQx4yvTHN0uElPEv52sbaECrAQxjAo github.com/dlclark/regexp2 v1.7.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= +github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs= github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/css v1.0.1 h1:ntNaBIghp6JmvWnxbZKANoLyuXTPZ4cAMlo6RyhlbO8= github.com/gorilla/css v1.0.1/go.mod h1:BvnYkspnSzMmwRK+b8/xgNPLiIuNZr6vbZBTPQ2A3b0= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.29.0 h1:5VipnvEpbqr2gA2VbM+nYVbkIF28c5ZQfqCBQ5g2xfk= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.29.0/go.mod h1:Hyl3n6Twe1hvtd9XUXDec4pTvgMSEixRuQKPTMH2bNs= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= @@ -39,30 +56,70 @@ github.com/spf13/cobra v1.10.2/go.mod h1:7C1pvHqHw5A4vrJfjNwvOdzYu0Gml16OCs2GRiT github.com/spf13/pflag v1.0.9 h1:9exaQaMOCwffKiiiYk6/BndUBv+iRViNW+4lEMi0PvY= github.com/spf13/pflag v1.0.9/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/yuin/goldmark v1.4.15/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/yuin/goldmark v1.8.2 h1:kEGpgqJXdgbkhcOgBxkC0X0PmoPG1ZyoZ117rDVp4zE= github.com/yuin/goldmark v1.8.2/go.mod h1:ip/1k0VRfGynBgxOz0yCqHrbZXhcjxyuS66Brc7iBKg= github.com/yuin/goldmark-highlighting/v2 v2.0.0-20230729083705-37449abec8cc h1:+IAOyRda+RLrxa1WC7umKOZRsGq4QrFFMYApOeHzQwQ= github.com/yuin/goldmark-highlighting/v2 v2.0.0-20230729083705-37449abec8cc/go.mod h1:ovIvrum6DQJA4QsJSovrkC4saKHQVs7TvcaeO8AIl5I= +go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= +go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.69.0 h1:8tvICD4vSTOOsNrsI4Ljf6C+6UKvpTEH5XY3JMoyPoo= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.69.0/go.mod h1:z9+yiacE0IHRqM4qFfkbt/JYlmYXgss8GY/jXoNuPJI= +go.opentelemetry.io/otel v1.44.0 h1:JjwHmHpA4iZ3wBxluu2fbbE7j4kqlE8jXyAyPXH7HqU= +go.opentelemetry.io/otel v1.44.0/go.mod h1:BMgjTHL9WPRlRjL2oZCBTL4whCGtXch2H4BhOPIAyYc= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.44.0 h1:SUplec5dp06reu1zaXmOXdvqH398taqrDXqUl99jxSc= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.44.0/go.mod h1:ho2g4N+ane+swq5I/VBkKWnRDY4kUINH3FuqyZqX/Ug= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.44.0 h1:4YsVu3B8+3qtWYYrsUYgn0OG78pN0rnNPRGX4SbokQI= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.44.0/go.mod h1:+wnlSn0mD1ADVMe3v9Z/WIaiz6q6gL2J/ejaAmdmv80= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.44.0 h1:qazEJlUOQzhCpzQpFETGby7EdqjI1wsd0W+6Gg1SCTU= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.44.0/go.mod h1:fOD2Yefuxixkx3ahVNf0O/PERb6r4OlbxfATVnYvzCo= +go.opentelemetry.io/otel/metric v1.44.0 h1:1w0gILTcHdr3YI+ixLyjemwrVnsMURbTZFrSYCdDdmc= +go.opentelemetry.io/otel/metric v1.44.0/go.mod h1:8O7hanEPBNgEMmybD3s2VBKcgWOCsA6tzHBPODAiquo= +go.opentelemetry.io/otel/metric/x v0.66.0 h1:YkCrx1zLOChi9ZcZ6euupOcsgzbVlec7D/xoEU1+cTA= +go.opentelemetry.io/otel/metric/x v0.66.0/go.mod h1:d1+BDj9t96do0/1LoU1ayfCv79ZgNE41qbhBvnMOBZk= +go.opentelemetry.io/otel/sdk v1.44.0 h1:nHYwb9lK+fJPU/dnT6s7W7Z8itMWyqrnVfbheVYrZ58= +go.opentelemetry.io/otel/sdk v1.44.0/go.mod h1:Osuydd3Se74nqjAKxid74N5eC+jfEqfTegHRnq58oK0= +go.opentelemetry.io/otel/sdk/metric v1.44.0 h1:3LlKgI+VjbVsjNRFZJZAJ30WjXC5VkNRks6si09iEfI= +go.opentelemetry.io/otel/sdk/metric v1.44.0/go.mod h1:5B5pMARnXxKhltooO4xUuCBorl65a4EpnTalObqOigA= +go.opentelemetry.io/otel/trace v1.44.0 h1:jxF5CsGYCe74MCRx2X4g7WsY/VBKRqqpNvXlX/6gtIk= +go.opentelemetry.io/otel/trace v1.44.0/go.mod h1:oLl1jrMQAVo6v3GAggN+1VH9VIz9iUSvW53sW1Q8PIE= +go.opentelemetry.io/proto/otlp v1.10.0 h1:IQRWgT5srOCYfiWnpqUYz9CVmbO8bFmKcwYxpuCSL2g= +go.opentelemetry.io/proto/otlp v1.10.0/go.mod h1:/CV4QoCR/S9yaPj8utp3lvQPoqMtxXdzn7ozvvozVqk= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= -golang.org/x/crypto v0.50.0 h1:zO47/JPrL6vsNkINmLoo/PH1gcxpls50DNogFvB5ZGI= -golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q= +golang.org/x/crypto v0.51.0 h1:IBPXwPfKxY7cWQZ38ZCIRPI50YLeevDLlLnyC5wRGTI= +golang.org/x/crypto v0.51.0/go.mod h1:8AdwkbraGNABw2kOX6YFPs3WM22XqI4EXEd8g+x7Oc8= golang.org/x/mod v0.35.0 h1:Ww1D637e6Pg+Zb2KrWfHQUnH2dQRLBQyAtpr/haaJeM= golang.org/x/mod v0.35.0/go.mod h1:+GwiRhIInF8wPm+4AoT6L0FA1QWAad3OMdTRx4tFYlU= -golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA= -golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs= +golang.org/x/net v0.55.0 h1:bcvxaJn3e1U6InsFWt1JUq1aSjnRxLzT2rtD2KfkDF8= +golang.org/x/net v0.55.0/go.mod h1:L5U2KuzuOe1lY7Z+aWVIKK6qEeJXnXV9yzGA+WCHJww= golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI= -golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/sys v0.45.0 h1:dO4czNzziLiiXplLQgBCEpCvXQ3dnkn0SdaZSYdQ+FY= +golang.org/x/sys v0.45.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/text v0.37.0 h1:Cqjiwd9eSg8e0QAkyCaQTNHFIIzWtidPahFWR83rTrc= +golang.org/x/text v0.37.0/go.mod h1:a5sjxXGs9hsn/AJVwuElvCAo9v8QYLzvavO5z2PiM38= golang.org/x/tools v0.44.0 h1:UP4ajHPIcuMjT1GqzDWRlalUEoY+uzoZKnhOjbIPD2c= golang.org/x/tools v0.44.0/go.mod h1:KA0AfVErSdxRZIsOVipbv3rQhVXTnlU6UhKxHd1seDI= +gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4= +gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E= +google.golang.org/genproto/googleapis/api v0.0.0-20260526163538-3dc84a4a5aaa h1:Kjn0N0tCrDgiAFW+lGO4JZ3ck44CehvJQMAwj9QF0G8= +google.golang.org/genproto/googleapis/api v0.0.0-20260526163538-3dc84a4a5aaa/go.mod h1:q4lMZS6kskjT5HvCPrnnypcDPVJqT/f4nfxmkE7gryY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260526163538-3dc84a4a5aaa h1:mZHHdPZl0dbGHCflZgAq/Q468DWVFcU2whhB2KAo8fk= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260526163538-3dc84a4a5aaa/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8= +google.golang.org/grpc v1.81.1 h1:VnnIIZ88UzOOKLukQi+ImGz8O1Wdp8nAGGnvOfEIWQQ= +google.golang.org/grpc v1.81.1/go.mod h1:xGH9GfzOyMTGIOXBJmXt+BX/V0kcdQbdcuwQ/zNw42I= +google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= +google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= modernc.org/cc/v4 v4.28.2 h1:3tQ0lf2ADtoby2EtSP+J7IE2SHwEJdP8ioR59wx7XpY= modernc.org/cc/v4 v4.28.2/go.mod h1:OnovgIhbbMXMu1aISnJ0wvVD1KnW+cAUJkIrAWh+kVI= modernc.org/ccgo/v4 v4.34.0 h1:yRLPFZieg532OT4rp4JFNIVcquwalMX26G95WQDqwCQ= diff --git a/internal/chat/service.go b/internal/chat/service.go index 8319db2..8547695 100644 --- a/internal/chat/service.go +++ b/internal/chat/service.go @@ -5,8 +5,10 @@ import ( "errors" "strings" "sync" + "time" "example.com/llm-chat-web/internal/llm" + "example.com/llm-chat-web/internal/observability" ) var ( @@ -73,6 +75,10 @@ func (s *Session) Send(ctx context.Context, prompt string, opts SendOptions) (*T return nil, ErrEmptyPrompt } + ctx, span := observability.StartSpan(ctx, "chat.turn.start", "") + startedAt := time.Now() + defer span.End() + userMessage := llm.NewTextMessage(llm.RoleUser, prompt) request := llm.Request{ Model: opts.Model, @@ -88,15 +94,19 @@ func (s *Session) Send(ctx context.Context, prompt string, opts SendOptions) (*T s.mu.Lock() if s.inFlight { s.mu.Unlock() + observability.RecordSpanError(span, ErrTurnInProgress) return nil, ErrTurnInProgress } request.Messages = append(llm.CloneMessages(s.messages), userMessage.Clone()) s.inFlight = true s.mu.Unlock() + observability.ChatTurnStarted(ctx) stream, err := s.client.Stream(ctx, request) if err != nil { s.releaseTurn() + observability.ChatTurnFailed(ctx, startedAt, err) + observability.RecordSpanError(span, err) return nil, err } @@ -104,6 +114,8 @@ func (s *Session) Send(ctx context.Context, prompt string, opts SendOptions) (*T session: s, stream: stream, userMessage: userMessage, + ctx: ctx, + startedAt: startedAt, }, nil } @@ -117,15 +129,19 @@ type TurnStream struct { session *Session stream llm.Stream userMessage llm.Message + ctx context.Context + startedAt time.Time assistantParts []llm.Part completed bool finalized bool + recorded bool } func (s *TurnStream) Next() (llm.Event, error) { event, err := s.stream.Next() if err != nil { + s.recordError(err) s.abort() return llm.Event{}, err } @@ -140,11 +156,14 @@ func (s *TurnStream) Next() (llm.Event, error) { case llm.EventCompleted: s.completed = true if err := s.finalize(); err != nil { + s.recordError(err) s.abort() return event, err } + s.recordCompleted() case llm.EventError: if event.Err != nil { + s.recordFailed(event.Err) s.abort() return event, event.Err } @@ -154,6 +173,9 @@ func (s *TurnStream) Next() (llm.Event, error) { } func (s *TurnStream) Close() error { + if !s.completed { + s.recordCancelled() + } s.abort() return s.stream.Close() } @@ -238,6 +260,38 @@ func (s *TurnStream) abort() { s.session.releaseTurn() } +func (s *TurnStream) recordCompleted() { + if s.recorded { + return + } + s.recorded = true + observability.ChatTurnCompleted(s.ctx, s.startedAt) +} + +func (s *TurnStream) recordCancelled() { + if s.recorded { + return + } + s.recorded = true + observability.ChatTurnCancelled(s.ctx, s.startedAt) +} + +func (s *TurnStream) recordFailed(err error) { + if s.recorded { + return + } + s.recorded = true + observability.ChatTurnFailed(s.ctx, s.startedAt, err) +} + +func (s *TurnStream) recordError(err error) { + if errors.Is(err, context.Canceled) { + s.recordCancelled() + return + } + s.recordFailed(err) +} + func (s *Session) releaseTurn() { s.mu.Lock() defer s.mu.Unlock() diff --git a/internal/cli/root.go b/internal/cli/root.go index 05df880..adf11eb 100644 --- a/internal/cli/root.go +++ b/internal/cli/root.go @@ -20,10 +20,12 @@ import ( "example.com/llm-chat-web/internal/llm" "example.com/llm-chat-web/internal/llm/dummy" "example.com/llm-chat-web/internal/llm/openresponses" + "example.com/llm-chat-web/internal/observability" "example.com/llm-chat-web/internal/storage" "example.com/llm-chat-web/internal/web" "github.com/spf13/cobra" + "go.opentelemetry.io/otel/attribute" ) type rootOptions struct { @@ -135,7 +137,7 @@ func newServeCommand(stdout, stderr io.Writer, opts *rootOptions) *cobra.Command Store: store, SessionTTL: opts.sessionTTL, }) - handler := web.NewServer(web.Options{ + handler := observability.HTTPMiddleware(web.NewServer(web.Options{ Client: newLLMClient(*opts), Model: opts.model, ReasoningEffort: opts.reasoningEffort, @@ -143,7 +145,7 @@ func newServeCommand(stdout, stderr io.Writer, opts *rootOptions) *cobra.Command Store: store, Auth: authService, RegistrationEnabled: opts.registration, - }) + })) server := newWebServer(opts.webAddr, handler) listener, err := listenTCP(ctx, opts.webAddr) if err != nil { @@ -210,7 +212,7 @@ func newAskCommand(stdout, stderr io.Writer, opts *rootOptions) *cobra.Command { return err } - return printStream(stream, stdout, stderr) + return printStream(cmd.Context(), stream, stdout, stderr) }, } } @@ -239,7 +241,7 @@ func newChatCommand(stdin io.Reader, stdout, stderr io.Writer, opts *rootOptions } return err } - if err := printStream(stream, stdout, stderr); err != nil { + if err := printStream(cmd.Context(), stream, stdout, stderr); err != nil { return err } } @@ -342,7 +344,7 @@ func envBoolDefault(name string, fallback bool) bool { } } -func printStream(stream *chat.TurnStream, stdout, stderr io.Writer) error { +func printStream(ctx context.Context, stream *chat.TurnStream, stdout, stderr io.Writer) error { defer stream.Close() wroteText := false @@ -357,6 +359,7 @@ func printStream(stream *chat.TurnStream, stdout, stderr io.Writer) error { switch event.Type { case llm.EventTextDelta: + observability.LLMStreamEvent(ctx, observability.ComponentCLI, string(event.Type)) if _, err := fmt.Fprint(stdout, event.Delta); err != nil { return err } @@ -364,9 +367,12 @@ func printStream(stream *chat.TurnStream, stdout, stderr io.Writer) error { wroteText = true } case llm.EventReasoningDelta: + observability.LLMStreamEvent(ctx, observability.ComponentCLI, string(event.Type)) if _, err := fmt.Fprint(stderr, event.Delta); err != nil { return err } + case llm.EventOutputItemDone, llm.EventCompleted, llm.EventError: + observability.LLMStreamEvent(ctx, observability.ComponentCLI, string(event.Type)) } } if wroteText { @@ -397,6 +403,10 @@ func newVersionCommand(stdout io.Writer) *cobra.Command { } func Execute(ctx context.Context, args []string, stdin io.Reader, stdout, stderr io.Writer) int { + ctx = observability.ContextWithComponent(ctx, observability.ComponentCLI) + ctx, span := observability.StartSpan(ctx, "cli.command", observability.ComponentCLI, attribute.String("cli.command.name", cliCommandName(args))) + defer span.End() + cmd := NewRootCommand(stdin, stdout, stderr) //nolint:contextcheck cmd.SetArgs(args) cmd.SetContext(ctx) @@ -416,3 +426,16 @@ func Execute(ctx context.Context, args []string, stdin io.Reader, stdout, stderr return 0 } + +func cliCommandName(args []string) string { + for _, arg := range args { + switch arg { + case "ask", "chat", "serve", "version": + return arg + } + } + if len(args) == 0 { + return "help" + } + return "unknown" +} diff --git a/internal/cli/root_test.go b/internal/cli/root_test.go index 6a293df..ff8cb3e 100644 --- a/internal/cli/root_test.go +++ b/internal/cli/root_test.go @@ -622,7 +622,7 @@ func TestPrintStreamHandlesReasoningOnlyAndWriterErrors(t *testing.T) { var stdout bytes.Buffer var stderr bytes.Buffer - if err := printStream(stream, &stdout, &stderr); err != nil { + if err := printStream(context.Background(), stream, &stdout, &stderr); err != nil { t.Fatalf("printStream error = %v, want nil", err) } if stdout.String() != "" { @@ -638,7 +638,7 @@ func TestPrintStreamHandlesReasoningOnlyAndWriterErrors(t *testing.T) { {Type: llm.EventTextDelta, Delta: "answer"}, }) - if err := printStream(stream, failingWriter{}, io.Discard); err == nil { + if err := printStream(context.Background(), stream, failingWriter{}, io.Discard); err == nil { t.Fatalf("printStream error = nil, want stdout writer error") } }) @@ -648,7 +648,7 @@ func TestPrintStreamHandlesReasoningOnlyAndWriterErrors(t *testing.T) { {Type: llm.EventReasoningDelta, Delta: "thinking"}, }) - if err := printStream(stream, io.Discard, failingWriter{}); err == nil { + if err := printStream(context.Background(), stream, io.Discard, failingWriter{}); err == nil { t.Fatalf("printStream error = nil, want stderr writer error") } }) @@ -659,7 +659,7 @@ func TestPrintStreamHandlesReasoningOnlyAndWriterErrors(t *testing.T) { if err != nil { t.Fatalf("Send() error = %v, want nil", err) } - if err := printStream(stream, io.Discard, io.Discard); err == nil { + if err := printStream(context.Background(), stream, io.Discard, io.Discard); err == nil { t.Fatalf("printStream error = nil, want stream error") } }) @@ -671,7 +671,7 @@ func TestPrintStreamHandlesReasoningOnlyAndWriterErrors(t *testing.T) { }) writer := &failAfterWriter{failAt: 1} - if err := printStream(stream, writer, io.Discard); err == nil { + if err := printStream(context.Background(), stream, writer, io.Discard); err == nil { t.Fatalf("printStream error = nil, want newline writer error") } }) diff --git a/internal/llm/openresponses/client.go b/internal/llm/openresponses/client.go index 95e336e..08bc276 100644 --- a/internal/llm/openresponses/client.go +++ b/internal/llm/openresponses/client.go @@ -14,6 +14,10 @@ import ( "time" "example.com/llm-chat-web/internal/llm" + "example.com/llm-chat-web/internal/observability" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) var ErrStreamFailed = errors.New("openresponses stream failed") @@ -27,10 +31,12 @@ type Client struct { } var marshalJSON = json.Marshal +var instrumentHTTPTransport = observability.HTTPClientTransport type Options struct { Timeout time.Duration BearerToken string + Transport http.RoundTripper } func NewClient(baseURL string) *Client { @@ -47,21 +53,36 @@ func NewClientWithOptions(baseURL string, opts Options) *Client { timeout = DefaultTimeout } + transport := opts.Transport + if transport == nil { + transport = http.DefaultTransport + } + return &Client{ baseURL: strings.TrimRight(baseURL, "/"), - httpClient: &http.Client{Timeout: timeout}, + httpClient: &http.Client{Timeout: timeout, Transport: instrumentHTTPTransport(transport)}, bearerToken: strings.TrimSpace(opts.BearerToken), } } func (c *Client) Stream(ctx context.Context, request llm.Request) (llm.Stream, error) { + ctx, span := observability.StartSpan(ctx, "llm_proxy.responses.create", observability.ComponentLLMProxy) + startedAt := time.Now() + status := observability.ChatTurnStatusFailed + defer func() { + observability.RecordLLMRequestDuration(ctx, observability.ComponentLLMProxy, startedAt, status) + span.End() + }() + body, err := marshalJSON(c.createRequestBody(request)) if err != nil { + observability.RecordSpanError(span, err) return nil, err } httpRequest, err := http.NewRequestWithContext(ctx, http.MethodPost, c.responsesURL(), bytes.NewReader(body)) if err != nil { + observability.RecordSpanError(span, err) return nil, err } httpRequest.Header.Set("Accept", "text/event-stream") @@ -72,18 +93,25 @@ func (c *Client) Stream(ctx context.Context, request llm.Request) (llm.Stream, e response, err := c.httpClient.Do(httpRequest) if err != nil { + observability.RecordSpanError(span, err) return nil, err } if response.StatusCode < 200 || response.StatusCode >= 300 { defer response.Body.Close() errorBody, _ := io.ReadAll(io.LimitReader(response.Body, 4096)) - return nil, fmt.Errorf("%w: status %d: %s", ErrStreamFailed, response.StatusCode, strings.TrimSpace(string(errorBody))) + err := fmt.Errorf("%w: status %d: %s", ErrStreamFailed, response.StatusCode, strings.TrimSpace(string(errorBody))) + observability.RecordSpanError(span, err) + return nil, err } + status = observability.ChatTurnStatusCompleted + consumeCtx, consumeSpan := observability.StartSpan(ctx, "openresponses.stream.consume", observability.ComponentLLMProxy) return &stream{ body: response.Body, reader: bufio.NewReader(response.Body), textSeen: map[string]bool{}, + ctx: consumeCtx, + span: consumeSpan, }, nil } @@ -189,6 +217,9 @@ type stream struct { data []string done bool textSeen map[string]bool + ctx context.Context + span trace.Span + spanDone bool } func (s *stream) Next() (llm.Event, error) { @@ -202,17 +233,25 @@ func (s *stream) Next() (llm.Event, error) { if errors.Is(err, io.EOF) { event, ok, dispatchErr := s.dispatch() if dispatchErr != nil || ok { + if dispatchErr != nil { + s.finish(dispatchErr) + } return event, dispatchErr } s.done = true + s.finish(nil) return llm.Event{}, io.EOF } + s.finish(err) return llm.Event{}, err } if line == "" { event, ok, err := s.dispatch() if err != nil || ok { + if err != nil { + s.finish(err) + } return event, err } continue @@ -228,6 +267,9 @@ func (s *stream) Next() (llm.Event, error) { } func (s *stream) Close() error { + if !s.done { + s.finish(context.Canceled) + } return s.body.Close() } @@ -252,16 +294,38 @@ func (s *stream) dispatch() (llm.Event, bool, error) { if raw == "[DONE]" { s.done = true + s.finish(nil) return llm.Event{}, false, io.EOF } var payload map[string]any if err := json.Unmarshal([]byte(raw), &payload); err != nil { + s.finish(err) return llm.Event{}, false, err } + s.recordPayload(payload) return s.mapPayload(payload) } +func (s *stream) recordPayload(payload map[string]any) { + eventType := sanitizeOpenResponsesEventType(stringField(payload, "type")) + observability.LLMStreamEvent(s.ctx, observability.ComponentLLMProxy, eventType) + if s.span != nil { + s.span.AddEvent("openresponses.stream.event", trace.WithAttributes(attribute.String("llm.stream.event_type", eventType))) + } +} + +func (s *stream) finish(err error) { + if s.span == nil || s.spanDone { + return + } + s.spanDone = true + if err != nil && !errors.Is(err, io.EOF) { + observability.RecordSpanError(s.span, err) + } + s.span.End() +} + func (s *stream) mapPayload(payload map[string]any) (llm.Event, bool, error) { eventType, _ := payload["type"].(string) switch eventType { @@ -456,6 +520,28 @@ func stringField(payload map[string]any, name string) string { return value } +func sanitizeOpenResponsesEventType(eventType string) string { + switch eventType { + case "response.created", + "response.in_progress", + "response.output_item.added", + "response.output_item.done", + "response.content_part.added", + "response.content_part.done", + "response.output_text.delta", + "response.output_text.done", + "response.reasoning.delta", + "response.reasoning_summary_text.delta", + "response.reasoning_text.delta", + "response.completed", + "response.failed", + "error": + return eventType + default: + return "unknown" + } +} + func intField(payload map[string]any, name string) int { switch value := payload[name].(type) { case float64: diff --git a/internal/llm/openresponses/client_test.go b/internal/llm/openresponses/client_test.go index 7244301..8ca5fa6 100644 --- a/internal/llm/openresponses/client_test.go +++ b/internal/llm/openresponses/client_test.go @@ -10,6 +10,7 @@ import ( "net/http/httptest" "strings" "testing" + "time" "example.com/llm-chat-web/internal/llm" "example.com/llm-chat-web/internal/llm/openresponses/fakeprovider" @@ -23,6 +24,38 @@ func TestNewClientSetsDefaultTimeout(t *testing.T) { } } +func TestNewClientUsesInstrumentedTransportAndPreservesTimeout(t *testing.T) { + original := instrumentHTTPTransport + t.Cleanup(func() { + instrumentHTTPTransport = original + }) + + base := markerTransport{} + var called bool + instrumentHTTPTransport = func(got http.RoundTripper) http.RoundTripper { + called = true + if got != base { + t.Fatalf("base transport = %T, want injected base transport", got) + } + return got + } + + client := NewClientWithOptions("http://example.test", Options{ + Timeout: 2 * time.Second, + Transport: base, + }) + + if !called { + t.Fatalf("instrumentHTTPTransport was not called") + } + if client.httpClient.Timeout != 2*time.Second { + t.Fatalf("timeout = %s, want configured timeout", client.httpClient.Timeout) + } + if client.httpClient.Transport != base { + t.Fatalf("transport = %T, want instrumented transport returned by seam", client.httpClient.Transport) + } +} + func TestClientStreamsFromFakeProvider(t *testing.T) { server := httptest.NewServer(fakeprovider.NewHandler()) defer server.Close() @@ -771,6 +804,12 @@ func (f roundTripFunc) RoundTrip(request *http.Request) (*http.Response, error) return f(request) } +type markerTransport struct{} + +func (markerTransport) RoundTrip(*http.Request) (*http.Response, error) { + return nil, errors.New("not used") +} + type errReader struct{} func (errReader) Read([]byte) (int, error) { diff --git a/internal/llm/openresponses/fakeprovider/provider.go b/internal/llm/openresponses/fakeprovider/provider.go index 39d1214..cc149bd 100644 --- a/internal/llm/openresponses/fakeprovider/provider.go +++ b/internal/llm/openresponses/fakeprovider/provider.go @@ -11,6 +11,8 @@ import ( "net/http" "strings" "time" + + "example.com/llm-chat-web/internal/observability" ) const ( @@ -36,6 +38,10 @@ func NewHandlerWithOptions(opts Options) http.Handler { } func (p provider) ServeHTTP(w http.ResponseWriter, r *http.Request) { + ctx, span := observability.StartSpan(r.Context(), "fake_provider.responses.create", observability.ComponentFakeProvider) + defer span.End() + r = r.WithContext(ctx) + if r.URL.Path != "/v1/responses" { writeJSONError(w, http.StatusNotFound, "not_found", "not found") return @@ -47,6 +53,7 @@ func (p provider) ServeHTTP(w http.ResponseWriter, r *http.Request) { req, err := decodeRequest(r) if err != nil { + observability.RecordSpanError(span, err) writeJSONError(w, http.StatusBadRequest, "invalid_json", "invalid JSON") return } @@ -57,7 +64,10 @@ func (p provider) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - if err := writeStreamingResponseWithOptions(w, r, resp, p.opts); err != nil && !errors.Is(err, r.Context().Err()) { + if err := writeStreamingResponseWithOptions(w, r, resp, p.opts); err != nil { + if !errors.Is(err, r.Context().Err()) { + observability.RecordSpanError(span, err) + } return } } @@ -264,6 +274,10 @@ func writeStreamingResponse(w http.ResponseWriter, r *http.Request, resp respons } func writeStreamingResponseWithOptions(w http.ResponseWriter, r *http.Request, resp responseObject, opts Options) error { + ctx, span := observability.StartSpan(r.Context(), "fake_provider.stream.write", observability.ComponentFakeProvider) + defer span.End() + r = r.WithContext(ctx) + flusher, ok := w.(http.Flusher) if !ok { writeJSONError(w, http.StatusInternalServerError, "streaming_unsupported", "streaming unsupported") @@ -279,14 +293,17 @@ func writeStreamingResponseWithOptions(w http.ResponseWriter, r *http.Request, r for i, event := range buildStreamEvents(resp) { if err := r.Context().Err(); err != nil { + observability.RecordSpanError(span, err) return err } if i > 0 { if err := waitStreamDelay(r.Context(), opts.StreamDelay); err != nil { + observability.RecordSpanError(span, err) return err } } if err := writeSSEEvent(w, flusher, event.Type, event.Data); err != nil { + observability.RecordSpanError(span, err) _ = writeSSEEvent(w, flusher, "error", map[string]any{ "type": "error", "sequence_number": nextSequence(event.Data), @@ -296,11 +313,18 @@ func writeStreamingResponseWithOptions(w http.ResponseWriter, r *http.Request, r _ = writeSSEDone(w, flusher) return err } + observability.LLMStreamEvent(ctx, observability.ComponentFakeProvider, event.Type) } if err := r.Context().Err(); err != nil { + observability.RecordSpanError(span, err) return err } - return writeSSEDone(w, flusher) + if err := writeSSEDone(w, flusher); err != nil { + observability.RecordSpanError(span, err) + return err + } + observability.LLMStreamEvent(ctx, observability.ComponentFakeProvider, "done") + return nil } func waitStreamDelay(ctx context.Context, delay time.Duration) error { diff --git a/internal/observability/observability.go b/internal/observability/observability.go new file mode 100644 index 0000000..6b5d628 --- /dev/null +++ b/internal/observability/observability.go @@ -0,0 +1,618 @@ +package observability + +import ( + "context" + "errors" + "fmt" + "net/http" + "net/url" + "os" + "reflect" + "strings" + "sync" + "time" + + "example.com/llm-chat-web/internal/buildinfo" + + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/metric" + metricnoop "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/propagation" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/trace" + tracenoop "go.opentelemetry.io/otel/trace/noop" +) + +const ( + DefaultServiceName = "pyttechat" + + ComponentCLI = "cli" + ComponentWeb = "web" + ComponentLLMProxy = "llm_proxy" + ComponentFakeProvider = "fake_provider" + + ChatTurnStatusCompleted = "completed" + ChatTurnStatusCancelled = "cancelled" + ChatTurnStatusFailed = "failed" + + instrumentationName = "example.com/llm-chat-web/internal/observability" +) + +type Config struct { + ServiceName string + ServiceVersion string + Environment string + OTLPEndpoint string + TracesEndpoint string + MetricsEndpoint string + Enabled bool + TracesEnabled bool + MetricsEnabled bool +} + +type ShutdownFunc func(context.Context) error + +var ( + runtimeMu sync.RWMutex + runtimeEnabled bool + + instrumentMu sync.Mutex + instruments metricInstruments +) + +type metricInstruments struct { + initialized bool + turnsStarted metric.Int64Counter + turnsCompleted metric.Int64Counter + turnsCancelled metric.Int64Counter + turnsFailed metric.Int64Counter + streamEvents metric.Int64Counter + llmDurationMS metric.Float64Histogram + turnDurationMS metric.Float64Histogram +} + +type contextKey string + +const componentContextKey contextKey = "component" + +func FromEnv(info buildinfo.Info) Config { + cfg := Config{ + ServiceName: envString("OTEL_SERVICE_NAME", DefaultServiceName), + ServiceVersion: strings.TrimSpace(info.Version), + OTLPEndpoint: strings.TrimSpace(os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT")), + TracesEndpoint: strings.TrimSpace(os.Getenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT")), + MetricsEndpoint: strings.TrimSpace(os.Getenv("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT")), + } + if sdkDisabled() { + return cfg + } + + cfg.TracesEnabled = cfg.OTLPEndpoint != "" || cfg.TracesEndpoint != "" + cfg.MetricsEnabled = cfg.OTLPEndpoint != "" || cfg.MetricsEndpoint != "" + cfg.Enabled = cfg.TracesEnabled || cfg.MetricsEnabled + return cfg +} + +func Init(ctx context.Context, cfg Config) (ShutdownFunc, error) { + cfg = normalizeConfig(cfg) + previous := captureGlobals() + + if !cfg.Enabled || sdkDisabled() { + setEnabled(false) + return onceShutdown(func(context.Context) error { return nil }), nil + } + if err := validateConfig(cfg); err != nil { + setEnabled(false) + return nil, err + } + + res, err := resourceForConfig(ctx, cfg) + if err != nil { + setEnabled(false) + return nil, err + } + + var tracerProvider *sdktrace.TracerProvider + var meterProvider *sdkmetric.MeterProvider + if cfg.TracesEnabled { + exporter, err := otlptracegrpc.New(ctx, traceExporterOptions(cfg)...) + if err != nil { + setEnabled(false) + return nil, fmt.Errorf("initialize OTLP trace exporter: %w", err) + } + tracerProvider = sdktrace.NewTracerProvider( + sdktrace.WithResource(res), + sdktrace.WithBatcher(exporter), + ) + otel.SetTracerProvider(tracerProvider) + } else { + otel.SetTracerProvider(tracenoop.NewTracerProvider()) + } + + if cfg.MetricsEnabled { + exporter, err := otlpmetricgrpc.New(ctx, metricExporterOptions(cfg)...) + if err != nil { + if tracerProvider != nil { + _ = tracerProvider.Shutdown(ctx) + } + previous.restore() + setEnabled(false) + return nil, fmt.Errorf("initialize OTLP metric exporter: %w", err) + } + meterProvider = sdkmetric.NewMeterProvider( + sdkmetric.WithResource(res), + sdkmetric.WithReader(sdkmetric.NewPeriodicReader(exporter)), + ) + otel.SetMeterProvider(meterProvider) + } else { + otel.SetMeterProvider(metricnoop.NewMeterProvider()) + } + + otel.SetTextMapPropagator(propagation.TraceContext{}) + setEnabled(true) + + return onceShutdown(func(ctx context.Context) error { + var err error + if tracerProvider != nil { + err = errors.Join(err, tracerProvider.Shutdown(ctx)) + } + if meterProvider != nil { + err = errors.Join(err, meterProvider.Shutdown(ctx)) + } + previous.restore() + setEnabled(false) + return err + }), nil +} + +func HTTPMiddleware(next http.Handler) http.Handler { + if !isEnabled() { + return next + } + return otelhttp.NewHandler(next, "http.request", + otelhttp.WithSpanNameFormatter(func(_ string, r *http.Request) string { + return RouteName(r.Method, r.URL.Path) + }), + ) +} + +func HTTPClientTransport(base http.RoundTripper) http.RoundTripper { + if !isEnabled() { + return base + } + return otelhttp.NewTransport(base) +} + +func RouteName(method, path string) string { + method = strings.ToUpper(strings.TrimSpace(method)) + if method == "" { + method = "UNKNOWN" + } + path = strings.TrimSpace(path) + if path == "" { + path = "/" + } + + switch { + case strings.HasPrefix(path, "/assets/"): + return method + " /assets/*" + case path == "/favicon.ico": + return method + " /favicon.ico" + case path == "/": + return method + " /" + case path == "/login": + return method + " /login" + case path == "/register": + return method + " /register" + case path == "/logout": + return method + " /logout" + case path == "/chat/turns": + return method + " /chat/turns" + case strings.HasPrefix(path, "/chat/turns/"): + parts := strings.Split(strings.TrimPrefix(path, "/chat/turns/"), "/") + if len(parts) == 2 && parts[0] != "" { + switch parts[1] { + case "events": + return method + " /chat/turns/{turn_id}/events" + case "abort": + return method + " /chat/turns/{turn_id}/abort" + } + } + } + return method + " unmatched" +} + +func ContextWithComponent(ctx context.Context, component string) context.Context { + return context.WithValue(ctx, componentContextKey, sanitizeComponent(component)) +} + +func StartSpan(ctx context.Context, name, component string, attrs ...attribute.KeyValue) (context.Context, trace.Span) { + if !isEnabled() { + return tracenoop.NewTracerProvider().Tracer(instrumentationName).Start(ctx, name) + } + component = componentOrContext(ctx, component) + spanAttrs := make([]attribute.KeyValue, 0, len(attrs)+1) + spanAttrs = append(spanAttrs, attribute.String("component", component)) + spanAttrs = append(spanAttrs, attrs...) + return otel.Tracer(instrumentationName).Start(ctx, name, trace.WithAttributes(spanAttrs...)) +} + +func RecordSpanError(span trace.Span, err error) { + if span == nil || err == nil { + return + } + span.RecordError(err, trace.WithAttributes(attribute.String("error.type", errorType(err)))) + span.SetStatus(codes.Error, "error") +} + +func SetSpanStatus(span trace.Span, status string) { + if span == nil || status == "" { + return + } + span.SetAttributes(attribute.String("chat.turn.status", sanitizeStatus(status))) +} + +func ChatTurnStarted(ctx context.Context) { + if !isEnabled() { + return + } + metrics := ensureInstruments() + if metrics.turnsStarted != nil { + metrics.turnsStarted.Add(ctx, 1, metric.WithAttributes(componentAttribute(ctx))) + } +} + +func ChatTurnCompleted(ctx context.Context, startedAt time.Time) { + if !isEnabled() { + return + } + metrics := ensureInstruments() + attrs := metric.WithAttributes(componentAttribute(ctx), attribute.String("status", ChatTurnStatusCompleted)) + if metrics.turnsCompleted != nil { + metrics.turnsCompleted.Add(ctx, 1, attrs) + } + recordTurnDuration(ctx, metrics, startedAt, ChatTurnStatusCompleted) +} + +func ChatTurnCancelled(ctx context.Context, startedAt time.Time) { + if !isEnabled() { + return + } + metrics := ensureInstruments() + attrs := metric.WithAttributes(componentAttribute(ctx), attribute.String("status", ChatTurnStatusCancelled)) + if metrics.turnsCancelled != nil { + metrics.turnsCancelled.Add(ctx, 1, attrs) + } + recordTurnDuration(ctx, metrics, startedAt, ChatTurnStatusCancelled) +} + +func ChatTurnFailed(ctx context.Context, startedAt time.Time, err error) { + if !isEnabled() { + return + } + metrics := ensureInstruments() + attrs := []attribute.KeyValue{ + componentAttribute(ctx), + attribute.String("status", ChatTurnStatusFailed), + } + if err != nil { + attrs = append(attrs, attribute.String("error.type", errorType(err))) + } + if metrics.turnsFailed != nil { + metrics.turnsFailed.Add(ctx, 1, metric.WithAttributes(attrs...)) + } + recordTurnDuration(ctx, metrics, startedAt, ChatTurnStatusFailed) +} + +func LLMStreamEvent(ctx context.Context, component, eventType string) { + if !isEnabled() { + return + } + metrics := ensureInstruments() + if metrics.streamEvents == nil { + return + } + metrics.streamEvents.Add(ctx, 1, metric.WithAttributes( + attribute.String("component", sanitizeComponent(componentOrContext(ctx, component))), + attribute.String("event_type", sanitizeEventType(eventType)), + )) +} + +func RecordLLMRequestDuration(ctx context.Context, component string, startedAt time.Time, status string) { + if !isEnabled() { + return + } + metrics := ensureInstruments() + if metrics.llmDurationMS == nil || startedAt.IsZero() { + return + } + metrics.llmDurationMS.Record(ctx, durationMilliseconds(startedAt), metric.WithAttributes( + attribute.String("component", sanitizeComponent(componentOrContext(ctx, component))), + attribute.String("status", sanitizeStatus(status)), + )) +} + +func EndSpan(span trace.Span, err error) { + if err != nil { + RecordSpanError(span, err) + } + span.End() +} + +func isEnabled() bool { + runtimeMu.RLock() + defer runtimeMu.RUnlock() + return runtimeEnabled +} + +func setEnabled(enabled bool) { + runtimeMu.Lock() + defer runtimeMu.Unlock() + runtimeEnabled = enabled +} + +func normalizeConfig(cfg Config) Config { + cfg.ServiceName = strings.TrimSpace(cfg.ServiceName) + if cfg.ServiceName == "" { + cfg.ServiceName = DefaultServiceName + } + cfg.ServiceVersion = strings.TrimSpace(cfg.ServiceVersion) + cfg.Environment = strings.TrimSpace(cfg.Environment) + cfg.OTLPEndpoint = strings.TrimSpace(cfg.OTLPEndpoint) + cfg.TracesEndpoint = strings.TrimSpace(cfg.TracesEndpoint) + cfg.MetricsEndpoint = strings.TrimSpace(cfg.MetricsEndpoint) + if cfg.Enabled { + if !cfg.TracesEnabled && !cfg.MetricsEnabled { + cfg.TracesEnabled = cfg.OTLPEndpoint != "" || cfg.TracesEndpoint != "" + cfg.MetricsEnabled = cfg.OTLPEndpoint != "" || cfg.MetricsEndpoint != "" + } + cfg.Enabled = cfg.TracesEnabled || cfg.MetricsEnabled + } + return cfg +} + +func validateConfig(cfg Config) error { + for name, value := range map[string]string{ + "OTEL_EXPORTER_OTLP_ENDPOINT": cfg.OTLPEndpoint, + "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT": cfg.TracesEndpoint, + "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT": cfg.MetricsEndpoint, + } { + if err := validateEndpoint(name, value); err != nil { + return err + } + } + return nil +} + +func validateEndpoint(name, value string) error { + if value == "" { + return nil + } + if strings.ContainsAny(value, " \t\r\n") { + return fmt.Errorf("%s contains whitespace", name) + } + if !strings.Contains(value, "://") { + return nil + } + parsed, err := url.Parse(value) + if err != nil || parsed.Scheme == "" || parsed.Host == "" { + return fmt.Errorf("%s is not a valid OTLP endpoint URL", name) + } + return nil +} + +func resourceForConfig(ctx context.Context, cfg Config) (*resource.Resource, error) { + attrs := []attribute.KeyValue{ + attribute.String("service.name", cfg.ServiceName), + } + if cfg.ServiceVersion != "" { + attrs = append(attrs, attribute.String("service.version", cfg.ServiceVersion)) + } + if cfg.Environment != "" { + attrs = append(attrs, attribute.String("deployment.environment", cfg.Environment)) + } + res, err := resource.New(ctx, + resource.WithFromEnv(), + resource.WithTelemetrySDK(), + resource.WithAttributes(attrs...), + ) + if err != nil { + return nil, fmt.Errorf("build telemetry resource: %w", err) + } + return res, nil +} + +func traceExporterOptions(cfg Config) []otlptracegrpc.Option { + endpoint := cfg.TracesEndpoint + if endpoint == "" { + endpoint = cfg.OTLPEndpoint + } + if endpoint == "" { + return nil + } + if strings.Contains(endpoint, "://") { + return []otlptracegrpc.Option{otlptracegrpc.WithEndpointURL(endpoint)} + } + return []otlptracegrpc.Option{otlptracegrpc.WithEndpoint(endpoint)} +} + +func metricExporterOptions(cfg Config) []otlpmetricgrpc.Option { + endpoint := cfg.MetricsEndpoint + if endpoint == "" { + endpoint = cfg.OTLPEndpoint + } + if endpoint == "" { + return nil + } + if strings.Contains(endpoint, "://") { + return []otlpmetricgrpc.Option{otlpmetricgrpc.WithEndpointURL(endpoint)} + } + return []otlpmetricgrpc.Option{otlpmetricgrpc.WithEndpoint(endpoint)} +} + +func ensureInstruments() metricInstruments { + instrumentMu.Lock() + defer instrumentMu.Unlock() + if instruments.initialized { + return instruments + } + + meter := otel.Meter(instrumentationName) + instruments.turnsStarted, _ = meter.Int64Counter("pyttechat_chat_turns_started_total") + instruments.turnsCompleted, _ = meter.Int64Counter("pyttechat_chat_turns_completed_total") + instruments.turnsCancelled, _ = meter.Int64Counter("pyttechat_chat_turns_cancelled_total") + instruments.turnsFailed, _ = meter.Int64Counter("pyttechat_chat_turns_failed_total") + instruments.streamEvents, _ = meter.Int64Counter("pyttechat_llm_stream_events_total") + instruments.llmDurationMS, _ = meter.Float64Histogram("pyttechat_llm_request_duration_ms", metric.WithUnit("ms")) + instruments.turnDurationMS, _ = meter.Float64Histogram("pyttechat_chat_turn_duration_ms", metric.WithUnit("ms")) + instruments.initialized = true + return instruments +} + +func recordTurnDuration(ctx context.Context, metrics metricInstruments, startedAt time.Time, status string) { + if metrics.turnDurationMS == nil || startedAt.IsZero() { + return + } + metrics.turnDurationMS.Record(ctx, durationMilliseconds(startedAt), metric.WithAttributes( + componentAttribute(ctx), + attribute.String("status", sanitizeStatus(status)), + )) +} + +func durationMilliseconds(startedAt time.Time) float64 { + return float64(time.Since(startedAt).Nanoseconds()) / float64(time.Millisecond) +} + +func componentAttribute(ctx context.Context) attribute.KeyValue { + return attribute.String("component", componentOrContext(ctx, "")) +} + +func componentOrContext(ctx context.Context, component string) string { + if component == "" { + if value, ok := ctx.Value(componentContextKey).(string); ok { + component = value + } + } + return sanitizeComponent(component) +} + +func sanitizeComponent(component string) string { + switch strings.TrimSpace(component) { + case ComponentCLI: + return ComponentCLI + case ComponentWeb: + return ComponentWeb + case ComponentLLMProxy: + return ComponentLLMProxy + case ComponentFakeProvider: + return ComponentFakeProvider + default: + return "unknown" + } +} + +func sanitizeStatus(status string) string { + switch strings.TrimSpace(status) { + case ChatTurnStatusCompleted: + return ChatTurnStatusCompleted + case ChatTurnStatusCancelled: + return ChatTurnStatusCancelled + case ChatTurnStatusFailed: + return ChatTurnStatusFailed + default: + return "unknown" + } +} + +func sanitizeEventType(eventType string) string { + eventType = strings.TrimSpace(eventType) + if eventType == "" { + return "unknown" + } + for _, r := range eventType { + if (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || (r >= '0' && r <= '9') || r == '_' || r == '.' || r == '-' { + continue + } + return "unknown" + } + if len(eventType) > 80 { + return "unknown" + } + return eventType +} + +func errorType(err error) string { + switch { + case err == nil: + return "" + case errors.Is(err, context.Canceled): + return "context_canceled" + case errors.Is(err, context.DeadlineExceeded): + return "context_deadline_exceeded" + } + errType := reflect.TypeOf(err) + if errType == nil { + return "unknown" + } + return strings.TrimPrefix(errType.String(), "*") +} + +func envString(name, fallback string) string { + if value := strings.TrimSpace(os.Getenv(name)); value != "" { + return value + } + return fallback +} + +func sdkDisabled() bool { + switch strings.ToLower(strings.TrimSpace(os.Getenv("OTEL_SDK_DISABLED"))) { + case "1", "true", "t", "yes", "y", "on": + return true + default: + return false + } +} + +type globalSnapshot struct { + tracerProvider trace.TracerProvider + meterProvider metric.MeterProvider + propagator propagation.TextMapPropagator + enabled bool +} + +func captureGlobals() globalSnapshot { + runtimeMu.RLock() + enabled := runtimeEnabled + runtimeMu.RUnlock() + return globalSnapshot{ + tracerProvider: otel.GetTracerProvider(), + meterProvider: otel.GetMeterProvider(), + propagator: otel.GetTextMapPropagator(), + enabled: enabled, + } +} + +func (s globalSnapshot) restore() { + otel.SetTracerProvider(s.tracerProvider) + otel.SetMeterProvider(s.meterProvider) + otel.SetTextMapPropagator(s.propagator) + setEnabled(s.enabled) +} + +func onceShutdown(shutdown ShutdownFunc) ShutdownFunc { + var once sync.Once + var err error + return func(ctx context.Context) error { + once.Do(func() { + err = shutdown(ctx) + }) + return err + } +} diff --git a/internal/observability/observability_test.go b/internal/observability/observability_test.go new file mode 100644 index 0000000..776f71e --- /dev/null +++ b/internal/observability/observability_test.go @@ -0,0 +1,410 @@ +package observability + +import ( + "context" + "errors" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "example.com/llm-chat-web/internal/buildinfo" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/propagation" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" +) + +func TestFromEnvDefaultsAndEndpointEnablement(t *testing.T) { + clearTelemetryEnv(t) + + cfg := FromEnv(buildinfo.Info{Version: "1.2.3"}) + if cfg.Enabled { + t.Fatalf("Enabled = true, want false without OTLP endpoints") + } + if cfg.ServiceName != "pyttechat" { + t.Fatalf("ServiceName = %q, want pyttechat", cfg.ServiceName) + } + if cfg.ServiceVersion != "1.2.3" { + t.Fatalf("ServiceVersion = %q, want build version", cfg.ServiceVersion) + } + + t.Setenv("OTEL_SERVICE_NAME", "custom-service") + t.Setenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", "http://collector.example:4317") + cfg = FromEnv(buildinfo.Info{Version: "dev"}) + if !cfg.Enabled || !cfg.TracesEnabled || cfg.MetricsEnabled { + t.Fatalf("cfg = %#v, want traces enabled only", cfg) + } + if cfg.ServiceName != "custom-service" { + t.Fatalf("ServiceName = %q, want custom-service", cfg.ServiceName) + } + + t.Setenv("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT", "http://collector.example:4317") + cfg = FromEnv(buildinfo.Info{Version: "dev"}) + if !cfg.Enabled || !cfg.TracesEnabled || !cfg.MetricsEnabled { + t.Fatalf("cfg = %#v, want traces and metrics enabled", cfg) + } + + t.Setenv("OTEL_SDK_DISABLED", "true") + cfg = FromEnv(buildinfo.Info{Version: "dev"}) + if cfg.Enabled || cfg.TracesEnabled || cfg.MetricsEnabled { + t.Fatalf("cfg = %#v, want SDK disabled to force telemetry off", cfg) + } +} + +func TestInitDisabledDefaultIsNoopAndShutdownSafe(t *testing.T) { + clearTelemetryEnv(t) + restore := captureGlobals().restore + t.Cleanup(restore) + + shutdown, err := Init(context.Background(), FromEnv(buildinfo.Info{})) + if err != nil { + t.Fatalf("Init disabled error = %v, want nil", err) + } + if isEnabled() { + t.Fatalf("telemetry runtime is enabled, want disabled") + } + if err := shutdown(context.Background()); err != nil { + t.Fatalf("first shutdown error = %v, want nil", err) + } + if err := shutdown(context.Background()); err != nil { + t.Fatalf("second shutdown error = %v, want nil", err) + } +} + +func TestInitEnabledSetsProvidersAndRestoresGlobals(t *testing.T) { + clearTelemetryEnv(t) + restore := captureGlobals().restore + t.Cleanup(restore) + + cfg := Config{ + ServiceName: "test-service", + ServiceVersion: "test-version", + Environment: "test", + OTLPEndpoint: "http://127.0.0.1:4317", + Enabled: true, + TracesEnabled: true, + } + shutdown, err := Init(context.Background(), cfg) + if err != nil { + t.Fatalf("Init enabled error = %v, want nil", err) + } + if !isEnabled() { + t.Fatalf("telemetry runtime is disabled, want enabled") + } + if otel.GetTextMapPropagator().Fields()[0] != "traceparent" { + t.Fatalf("propagator fields = %#v, want tracecontext propagator", otel.GetTextMapPropagator().Fields()) + } + if err := shutdown(context.Background()); err != nil { + t.Fatalf("shutdown error = %v, want nil", err) + } + if isEnabled() { + t.Fatalf("telemetry runtime is enabled after shutdown, want restored disabled state") + } +} + +func TestInitEnabledRejectsInvalidEndpoint(t *testing.T) { + clearTelemetryEnv(t) + restore := captureGlobals().restore + t.Cleanup(restore) + + _, err := Init(context.Background(), Config{ + OTLPEndpoint: "http://", + Enabled: true, + TracesEnabled: true, + MetricsEnabled: true, + }) + if err == nil || !strings.Contains(err.Error(), "OTLP endpoint") { + t.Fatalf("Init invalid endpoint error = %v, want OTLP endpoint validation error", err) + } + if isEnabled() { + t.Fatalf("telemetry runtime is enabled after failed init, want disabled") + } +} + +func TestHTTPMiddlewarePreservesBehaviorAndNormalizesRouteName(t *testing.T) { + recorder := installSpanRecorder(t) + setEnabledForTest(t, true) + + handler := HTTPMiddleware(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("X-Test", "ok") + w.WriteHeader(http.StatusAccepted) + _, _ = w.Write([]byte("body")) + })) + + request := httptest.NewRequestWithContext(context.Background(), http.MethodGet, "/chat/turns/turn_secret/events", nil) + response := httptest.NewRecorder() + + handler.ServeHTTP(response, request) + + if response.Code != http.StatusAccepted || response.Header().Get("X-Test") != "ok" || response.Body.String() != "body" { + t.Fatalf("middleware response = status %d header %q body %q, want handler behavior preserved", response.Code, response.Header().Get("X-Test"), response.Body.String()) + } + ended := recorder.Ended() + if len(ended) != 1 { + t.Fatalf("ended span count = %d, want 1", len(ended)) + } + if got := ended[0].Name(); got != "GET /chat/turns/{turn_id}/events" { + t.Fatalf("span name = %q, want normalized turn events route", got) + } +} + +func TestHTTPMiddlewareDisabledReturnsOriginalHandler(t *testing.T) { + setEnabledForTest(t, false) + + handler := &comparableHandler{} + if got := HTTPMiddleware(handler); got != handler { + t.Fatalf("disabled middleware returned %T, want original handler", got) + } +} + +func TestHTTPClientTransportDisabledReturnsBaseAndEnabledPropagatesTraceContext(t *testing.T) { + restore := captureGlobals().restore + t.Cleanup(restore) + + base := http.DefaultTransport + setEnabledForTest(t, false) + if got := HTTPClientTransport(base); got != base { + t.Fatalf("disabled transport = %T, want original base transport", got) + } + + installSpanRecorder(t) + setEnabledForTest(t, true) + + var traceparent string + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + traceparent = r.Header.Get("traceparent") + w.WriteHeader(http.StatusNoContent) + })) + defer server.Close() + + client := &http.Client{Transport: HTTPClientTransport(http.DefaultTransport)} + ctx, span := otel.Tracer(instrumentationName).Start(context.Background(), "parent") + request, err := http.NewRequestWithContext(ctx, http.MethodGet, server.URL, nil) + if err != nil { + t.Fatalf("NewRequest error = %v", err) + } + response, err := client.Do(request) + if err != nil { + t.Fatalf("client.Do error = %v", err) + } + response.Body.Close() + span.End() + + if traceparent == "" { + t.Fatalf("traceparent header is empty, want propagated trace context") + } +} + +func TestRouteNameNormalizesKnownRoutes(t *testing.T) { + for _, tc := range []struct { + method string + path string + want string + }{ + {method: "get", path: "", want: "GET /"}, + {method: "GET", path: "/assets/app.js", want: "GET /assets/*"}, + {method: "GET", path: "/favicon.ico", want: "GET /favicon.ico"}, + {method: "POST", path: "/login", want: "POST /login"}, + {method: "GET", path: "/register", want: "GET /register"}, + {method: "POST", path: "/logout", want: "POST /logout"}, + {method: "POST", path: "/chat/turns", want: "POST /chat/turns"}, + {method: "GET", path: "/chat/turns/secret/events", want: "GET /chat/turns/{turn_id}/events"}, + {method: "POST", path: "/chat/turns/secret/abort", want: "POST /chat/turns/{turn_id}/abort"}, + {method: "GET", path: "/chat/turns/secret/unknown", want: "GET unmatched"}, + {method: "GET", path: "/users/secret", want: "GET unmatched"}, + } { + t.Run(tc.want, func(t *testing.T) { + if got := RouteName(tc.method, tc.path); got != tc.want { + t.Fatalf("RouteName(%q, %q) = %q, want %q", tc.method, tc.path, got, tc.want) + } + }) + } +} + +func TestManualSpansAndMetricsHelpersAreSafeWithNoopProviders(t *testing.T) { + recorder := installSpanRecorder(t) + setEnabledForTest(t, true) + resetInstruments(t) + + ctx := ContextWithComponent(context.Background(), ComponentWeb) + ctx, span := StartSpan(ctx, "test.span", "", attribute.String("custom", "bounded")) + RecordSpanError(span, context.Canceled) + SetSpanStatus(span, ChatTurnStatusCompleted) + + startedAt := time.Now().Add(-2 * time.Millisecond) + ChatTurnStarted(ctx) + ChatTurnCompleted(ctx, startedAt) + ChatTurnCancelled(ctx, startedAt) + ChatTurnFailed(ctx, startedAt, context.DeadlineExceeded) + LLMStreamEvent(ctx, ComponentWeb, "response.completed") + RecordLLMRequestDuration(ctx, ComponentLLMProxy, startedAt, ChatTurnStatusCompleted) + EndSpan(span, nil) + + ended := recorder.Ended() + if len(ended) != 1 { + t.Fatalf("ended span count = %d, want 1", len(ended)) + } + if ended[0].Name() != "test.span" { + t.Fatalf("span name = %q, want test.span", ended[0].Name()) + } + + // Repeated calls reuse already-created instruments. + ChatTurnStarted(ctx) + LLMStreamEvent(ctx, ComponentWeb, "response.completed") +} + +func TestSanitizersAndExporterOptions(t *testing.T) { + if got := sanitizeComponent("bad"); got != "unknown" { + t.Fatalf("sanitizeComponent = %q, want unknown", got) + } + if got := sanitizeStatus("bad"); got != "unknown" { + t.Fatalf("sanitizeStatus = %q, want unknown", got) + } + if got := sanitizeEventType("response.completed"); got != "response.completed" { + t.Fatalf("sanitizeEventType valid = %q, want response.completed", got) + } + if got := sanitizeEventType("bad event"); got != "unknown" { + t.Fatalf("sanitizeEventType invalid = %q, want unknown", got) + } + if got := sanitizeEventType(strings.Repeat("a", 81)); got != "unknown" { + t.Fatalf("sanitizeEventType long = %q, want unknown", got) + } + if got := errorType(context.Canceled); got != "context_canceled" { + t.Fatalf("errorType canceled = %q, want context_canceled", got) + } + if got := errorType(context.DeadlineExceeded); got != "context_deadline_exceeded" { + t.Fatalf("errorType deadline = %q, want context_deadline_exceeded", got) + } + if got := errorType(errors.New("plain")); got != "errors.errorString" { + t.Fatalf("errorType plain = %q, want errors.errorString", got) + } + + cfg := normalizeConfig(Config{Enabled: true, OTLPEndpoint: "collector:4317"}) + if !cfg.Enabled || !cfg.TracesEnabled || !cfg.MetricsEnabled { + t.Fatalf("normalizeConfig shared endpoint = %#v, want both signals enabled", cfg) + } + cfg = normalizeConfig(Config{Enabled: true}) + if cfg.Enabled { + t.Fatalf("normalizeConfig no endpoints = %#v, want disabled", cfg) + } + + if err := validateEndpoint("test", "bad endpoint"); err == nil { + t.Fatalf("validateEndpoint whitespace error = nil, want error") + } + if err := validateEndpoint("test", "http://"); err == nil { + t.Fatalf("validateEndpoint bad URL error = nil, want error") + } + if err := validateEndpoint("test", "collector:4317"); err != nil { + t.Fatalf("validateEndpoint host:port error = %v, want nil", err) + } + if err := validateConfig(Config{OTLPEndpoint: "collector:4317"}); err != nil { + t.Fatalf("validateConfig error = %v, want nil", err) + } + + if len(traceExporterOptions(Config{OTLPEndpoint: "collector:4317"})) != 1 { + t.Fatalf("traceExporterOptions host:port length != 1") + } + if len(traceExporterOptions(Config{TracesEndpoint: "http://collector:4317"})) != 1 { + t.Fatalf("traceExporterOptions URL length != 1") + } + if len(metricExporterOptions(Config{OTLPEndpoint: "collector:4317"})) != 1 { + t.Fatalf("metricExporterOptions host:port length != 1") + } + if len(metricExporterOptions(Config{MetricsEndpoint: "http://collector:4317"})) != 1 { + t.Fatalf("metricExporterOptions URL length != 1") + } + if len(traceExporterOptions(Config{})) != 0 || len(metricExporterOptions(Config{})) != 0 { + t.Fatalf("empty exporter options should be empty") + } +} + +func TestResourceForConfigIncludesConfiguredAttributes(t *testing.T) { + t.Setenv("OTEL_RESOURCE_ATTRIBUTES", "deployment.environment=local,service.namespace=test") + res, err := resourceForConfig(context.Background(), Config{ + ServiceName: "svc", + ServiceVersion: "v1", + Environment: "override", + }) + if err != nil { + t.Fatalf("resourceForConfig error = %v, want nil", err) + } + attrs := res.Set() + for _, want := range []attribute.Key{ + "service.name", + "service.version", + "deployment.environment", + "service.namespace", + } { + if _, ok := attrs.Value(want); !ok { + t.Fatalf("resource missing attribute %q", want) + } + } +} + +func installSpanRecorder(t *testing.T) *tracetest.SpanRecorder { + t.Helper() + + restore := captureGlobals().restore + t.Cleanup(restore) + + recorder := tracetest.NewSpanRecorder() + provider := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(recorder)) + otel.SetTracerProvider(provider) + otel.SetTextMapPropagator(propagation.TraceContext{}) + return recorder +} + +func clearTelemetryEnv(t *testing.T) { + t.Helper() + + for _, name := range []string{ + "OTEL_SERVICE_NAME", + "OTEL_EXPORTER_OTLP_ENDPOINT", + "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", + "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT", + "OTEL_RESOURCE_ATTRIBUTES", + "OTEL_SDK_DISABLED", + } { + t.Setenv(name, "") + } +} + +func setEnabledForTest(t *testing.T, enabled bool) { + t.Helper() + + runtimeMu.Lock() + previous := runtimeEnabled + runtimeEnabled = enabled + runtimeMu.Unlock() + + t.Cleanup(func() { + runtimeMu.Lock() + runtimeEnabled = previous + runtimeMu.Unlock() + }) +} + +func resetInstruments(t *testing.T) { + t.Helper() + + instrumentMu.Lock() + previous := instruments + instruments = metricInstruments{} + instrumentMu.Unlock() + + t.Cleanup(func() { + instrumentMu.Lock() + instruments = previous + instrumentMu.Unlock() + }) +} + +type comparableHandler struct{} + +func (h *comparableHandler) ServeHTTP(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusNoContent) +} diff --git a/internal/web/server.go b/internal/web/server.go index 94c6532..9f350c6 100644 --- a/internal/web/server.go +++ b/internal/web/server.go @@ -23,6 +23,7 @@ import ( "example.com/llm-chat-web/internal/chat" "example.com/llm-chat-web/internal/llm" "example.com/llm-chat-web/internal/markdown" + "example.com/llm-chat-web/internal/observability" "example.com/llm-chat-web/internal/storage" ) @@ -182,7 +183,8 @@ func (s *Server) handleCreateTurn(w http.ResponseWriter, r *http.Request) { return } - turn, err := newTurnJob(prompt) + turnCtx := observability.ContextWithComponent(context.WithoutCancel(r.Context()), observability.ComponentWeb) + turn, err := newTurnJobWithContext(turnCtx, prompt) if err != nil { writeJSONError(w, http.StatusInternalServerError, "turn_error", "could not create turn") return @@ -232,6 +234,10 @@ func (s *Server) handleTurnRoute(w http.ResponseWriter, r *http.Request) { } func (s *Server) handleTurnEvents(w http.ResponseWriter, r *http.Request, turnID string) { + ctx, span := observability.StartSpan(r.Context(), "chat.turn.stream", observability.ComponentWeb) + defer span.End() + r = r.WithContext(ctx) + session, err := s.session(w, r) if err != nil { if errors.Is(err, auth.ErrInvalidSession) { @@ -269,8 +275,10 @@ func (s *Server) handleTurnEvents(w http.ResponseWriter, r *http.Request, turnID for _, event := range replay { if err := writeSSE(w, flusher, event); err != nil { + observability.RecordSpanError(span, err) return } + observability.LLMStreamEvent(ctx, observability.ComponentWeb, event.Name) } if terminal { return @@ -285,13 +293,19 @@ func (s *Server) handleTurnEvents(w http.ResponseWriter, r *http.Request, turnID return } if err := writeSSE(w, flusher, event); err != nil { + observability.RecordSpanError(span, err) return } + observability.LLMStreamEvent(ctx, observability.ComponentWeb, event.Name) } } } func (s *Server) handleAbortTurn(w http.ResponseWriter, r *http.Request, turnID string) { + ctx, span := observability.StartSpan(r.Context(), "chat.turn.abort", observability.ComponentWeb) + defer span.End() + r = r.WithContext(ctx) + session, err := s.session(w, r) if err != nil { if errors.Is(err, auth.ErrInvalidSession) { @@ -315,6 +329,7 @@ func (s *Server) handleAbortTurn(w http.ResponseWriter, r *http.Request, turnID writeJSONError(w, http.StatusConflict, "turn_finished", "turn is already finished") return } + observability.SetSpanStatus(span, observability.ChatTurnStatusCancelled) writeJSON(w, http.StatusOK, map[string]any{ "aborted": turnID, }) @@ -1024,6 +1039,10 @@ type turnJob struct { } func newTurnJob(prompt string) (*turnJob, error) { + return newTurnJobWithContext(context.Background(), prompt) +} + +func newTurnJobWithContext(ctx context.Context, prompt string) (*turnJob, error) { turnID, err := randomID("turn") if err != nil { return nil, err @@ -1036,7 +1055,11 @@ func newTurnJob(prompt string) (*turnJob, error) { if err != nil { return nil, err } - ctx, cancel := context.WithCancel(context.Background()) + if ctx == nil { + ctx = context.Background() + } + ctx = observability.ContextWithComponent(ctx, observability.ComponentWeb) + ctx, cancel := context.WithCancel(ctx) return &turnJob{ id: turnID, userMessageID: userMessageID, From 873668c498d879914ed14722322d0b90ba6bed03 Mon Sep 17 00:00:00 2001 From: Johan Carlin Date: Mon, 8 Jun 2026 21:18:16 +0200 Subject: [PATCH 2/3] Fix telemetry context lint --- internal/chat/service.go | 3 ++- internal/cli/root.go | 16 +++++++----- internal/cli/root_test.go | 10 +++---- .../openresponses/fakeprovider/provider.go | 26 ++++++++++++------- internal/web/server.go | 19 +++++++------- 5 files changed, 42 insertions(+), 32 deletions(-) diff --git a/internal/chat/service.go b/internal/chat/service.go index 8547695..62c182a 100644 --- a/internal/chat/service.go +++ b/internal/chat/service.go @@ -67,6 +67,7 @@ type SendOptions struct { Model string ReasoningEffort string RenderingInstructions string + TelemetryComponent string } func (s *Session) Send(ctx context.Context, prompt string, opts SendOptions) (*TurnStream, error) { @@ -75,7 +76,7 @@ func (s *Session) Send(ctx context.Context, prompt string, opts SendOptions) (*T return nil, ErrEmptyPrompt } - ctx, span := observability.StartSpan(ctx, "chat.turn.start", "") + ctx, span := observability.StartSpan(ctx, "chat.turn.start", opts.TelemetryComponent) startedAt := time.Now() defer span.End() diff --git a/internal/cli/root.go b/internal/cli/root.go index adf11eb..b5c847a 100644 --- a/internal/cli/root.go +++ b/internal/cli/root.go @@ -204,6 +204,7 @@ func newAskCommand(stdout, stderr io.Writer, opts *rootOptions) *cobra.Command { Model: opts.model, ReasoningEffort: opts.reasoningEffort, RenderingInstructions: chat.WebRenderingInstructions(), + TelemetryComponent: observability.ComponentCLI, }) if err != nil { if errors.Is(err, chat.ErrEmptyPrompt) { @@ -212,7 +213,7 @@ func newAskCommand(stdout, stderr io.Writer, opts *rootOptions) *cobra.Command { return err } - return printStream(cmd.Context(), stream, stdout, stderr) + return printStream(stream, stdout, stderr) }, } } @@ -234,6 +235,7 @@ func newChatCommand(stdin io.Reader, stdout, stderr io.Writer, opts *rootOptions Model: opts.model, ReasoningEffort: opts.reasoningEffort, RenderingInstructions: chat.WebRenderingInstructions(), + TelemetryComponent: observability.ComponentCLI, }) if err != nil { if errors.Is(err, chat.ErrEmptyPrompt) { @@ -241,7 +243,7 @@ func newChatCommand(stdin io.Reader, stdout, stderr io.Writer, opts *rootOptions } return err } - if err := printStream(cmd.Context(), stream, stdout, stderr); err != nil { + if err := printStream(stream, stdout, stderr); err != nil { return err } } @@ -344,9 +346,10 @@ func envBoolDefault(name string, fallback bool) bool { } } -func printStream(ctx context.Context, stream *chat.TurnStream, stdout, stderr io.Writer) error { +func printStream(stream *chat.TurnStream, stdout, stderr io.Writer) error { defer stream.Close() + metricsCtx := context.Background() wroteText := false for { event, err := stream.Next() @@ -359,7 +362,7 @@ func printStream(ctx context.Context, stream *chat.TurnStream, stdout, stderr io switch event.Type { case llm.EventTextDelta: - observability.LLMStreamEvent(ctx, observability.ComponentCLI, string(event.Type)) + observability.LLMStreamEvent(metricsCtx, observability.ComponentCLI, string(event.Type)) if _, err := fmt.Fprint(stdout, event.Delta); err != nil { return err } @@ -367,12 +370,12 @@ func printStream(ctx context.Context, stream *chat.TurnStream, stdout, stderr io wroteText = true } case llm.EventReasoningDelta: - observability.LLMStreamEvent(ctx, observability.ComponentCLI, string(event.Type)) + observability.LLMStreamEvent(metricsCtx, observability.ComponentCLI, string(event.Type)) if _, err := fmt.Fprint(stderr, event.Delta); err != nil { return err } case llm.EventOutputItemDone, llm.EventCompleted, llm.EventError: - observability.LLMStreamEvent(ctx, observability.ComponentCLI, string(event.Type)) + observability.LLMStreamEvent(metricsCtx, observability.ComponentCLI, string(event.Type)) } } if wroteText { @@ -403,7 +406,6 @@ func newVersionCommand(stdout io.Writer) *cobra.Command { } func Execute(ctx context.Context, args []string, stdin io.Reader, stdout, stderr io.Writer) int { - ctx = observability.ContextWithComponent(ctx, observability.ComponentCLI) ctx, span := observability.StartSpan(ctx, "cli.command", observability.ComponentCLI, attribute.String("cli.command.name", cliCommandName(args))) defer span.End() diff --git a/internal/cli/root_test.go b/internal/cli/root_test.go index ff8cb3e..6a293df 100644 --- a/internal/cli/root_test.go +++ b/internal/cli/root_test.go @@ -622,7 +622,7 @@ func TestPrintStreamHandlesReasoningOnlyAndWriterErrors(t *testing.T) { var stdout bytes.Buffer var stderr bytes.Buffer - if err := printStream(context.Background(), stream, &stdout, &stderr); err != nil { + if err := printStream(stream, &stdout, &stderr); err != nil { t.Fatalf("printStream error = %v, want nil", err) } if stdout.String() != "" { @@ -638,7 +638,7 @@ func TestPrintStreamHandlesReasoningOnlyAndWriterErrors(t *testing.T) { {Type: llm.EventTextDelta, Delta: "answer"}, }) - if err := printStream(context.Background(), stream, failingWriter{}, io.Discard); err == nil { + if err := printStream(stream, failingWriter{}, io.Discard); err == nil { t.Fatalf("printStream error = nil, want stdout writer error") } }) @@ -648,7 +648,7 @@ func TestPrintStreamHandlesReasoningOnlyAndWriterErrors(t *testing.T) { {Type: llm.EventReasoningDelta, Delta: "thinking"}, }) - if err := printStream(context.Background(), stream, io.Discard, failingWriter{}); err == nil { + if err := printStream(stream, io.Discard, failingWriter{}); err == nil { t.Fatalf("printStream error = nil, want stderr writer error") } }) @@ -659,7 +659,7 @@ func TestPrintStreamHandlesReasoningOnlyAndWriterErrors(t *testing.T) { if err != nil { t.Fatalf("Send() error = %v, want nil", err) } - if err := printStream(context.Background(), stream, io.Discard, io.Discard); err == nil { + if err := printStream(stream, io.Discard, io.Discard); err == nil { t.Fatalf("printStream error = nil, want stream error") } }) @@ -671,7 +671,7 @@ func TestPrintStreamHandlesReasoningOnlyAndWriterErrors(t *testing.T) { }) writer := &failAfterWriter{failAt: 1} - if err := printStream(context.Background(), stream, writer, io.Discard); err == nil { + if err := printStream(stream, writer, io.Discard); err == nil { t.Fatalf("printStream error = nil, want newline writer error") } }) diff --git a/internal/llm/openresponses/fakeprovider/provider.go b/internal/llm/openresponses/fakeprovider/provider.go index cc149bd..8c60ee3 100644 --- a/internal/llm/openresponses/fakeprovider/provider.go +++ b/internal/llm/openresponses/fakeprovider/provider.go @@ -13,6 +13,8 @@ import ( "time" "example.com/llm-chat-web/internal/observability" + + "go.opentelemetry.io/otel/trace" ) const ( @@ -40,7 +42,6 @@ func NewHandlerWithOptions(opts Options) http.Handler { func (p provider) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx, span := observability.StartSpan(r.Context(), "fake_provider.responses.create", observability.ComponentFakeProvider) defer span.End() - r = r.WithContext(ctx) if r.URL.Path != "/v1/responses" { writeJSONError(w, http.StatusNotFound, "not_found", "not found") @@ -64,7 +65,10 @@ func (p provider) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - if err := writeStreamingResponseWithOptions(w, r, resp, p.opts); err != nil { + _, streamSpan := observability.StartSpan(ctx, "fake_provider.stream.write", observability.ComponentFakeProvider) + err = writeStreamingResponseWithOptionsSpan(w, r, resp, p.opts, streamSpan) + streamSpan.End() + if err != nil { if !errors.Is(err, r.Context().Err()) { observability.RecordSpanError(span, err) } @@ -274,10 +278,14 @@ func writeStreamingResponse(w http.ResponseWriter, r *http.Request, resp respons } func writeStreamingResponseWithOptions(w http.ResponseWriter, r *http.Request, resp responseObject, opts Options) error { - ctx, span := observability.StartSpan(r.Context(), "fake_provider.stream.write", observability.ComponentFakeProvider) + _, span := observability.StartSpan(r.Context(), "fake_provider.stream.write", observability.ComponentFakeProvider) defer span.End() - r = r.WithContext(ctx) + return writeStreamingResponseWithOptionsSpan(w, r, resp, opts, span) +} + +func writeStreamingResponseWithOptionsSpan(w http.ResponseWriter, r *http.Request, resp responseObject, opts Options, span trace.Span) error { + requestCtx := r.Context() flusher, ok := w.(http.Flusher) if !ok { writeJSONError(w, http.StatusInternalServerError, "streaming_unsupported", "streaming unsupported") @@ -292,12 +300,12 @@ func writeStreamingResponseWithOptions(w http.ResponseWriter, r *http.Request, r w.WriteHeader(http.StatusOK) for i, event := range buildStreamEvents(resp) { - if err := r.Context().Err(); err != nil { + if err := requestCtx.Err(); err != nil { observability.RecordSpanError(span, err) return err } if i > 0 { - if err := waitStreamDelay(r.Context(), opts.StreamDelay); err != nil { + if err := waitStreamDelay(requestCtx, opts.StreamDelay); err != nil { observability.RecordSpanError(span, err) return err } @@ -313,9 +321,9 @@ func writeStreamingResponseWithOptions(w http.ResponseWriter, r *http.Request, r _ = writeSSEDone(w, flusher) return err } - observability.LLMStreamEvent(ctx, observability.ComponentFakeProvider, event.Type) + observability.LLMStreamEvent(requestCtx, observability.ComponentFakeProvider, event.Type) } - if err := r.Context().Err(); err != nil { + if err := requestCtx.Err(); err != nil { observability.RecordSpanError(span, err) return err } @@ -323,7 +331,7 @@ func writeStreamingResponseWithOptions(w http.ResponseWriter, r *http.Request, r observability.RecordSpanError(span, err) return err } - observability.LLMStreamEvent(ctx, observability.ComponentFakeProvider, "done") + observability.LLMStreamEvent(requestCtx, observability.ComponentFakeProvider, "done") return nil } diff --git a/internal/web/server.go b/internal/web/server.go index 9f350c6..86961f8 100644 --- a/internal/web/server.go +++ b/internal/web/server.go @@ -183,8 +183,7 @@ func (s *Server) handleCreateTurn(w http.ResponseWriter, r *http.Request) { return } - turnCtx := observability.ContextWithComponent(context.WithoutCancel(r.Context()), observability.ComponentWeb) - turn, err := newTurnJobWithContext(turnCtx, prompt) + turn, err := newTurnJobWithContext(context.WithoutCancel(r.Context()), prompt) if err != nil { writeJSONError(w, http.StatusInternalServerError, "turn_error", "could not create turn") return @@ -205,6 +204,7 @@ func (s *Server) handleCreateTurn(w http.ResponseWriter, r *http.Request) { Model: s.model, ReasoningEffort: s.reasoningEffort, RenderingInstructions: chat.WebRenderingInstructions(), + TelemetryComponent: observability.ComponentWeb, }) writeJSON(w, http.StatusCreated, createTurnResponse{ @@ -234,9 +234,9 @@ func (s *Server) handleTurnRoute(w http.ResponseWriter, r *http.Request) { } func (s *Server) handleTurnEvents(w http.ResponseWriter, r *http.Request, turnID string) { - ctx, span := observability.StartSpan(r.Context(), "chat.turn.stream", observability.ComponentWeb) + requestCtx := r.Context() + ctx, span := observability.StartSpan(requestCtx, "chat.turn.stream", observability.ComponentWeb) defer span.End() - r = r.WithContext(ctx) session, err := s.session(w, r) if err != nil { @@ -286,7 +286,7 @@ func (s *Server) handleTurnEvents(w http.ResponseWriter, r *http.Request, turnID for { select { - case <-r.Context().Done(): + case <-requestCtx.Done(): return case event, ok := <-updates: if !ok { @@ -302,9 +302,9 @@ func (s *Server) handleTurnEvents(w http.ResponseWriter, r *http.Request, turnID } func (s *Server) handleAbortTurn(w http.ResponseWriter, r *http.Request, turnID string) { - ctx, span := observability.StartSpan(r.Context(), "chat.turn.abort", observability.ComponentWeb) + requestCtx := r.Context() + _, span := observability.StartSpan(requestCtx, "chat.turn.abort", observability.ComponentWeb) defer span.End() - r = r.WithContext(ctx) session, err := s.session(w, r) if err != nil { @@ -325,7 +325,7 @@ func (s *Server) handleAbortTurn(w http.ResponseWriter, r *http.Request, turnID writeJSONError(w, http.StatusNotFound, "turn_not_found", "turn not found") return } - if !turn.abort(r.Context()) { + if !turn.abort(requestCtx) { writeJSONError(w, http.StatusConflict, "turn_finished", "turn is already finished") return } @@ -1042,7 +1042,7 @@ func newTurnJob(prompt string) (*turnJob, error) { return newTurnJobWithContext(context.Background(), prompt) } -func newTurnJobWithContext(ctx context.Context, prompt string) (*turnJob, error) { +func newTurnJobWithContext(ctx context.Context, prompt string) (*turnJob, error) { //nolint:contextcheck // turn jobs store a cancelable context because they outlive the create request. turnID, err := randomID("turn") if err != nil { return nil, err @@ -1058,7 +1058,6 @@ func newTurnJobWithContext(ctx context.Context, prompt string) (*turnJob, error) if ctx == nil { ctx = context.Background() } - ctx = observability.ContextWithComponent(ctx, observability.ComponentWeb) ctx, cancel := context.WithCancel(ctx) return &turnJob{ id: turnID, From 0a7169b1916501d4c53b1af07da4d96f0ed94394 Mon Sep 17 00:00:00 2001 From: Johan Carlin Date: Mon, 8 Jun 2026 21:42:50 +0200 Subject: [PATCH 3/3] Fix completed OpenResponses stream telemetry --- internal/llm/openresponses/client.go | 2 + internal/llm/openresponses/client_test.go | 48 +++++++++++++++++++++++ 2 files changed, 50 insertions(+) diff --git a/internal/llm/openresponses/client.go b/internal/llm/openresponses/client.go index 08bc276..584a2f7 100644 --- a/internal/llm/openresponses/client.go +++ b/internal/llm/openresponses/client.go @@ -365,6 +365,8 @@ func (s *stream) mapPayload(payload map[string]any) (llm.Event, bool, error) { } return llm.Event{Type: llm.EventOutputItemDone, Part: part}, true, nil case "response.completed": + s.done = true + s.finish(nil) return completedEvent(payload), true, nil case "response.failed", "error": return llm.Event{}, false, streamError(payload) diff --git a/internal/llm/openresponses/client_test.go b/internal/llm/openresponses/client_test.go index 8ca5fa6..e9a6347 100644 --- a/internal/llm/openresponses/client_test.go +++ b/internal/llm/openresponses/client_test.go @@ -14,6 +14,10 @@ import ( "example.com/llm-chat-web/internal/llm" "example.com/llm-chat-web/internal/llm/openresponses/fakeprovider" + + "go.opentelemetry.io/otel/codes" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" ) func TestNewClientSetsDefaultTimeout(t *testing.T) { @@ -708,6 +712,50 @@ func TestStreamNextReturnsReadErrors(t *testing.T) { } } +func TestStreamCompletedEventFinishesSpanWithoutCancel(t *testing.T) { + exporter := tracetest.NewInMemoryExporter() + tracerProvider := sdktrace.NewTracerProvider(sdktrace.WithSyncer(exporter)) + defer func() { + if err := tracerProvider.Shutdown(context.Background()); err != nil { + t.Fatalf("Shutdown() error = %v, want nil", err) + } + }() + _, span := tracerProvider.Tracer("test").Start(context.Background(), "openresponses.stream.consume") + payload := `data: {"type":"response.completed","sequence_number":1,"response":{"id":"resp_1"}}` + "\n\n" + s := &stream{ + body: io.NopCloser(strings.NewReader("")), + reader: bufio.NewReader(strings.NewReader(payload)), + span: span, + } + + event, err := s.Next() + if err != nil { + t.Fatalf("Next() error = %v, want nil", err) + } + if event.Type != llm.EventCompleted { + t.Fatalf("event type = %q, want completed", event.Type) + } + spans := exporter.GetSpans() + if len(spans) != 1 { + t.Fatalf("exported spans after completed event = %d, want 1", len(spans)) + } + if err := s.Close(); err != nil { + t.Fatalf("Close() error = %v, want nil", err) + } + spans = exporter.GetSpans() + if len(spans) != 1 { + t.Fatalf("exported spans after Close() = %d, want 1", len(spans)) + } + if spans[0].Status.Code == codes.Error { + t.Fatalf("span status = %v, want non-error", spans[0].Status) + } + for _, event := range spans[0].Events { + if event.Name == "exception" { + t.Fatalf("span events = %#v, want no recorded cancellation exception", spans[0].Events) + } + } +} + func TestClientReturnsStreamErrorEventsAsErrors(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { w.Header().Set("Content-Type", "text/event-stream")