From 746e469c8b0c9ef711010cb51112a258555f8c2f Mon Sep 17 00:00:00 2001 From: Hannes Hapke Date: Fri, 12 Jun 2026 19:26:22 -0700 Subject: [PATCH 1/4] fix: streaming support to work with Claude Code --- src/backend/providers/anthropic.go | 60 +++++-- src/backend/proxy/handler.go | 35 ++++ src/backend/proxy/streaming.go | 260 +++++++++++++++++++++++++++++ src/backend/proxy/transparent.go | 31 +++- 4 files changed, 371 insertions(+), 15 deletions(-) create mode 100644 src/backend/proxy/streaming.go diff --git a/src/backend/providers/anthropic.go b/src/backend/providers/anthropic.go index e5cd7168..ce268cf1 100644 --- a/src/backend/providers/anthropic.go +++ b/src/backend/providers/anthropic.go @@ -55,6 +55,20 @@ func (p *AnthropicProvider) ExtractRequestText(data map[string]interface{}) (str } if content, ok := msgMap["content"].(string); ok { result.WriteString(content + "\n") + } else if blocks, ok := msgMap["content"].([]interface{}); ok { + // Messages API content-block array: collect text from text blocks. + for _, blk := range blocks { + blkMap, ok := blk.(map[string]interface{}) + if !ok { + continue + } + if t, _ := blkMap["type"].(string); t != "text" { + continue + } + if text, ok := blkMap["text"].(string); ok { + result.WriteString(text + "\n") + } + } } } return result.String(), nil @@ -91,24 +105,46 @@ func (p *AnthropicProvider) CreateMaskedRequest(maskedRequest map[string]interfa return maskedToOriginal, &entities, fmt.Errorf("no messages field in request") } + // mask runs PII detection over a single piece of text and merges the + // resulting entities and mappings into the accumulators above. + mask := func(text string) string { + maskedText, _maskedToOriginal, _entities := maskPIIInText(text, "[MaskedRequest]") + entities = append(entities, _entities...) 
+ for k, v := range _maskedToOriginal { + maskedToOriginal[k] = v + } + return maskedText + } + for _, msg := range messages { msgMap, ok := msg.(map[string]interface{}) if !ok { continue } - content, ok := msgMap["content"].(string) - if !ok { - continue - } - - // Mask PII in this message's content and update message content with masked text - maskedText, _maskedToOriginal, _entities := maskPIIInText(content, "[MaskedRequest]") - msgMap["content"] = maskedText - // Collect entities and mappings - entities = append(entities, _entities...) - for k, v := range _maskedToOriginal { - maskedToOriginal[k] = v + // The Messages API allows `content` to be either a plain string or an + // array of typed content blocks (Claude Code always uses the latter). + // Handle both so PII is masked in either shape. + switch content := msgMap["content"].(type) { + case string: + msgMap["content"] = mask(content) + case []interface{}: + for _, blk := range content { + blkMap, ok := blk.(map[string]interface{}) + if !ok { + continue + } + // Only text blocks carry free text; skip image / tool_use / + // tool_result blocks (different/nested shapes). + if t, _ := blkMap["type"].(string); t != "text" { + continue + } + text, ok := blkMap["text"].(string) + if !ok { + continue + } + blkMap["text"] = mask(text) + } } } diff --git a/src/backend/proxy/handler.go b/src/backend/proxy/handler.go index a480a345..dc57ae43 100644 --- a/src/backend/proxy/handler.go +++ b/src/backend/proxy/handler.go @@ -485,6 +485,41 @@ func (h *Handler) ProcessResponseBody(ctx context.Context, body []byte, contentT return modifiedBody } +// LogStreamedResponse records a streamed (SSE) response in the logging DB. 
The +// usual ProcessResponseBody path is skipped for streams (the body is pumped +// straight to the client), so this mirrors its response-logging half: a +// response_masked row (the text the model actually returned, before PII +// restoration) and a response_original row (the text delivered to the client, +// after restoration), correlated by transactionID. Only assistant text is +// captured; tool_use / tool_result blocks are not included. +func (h *Handler) LogStreamedResponse(ctx context.Context, transactionID, maskedText, restoredText string) { + if h.loggingDB == nil { + return + } + logCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + // Wrap the assistant text in a minimal Anthropic-shaped envelope so the + // transaction ID can be attached and the log UI can parse it consistently. + wrap := func(text string) string { + b, err := json.Marshal(map[string]interface{}{ + "streamed": true, + "content": []map[string]interface{}{{"type": "text", "text": text}}, + }) + if err != nil { + return text + } + return h.addTransactionID(string(b), transactionID) + } + + if err := h.loggingDB.InsertLog(logCtx, wrap(maskedText), "response_masked", []pii.Entity{}, false); err != nil { + log.Printf("[Proxy] ⚠️ Failed to log masked streamed response: %v", err) + } + if err := h.loggingDB.InsertLog(logCtx, wrap(restoredText), "response_original", []pii.Entity{}, false); err != nil { + log.Printf("[Proxy] ⚠️ Failed to log restored streamed response: %v", err) + } +} + // addTransactionID adds transaction ID to JSON message for log correlation func (h *Handler) addTransactionID(message string, transactionID string) string { // Try to parse as JSON and add transaction_id field diff --git a/src/backend/proxy/streaming.go b/src/backend/proxy/streaming.go new file mode 100644 index 00000000..3a9516f4 --- /dev/null +++ b/src/backend/proxy/streaming.go @@ -0,0 +1,260 @@ +package proxy + +import ( + "bufio" + "bytes" + "encoding/json" + "fmt" + "io" + "net" 
+ "net/http" + "strings" + "time" +) + +// streamingClient is used for requests whose responses are streamed (SSE). +// Unlike the shared handler client it has no overall timeout, so long-lived +// token streams are not cut off after 30s. A response-header timeout still +// guards against a dead upstream. Proxy is nil to avoid looping back through +// ourselves. +var streamingClient = &http.Client{ + Transport: &http.Transport{ + Proxy: nil, + ResponseHeaderTimeout: 60 * time.Second, + }, +} + +// requestWantsStream reports whether the (masked) request body asks for a +// streaming response, i.e. it contains "stream": true. +func requestWantsStream(body []byte) bool { + var m map[string]interface{} + if err := json.Unmarshal(body, &m); err != nil { + return false + } + v, ok := m["stream"].(bool) + return ok && v +} + +// isEventStream reports whether the response is a Server-Sent Events stream. +func isEventStream(resp *http.Response) bool { + return strings.HasPrefix( + strings.ToLower(resp.Header.Get("Content-Type")), "text/event-stream") +} + +// sseRestorer restores masked PII inside an SSE token stream. Placeholder +// (dummy) values can be split across consecutive content_block_delta events, so +// it keeps a per-content-block carry buffer and only emits text that is far +// enough from the tail that any placeholder starting in it is guaranteed +// complete. The held-back tail is flushed when the content block stops. 
type sseRestorer struct {
	mapping map[string]string // dummy (masked) value -> original value
	keep    int               // minimum bytes to hold back = longest dummy length - 1
	carry   map[int]string    // un-emitted tail per content-block index
	masked  strings.Builder   // raw model text (pre-restore), accumulated for logging
}

// newSSERestorer builds a restorer for the given dummy→original mapping. The
// hold-back length is one byte less than the longest dummy value, which is the
// longest proper prefix of a placeholder that can sit at the end of a chunk.
// A nil or empty mapping yields keep == 0, i.e. nothing is held back.
func newSSERestorer(mapping map[string]string) *sseRestorer {
	keep := 0
	for masked := range mapping {
		if len(masked) > keep {
			keep = len(masked)
		}
	}
	if keep > 0 {
		keep--
	}
	return &sseRestorer{mapping: mapping, keep: keep, carry: map[int]string{}}
}

// restore replaces every masked (dummy) value with its original. Replacing
// already-restored text is idempotent provided an original value does not
// contain a dummy placeholder as a substring.
//
// The mapping is ranged over directly and Go does not define map iteration
// order, so the result is only well defined when no dummy value is a substring
// of another dummy value (or of another entry's original).
func (s *sseRestorer) restore(text string) string {
	for masked, original := range s.mapping {
		text = strings.ReplaceAll(text, masked, original)
	}
	return text
}

// splitSafe returns the prefix that is safe to emit now and the tail that must
// be held back so a placeholder straddling the boundary can still complete.
//
// At least keep bytes are held back. The cut is moved left when it would fall
// inside a multi-byte UTF-8 sequence: emit and hold are JSON-encoded
// separately, and a string that ends or starts with part of a character would
// have those bytes replaced by U+FFFD, corrupting the text. Holding back a few
// extra bytes is always safe because the tail is flushed when the block stops.
func splitSafe(s string, keep int) (emit, hold string) {
	if len(s) <= keep {
		return "", s
	}
	cut := len(s) - keep
	// 0b10xxxxxx marks a UTF-8 continuation byte; back up to the lead byte.
	// The cut < len(s) guard covers keep == 0, where there is nothing to split.
	for cut > 0 && cut < len(s) && s[cut]&0xC0 == 0x80 {
		cut--
	}
	return s[:cut], s[cut:]
}

// transformEvent rewrites a single, complete SSE event (the raw lines up to and
// including the blank terminator). For a content_block_delta text delta it
// restores PII and rewrites only the JSON on the data: line, leaving the
// surrounding event:/blank framing intact so the event is always well formed —
// even when the entire delta is held back (an empty text delta is emitted). Any
// held-back tail is flushed as a synthetic delta immediately before the
// matching content_block_stop. Every other event passes through byte-for-byte.
+func (s *sseRestorer) transformEvent(lines [][]byte) []byte { + dataIdx := -1 + var evt struct { + Type string `json:"type"` + Index int `json:"index"` + Delta struct { + Type string `json:"type"` + Text string `json:"text"` + } `json:"delta"` + } + for i, ln := range lines { + t := bytes.TrimRight(ln, "\r\n") + if bytes.HasPrefix(t, []byte("data: ")) { + if json.Unmarshal(t[len("data: "):], &evt) == nil { + dataIdx = i + } + break + } + } + if dataIdx == -1 { + return concatLines(lines) // no recognisable data line: pass through + } + + switch { + case evt.Type == "content_block_delta" && evt.Delta.Type == "text_delta": + s.masked.WriteString(evt.Delta.Text) // raw model text, for audit logging + buf := s.carry[evt.Index] + evt.Delta.Text + emit, hold := splitSafe(s.restore(buf), s.keep) + s.carry[evt.Index] = hold + // Rewrite only the data: line; keep the original event:/blank lines. + out := make([][]byte, len(lines)) + copy(out, lines) + out[dataIdx] = textDeltaDataLine(evt.Index, emit) + return concatLines(out) + + case evt.Type == "content_block_stop": + var out []byte + if tail := s.carry[evt.Index]; tail != "" { + out = append(out, textDeltaEvent(evt.Index, tail)...) + delete(s.carry, evt.Index) + } + return append(out, concatLines(lines)...) + + default: + return concatLines(lines) + } +} + +func concatLines(lines [][]byte) []byte { + var b []byte + for _, ln := range lines { + b = append(b, ln...) + } + return b +} + +// textDeltaDataLine builds just the `data: {...}\n` line for a text delta. +func textDeltaDataLine(index int, text string) []byte { + payload, _ := json.Marshal(map[string]interface{}{ + "type": "content_block_delta", + "index": index, + "delta": map[string]interface{}{"type": "text_delta", "text": text}, + }) + out := append([]byte("data: "), payload...) + return append(out, '\n') +} + +// textDeltaEvent builds a complete content_block_delta SSE event, including its +// trailing blank line, used to flush a held-back tail. 
+func textDeltaEvent(index int, text string) []byte { + out := []byte("event: content_block_delta\n") + out = append(out, textDeltaDataLine(index, text)...) + return append(out, '\n') +} + +// isBlankLine reports whether a raw line is an SSE event terminator. +func isBlankLine(line []byte) bool { + return len(bytes.TrimRight(line, "\r\n")) == 0 +} + +// streamSSEResponse writes an SSE response to the (HTTP/1.1) client connection, +// restoring masked PII incrementally and flushing after every event so the +// client receives tokens as they arrive. Events are buffered only until their +// blank-line terminator, never the whole body, and chunked transfer encoding is +// used instead of Content-Length. +func streamSSEResponse(conn net.Conn, resp *http.Response, restorer *sseRestorer) error { + bw := bufio.NewWriter(conn) + + // Status line. + if _, err := fmt.Fprintf(bw, "HTTP/1.1 %d %s\r\n", resp.StatusCode, http.StatusText(resp.StatusCode)); err != nil { + return err + } + + // Copy headers, but take control of framing: stream as chunked, no length. 
+ for key, values := range resp.Header { + switch strings.ToLower(key) { + case "content-length", "transfer-encoding", "connection": + continue + } + for _, v := range values { + if _, err := fmt.Fprintf(bw, "%s: %s\r\n", key, v); err != nil { + return err + } + } + } + if _, err := bw.WriteString("Transfer-Encoding: chunked\r\nConnection: keep-alive\r\n\r\n"); err != nil { + return err + } + if err := bw.Flush(); err != nil { + return err + } + + writeChunk := func(b []byte) error { + if len(b) == 0 { + return nil + } + if _, err := fmt.Fprintf(bw, "%x\r\n", len(b)); err != nil { + return err + } + if _, err := bw.Write(b); err != nil { + return err + } + if _, err := bw.WriteString("\r\n"); err != nil { + return err + } + return bw.Flush() // flush each event so the client streams in real time + } + + reader := bufio.NewReader(resp.Body) + var event [][]byte + flush := func() error { + if len(event) == 0 { + return nil + } + out := restorer.transformEvent(event) + event = event[:0] + return writeChunk(out) + } + + for { + line, err := reader.ReadBytes('\n') + if len(line) > 0 { + event = append(event, line) + if isBlankLine(line) { // blank line terminates the SSE event + if werr := flush(); werr != nil { + return werr + } + } + } + if err != nil { + if err == io.EOF { + if ferr := flush(); ferr != nil { // trailing event w/o blank line + return ferr + } + break + } + return err + } + } + + // Terminating zero-length chunk. 
+ if _, err := bw.WriteString("0\r\n\r\n"); err != nil { + return err + } + return bw.Flush() +} diff --git a/src/backend/proxy/transparent.go b/src/backend/proxy/transparent.go index a80cf84e..d64dfb80 100644 --- a/src/backend/proxy/transparent.go +++ b/src/backend/proxy/transparent.go @@ -445,9 +445,16 @@ func (tp *TransparentProxy) interceptHTTPOverTLS(conn net.Conn, r *http.Request, // Explicitly set Accept-Encoding to identity to avoid compressed responses proxyReq.Header.Set("Accept-Encoding", "identity") - // Forward request using handler's HTTP client (bypasses proxy to prevent infinite loop) - log.Printf("[TransparentProxy] Forwarding TLS request directly to %s (bypassing proxy)", targetURL) - resp, err := tp.handler.GetHTTPClient().Do(proxyReq) + // Forward request using handler's HTTP client (bypasses proxy to prevent infinite loop). + // Streaming requests use a client without an overall timeout so long-lived + // SSE token streams are not cut off mid-response. + wantStream := requestWantsStream(processed.RedactedBody) + httpClient := tp.handler.GetHTTPClient() + if wantStream { + httpClient = streamingClient + } + log.Printf("[TransparentProxy] Forwarding TLS request directly to %s (bypassing proxy, stream=%t)", targetURL, wantStream) + resp, err := httpClient.Do(proxyReq) if err != nil { log.Printf("[TransparentProxy] ❌ Failed to forward request: %v", err) tp.writeErrorResponse(conn, http.StatusBadGateway, fmt.Sprintf("Failed to forward request: %v", err)) @@ -455,6 +462,24 @@ func (tp *TransparentProxy) interceptHTTPOverTLS(conn net.Conn, r *http.Request, } defer resp.Body.Close() + // Stream Server-Sent Events straight through to the client, restoring PII + // incrementally. Buffering an SSE stream (as the non-streaming path below + // does) breaks streaming clients like Claude Code and hangs until the + // upstream timeout fires. 
+ if wantStream && isEventStream(resp) { + log.Printf("[TransparentProxy] Streaming SSE response for %s", r.URL.Path) + restorer := newSSERestorer(processed.MaskedToOriginal) + if streamErr := streamSSEResponse(conn, resp, restorer); streamErr != nil { + log.Printf("[TransparentProxy] ❌ Failed to stream SSE response: %v", streamErr) + } + // Record the streamed response for audit: masked = the text the model + // actually returned (pre-restore), restored = what the client received. + maskedText := restorer.masked.String() + tp.handler.LogStreamedResponse(ctx, processed.TransactionID, maskedText, restorer.restore(maskedText)) + log.Printf("[TransparentProxy] Streamed %s %s - Status: %d", r.Method, r.URL.Path, resp.StatusCode) + return + } + // Read response body respBody, err := io.ReadAll(resp.Body) if err != nil { From 93bc65e01ee086518700b813f2f0ee022e0accc3 Mon Sep 17 00:00:00 2001 From: Hannes Hapke Date: Sat, 13 Jun 2026 10:13:31 -0700 Subject: [PATCH 2/4] codex streaming --- src/backend/config/config.go | 8 +- src/backend/providers/openai.go | 31 ++++- src/backend/providers/provider.go | 3 +- src/backend/proxy/codec_anthropic.go | 122 +++++++++++++++++++ src/backend/proxy/codec_openai.go | 133 +++++++++++++++++++++ src/backend/proxy/handler.go | 5 +- src/backend/proxy/streaming.go | 170 +++++++++++++-------------- src/backend/proxy/transparent.go | 8 +- 8 files changed, 377 insertions(+), 103 deletions(-) create mode 100644 src/backend/proxy/codec_anthropic.go create mode 100644 src/backend/proxy/codec_openai.go diff --git a/src/backend/config/config.go b/src/backend/config/config.go index 90f20677..a29f76a6 100644 --- a/src/backend/config/config.go +++ b/src/backend/config/config.go @@ -317,13 +317,19 @@ func DefaultConfig() *Config { // GetInterceptDomains returns the list of intercept domains (as a union of all provider domains) func (pc ProvidersConfig) GetInterceptDomains() []string { - return []string{ + domains := []string{ 
interceptDomain(pc.AnthropicProviderConfig.APIDomain), interceptDomain(pc.OpenAIProviderConfig.APIDomain), interceptDomain(pc.GeminiProviderConfig.APIDomain), interceptDomain(pc.MistralProviderConfig.APIDomain), interceptDomain(pc.CustomProviderConfig.APIDomain), } + // ChatGPT-login Codex talks to chatgpt.com instead of the configured OpenAI + // API domain, so it must be intercepted explicitly whenever OpenAI is enabled. + if pc.OpenAIProviderConfig.APIDomain != "" { + domains = append(domains, "chatgpt.com") + } + return domains } func interceptDomain(apiDomain string) string { diff --git a/src/backend/providers/openai.go b/src/backend/providers/openai.go index 0042cf3c..10418a3e 100644 --- a/src/backend/providers/openai.go +++ b/src/backend/providers/openai.go @@ -11,11 +11,16 @@ import ( ) const ( - ProviderTypeOpenAI ProviderType = "openai" - ProviderSubpathOpenAI string = "/v1/chat/completions" - ProviderSubpathOpenAIResp string = "/v1/responses" - ProviderAPIDomainOpenAI string = "api.openai.com" - ProviderNameOpenAI string = "OpenAI" + ProviderTypeOpenAI ProviderType = "openai" + ProviderSubpathOpenAI string = "/v1/chat/completions" + ProviderSubpathOpenAIResp string = "/v1/responses" + ProviderAPIDomainOpenAI string = "api.openai.com" + ProviderNameOpenAI string = "OpenAI" + // ProviderAPIDomainCodex is the host used by ChatGPT-login Codex (the OpenAI + // CLI). It hits chatgpt.com/backend-api/codex/responses with an OAuth bearer + // token instead of api.openai.com, so it must be routed to and intercepted by + // the OpenAI provider alongside the API-key host. + ProviderAPIDomainCodex string = "chatgpt.com" ) // reasoningModelFamilies lists OpenAI model family prefixes that require the @@ -548,6 +553,22 @@ func restoreResponsesAPIResponse(maskedResponse map[string]interface{}, maskedTo if !ok { continue } + + // function_call output items carry model-generated tool arguments as a + // JSON string, which can echo masked PII just like assistant text. 
Restore + // it here so buffered responses match the streaming codec's behavior. + if args, ok := itemMap["arguments"].(string); ok { + restoredArgs := restorePII(args, maskedToOriginal) + if restoredArgs != args && getLogResponses() { + log.Printf("PII restored in response tool-call arguments") + if getLogVerbose() { + log.Printf("Original tool-call arguments: %s", args) + log.Printf("Restored tool-call arguments: %s", restoredArgs) + } + } + itemMap["arguments"] = restoredArgs + } + contents, ok := itemMap["content"].([]interface{}) if !ok { continue diff --git a/src/backend/providers/provider.go b/src/backend/providers/provider.go index fb5890d2..8304afa7 100644 --- a/src/backend/providers/provider.go +++ b/src/backend/providers/provider.go @@ -175,7 +175,8 @@ func (p *Providers) GetProviderFromHost(host string, logPrefix string) (*Provide } switch { - case p.OpenAIProvider != nil && providerHostMatches(host, p.OpenAIProvider.apiDomain): + case p.OpenAIProvider != nil && (providerHostMatches(host, p.OpenAIProvider.apiDomain) || host == ProviderAPIDomainCodex): + // chatgpt.com is the ChatGPT-login Codex host; route it to OpenAI too. provider = p.OpenAIProvider case p.AnthropicProvider != nil && providerHostMatches(host, p.AnthropicProvider.apiDomain): provider = p.AnthropicProvider diff --git a/src/backend/proxy/codec_anthropic.go b/src/backend/proxy/codec_anthropic.go new file mode 100644 index 00000000..8336fa75 --- /dev/null +++ b/src/backend/proxy/codec_anthropic.go @@ -0,0 +1,122 @@ +package proxy + +import ( + "bytes" + "encoding/json" +) + +// anthropicCodec restores masked PII inside an Anthropic Messages-API SSE +// stream. Placeholder (dummy) values can be split across consecutive +// content_block_delta events, so it keeps a per-content-block carry buffer and +// only emits text that is far enough from the tail that any placeholder starting +// in it is guaranteed complete. The held-back tail is flushed when the content +// block stops. 
+type anthropicCodec struct { + restoreCore + carry map[int]string // un-emitted tail per content-block index + kind map[int]string // delta type per index ("text_delta" / "input_json_delta") +} + +func newAnthropicCodec(mapping map[string]string) *anthropicCodec { + return &anthropicCodec{ + restoreCore: newRestoreCore(mapping), + carry: map[int]string{}, + kind: map[int]string{}, + } +} + +// transformEvent rewrites a single, complete SSE event (the raw lines up to and +// including the blank terminator). For a content_block_delta text delta it +// restores PII and rewrites only the JSON on the data: line, leaving the +// surrounding event:/blank framing intact so the event is always well formed — +// even when the entire delta is held back (an empty text delta is emitted). Any +// held-back tail is flushed as a synthetic delta immediately before the +// matching content_block_stop. Every other event passes through byte-for-byte. +func (s *anthropicCodec) transformEvent(lines [][]byte) []byte { + dataIdx := -1 + var evt struct { + Type string `json:"type"` + Index int `json:"index"` + Delta struct { + Type string `json:"type"` + Text string `json:"text"` + PartialJSON string `json:"partial_json"` + } `json:"delta"` + } + for i, ln := range lines { + t := bytes.TrimRight(ln, "\r\n") + if bytes.HasPrefix(t, []byte("data: ")) { + if json.Unmarshal(t[len("data: "):], &evt) == nil { + dataIdx = i + } + break + } + } + if dataIdx == -1 { + return concatLines(lines) // no recognisable data line: pass through + } + + switch { + case evt.Type == "content_block_delta" && (evt.Delta.Type == "text_delta" || evt.Delta.Type == "input_json_delta"): + // text_delta carries assistant text; input_json_delta carries a fragment + // of a tool call's JSON arguments. PII can hide in either, and can be + // split across consecutive deltas, so both go through the carry buffer. 
+ raw := evt.Delta.Text + if evt.Delta.Type == "input_json_delta" { + raw = evt.Delta.PartialJSON + } + s.masked.WriteString(raw) // raw model output (text or tool args), for audit + s.kind[evt.Index] = evt.Delta.Type + buf := s.carry[evt.Index] + raw + emit, hold := splitSafe(s.restore(buf), s.keep) + s.carry[evt.Index] = hold + // Rewrite only the data: line; keep the original event:/blank lines. + out := make([][]byte, len(lines)) + copy(out, lines) + out[dataIdx] = deltaDataLine(evt.Index, evt.Delta.Type, emit) + return concatLines(out) + + case evt.Type == "content_block_stop": + var out []byte + if tail := s.carry[evt.Index]; tail != "" { + out = append(out, deltaEvent(evt.Index, s.kind[evt.Index], tail)...) + delete(s.carry, evt.Index) + delete(s.kind, evt.Index) + } + return append(out, concatLines(lines)...) + + default: + return concatLines(lines) + } +} + +// deltaField returns the JSON field that carries the payload for a delta type. +func deltaField(deltaType string) string { + if deltaType == "input_json_delta" { + return "partial_json" + } + return "text" +} + +// deltaDataLine builds the `data: {...}\n` line for a text_delta or +// input_json_delta carrying the given (restored) payload. +func deltaDataLine(index int, deltaType, payload string) []byte { + b, _ := json.Marshal(map[string]interface{}{ + "type": "content_block_delta", + "index": index, + "delta": map[string]interface{}{"type": deltaType, deltaField(deltaType): payload}, + }) + out := append([]byte("data: "), b...) + return append(out, '\n') +} + +// deltaEvent builds a complete content_block_delta SSE event (with trailing +// blank line) used to flush a held-back tail. Defaults to a text delta. +func deltaEvent(index int, deltaType, payload string) []byte { + if deltaType == "" { + deltaType = "text_delta" + } + out := []byte("event: content_block_delta\n") + out = append(out, deltaDataLine(index, deltaType, payload)...) 
+ return append(out, '\n') +} diff --git a/src/backend/proxy/codec_openai.go b/src/backend/proxy/codec_openai.go new file mode 100644 index 00000000..a85d0275 --- /dev/null +++ b/src/backend/proxy/codec_openai.go @@ -0,0 +1,133 @@ +package proxy + +import ( + "bytes" + "encoding/json" + "fmt" + "strings" +) + +// openaiCodec restores masked PII inside an OpenAI Responses-API SSE stream +// (used by both api.openai.com and ChatGPT-login Codex on chatgpt.com). +// +// Incremental text arrives in `*.delta` events whose payload is the string field +// "delta" (e.g. response.output_text.delta, response.function_call_arguments. +// delta). A matching `*.done` event then repeats the complete value in a field +// named for its kind ("text", "arguments", or "refusal"). Placeholders can be +// split across consecutive deltas, so delta text goes through a per-channel +// carry buffer keyed by output_index/content_index; the held-back tail is +// flushed as a synthetic delta when the channel's `.done` arrives, and the +// `.done` payload itself is restored whole. +// +// The codec matches on the stable `.delta`/`.done` type suffix and the payload +// field names rather than enumerating exact event types, so it tolerates new +// Responses event kinds (text, tool args, refusals) without changes. Events it +// does not recognise pass through byte-for-byte. +type openaiCodec struct { + restoreCore + carry map[string]string // un-emitted (already-restored) tail per channel +} + +func newOpenAICodec(mapping map[string]string) *openaiCodec { + return &openaiCodec{ + restoreCore: newRestoreCore(mapping), + carry: map[string]string{}, + } +} + +// doneFields are the fields a Responses `*.done` event uses to carry the full +// value, by event kind: output text, tool-call arguments, and refusals. 
+var doneFields = []string{"text", "arguments", "refusal"} + +func (c *openaiCodec) transformEvent(lines [][]byte) []byte { + dataIdx := -1 + var raw []byte + for i, ln := range lines { + t := bytes.TrimRight(ln, "\r\n") + if bytes.HasPrefix(t, []byte("data: ")) { + raw = t[len("data: "):] + dataIdx = i + break + } + } + if dataIdx == -1 { + return concatLines(lines) // no data line (comment/keep-alive): pass through + } + + var obj map[string]interface{} + if err := json.Unmarshal(raw, &obj); err != nil { + return concatLines(lines) // non-JSON payload (e.g. "[DONE]"): pass through + } + typ, _ := obj["type"].(string) + key := channelKey(obj) + + switch { + case strings.HasSuffix(typ, ".delta"): + delta, ok := obj["delta"].(string) + if !ok { + return concatLines(lines) // non-text delta (e.g. audio): pass through + } + c.masked.WriteString(delta) // raw model output, for audit + emit, hold := splitSafe(c.restore(c.carry[key]+delta), c.keep) + c.carry[key] = hold + obj["delta"] = emit + return rewriteDataLine(lines, dataIdx, obj) + + case strings.HasSuffix(typ, ".done"): + var out []byte + // Flush any held-back delta tail as a synthetic delta so incremental + // renderers see the complete text before the terminating .done event. + if tail := c.carry[key]; tail != "" { + delete(c.carry, key) + out = append(out, c.tailDeltaEvent(typ, obj, tail)...) + } + // Restore the full value the .done event repeats. (Not added to the audit + // accumulator: the deltas above already captured this text.) + for _, field := range doneFields { + if full, ok := obj[field].(string); ok { + obj[field] = c.restore(full) + } + } + return append(out, rewriteDataLine(lines, dataIdx, obj)...) + + default: + return concatLines(lines) + } +} + +// tailDeltaEvent builds a synthetic `*.delta` event mirroring a `*.done` event's +// channel, carrying the already-restored held-back tail. The payload is emitted +// as-is (the carry buffer stores post-restore text). 
+func (c *openaiCodec) tailDeltaEvent(doneType string, obj map[string]interface{}, tail string) []byte { + deltaType := strings.TrimSuffix(doneType, ".done") + ".delta" + d := map[string]interface{}{"type": deltaType, "delta": tail} + for _, f := range []string{"item_id", "output_index", "content_index"} { + if v, ok := obj[f]; ok { + d[f] = v + } + } + b, _ := json.Marshal(d) + out := []byte("event: " + deltaType + "\n") + out = append(out, append(append([]byte("data: "), b...), '\n')...) + return append(out, '\n') +} + +// channelKey identifies an independent text channel within a Responses stream so +// each gets its own carry buffer. Deltas and their .done share output_index and +// content_index. +func channelKey(obj map[string]interface{}) string { + oi, _ := obj["output_index"].(float64) + ci, _ := obj["content_index"].(float64) + return fmt.Sprintf("%d:%d", int(oi), int(ci)) +} + +// rewriteDataLine re-marshals obj into the event's data: line, preserving every +// other line (event:, blank terminator) so framing stays intact. +func rewriteDataLine(lines [][]byte, dataIdx int, obj map[string]interface{}) []byte { + b, _ := json.Marshal(obj) + out := make([][]byte, len(lines)) + copy(out, lines) + line := append([]byte("data: "), b...) + out[dataIdx] = append(line, '\n') + return concatLines(out) +} diff --git a/src/backend/proxy/handler.go b/src/backend/proxy/handler.go index dc57ae43..51fa035f 100644 --- a/src/backend/proxy/handler.go +++ b/src/backend/proxy/handler.go @@ -490,8 +490,9 @@ func (h *Handler) ProcessResponseBody(ctx context.Context, body []byte, contentT // straight to the client), so this mirrors its response-logging half: a // response_masked row (the text the model actually returned, before PII // restoration) and a response_original row (the text delivered to the client, -// after restoration), correlated by transactionID. Only assistant text is -// captured; tool_use / tool_result blocks are not included. 
+// after restoration), correlated by transactionID. Both assistant text and +// tool-call argument JSON are captured (concatenated in the logged text); +// tool_result blocks on the request side are not. func (h *Handler) LogStreamedResponse(ctx context.Context, transactionID, maskedText, restoredText string) { if h.loggingDB == nil { return diff --git a/src/backend/proxy/streaming.go b/src/backend/proxy/streaming.go index 3a9516f4..88d3c6d8 100644 --- a/src/backend/proxy/streaming.go +++ b/src/backend/proxy/streaming.go @@ -6,12 +6,42 @@ import ( "encoding/json" "fmt" "io" + "log" "net" "net/http" + "os" + "path/filepath" "strings" "time" + + "github.com/dataiku/kiji-proxy/src/backend/providers" ) +// sseCaptureDirEnv names a directory to mirror raw upstream SSE streams into, +// one file per stream, for debugging provider event grammars (e.g. confirming +// the real ChatGPT-login Codex Responses event names before trusting the codec). +// Capture is off unless this env var is set to a writable directory. +const sseCaptureDirEnv = "KIJI_SSE_CAPTURE_DIR" + +// newSSECapture opens a per-stream capture file when sseCaptureDirEnv is set, +// or returns nil to disable capture. The returned writer receives the upstream +// bytes byte-for-byte (pre-restore), so the captured file is the provider's raw +// SSE as sent. +func newSSECapture() io.WriteCloser { + dir := os.Getenv(sseCaptureDirEnv) + if dir == "" { + return nil + } + name := fmt.Sprintf("sse-%d.log", time.Now().UnixNano()) + f, err := os.Create(filepath.Join(dir, name)) + if err != nil { + log.Printf("[stream] SSE capture disabled, cannot create file in %s: %v", dir, err) + return nil + } + log.Printf("[stream] Capturing raw upstream SSE to %s", f.Name()) + return f +} + // streamingClient is used for requests whose responses are streamed (SSE). // Unlike the shared handler client it has no overall timeout, so long-lived // token streams are not cut off after 30s. 
A response-header timeout still @@ -41,19 +71,32 @@ func isEventStream(resp *http.Response) bool { strings.ToLower(resp.Header.Get("Content-Type")), "text/event-stream") } -// sseRestorer restores masked PII inside an SSE token stream. Placeholder -// (dummy) values can be split across consecutive content_block_delta events, so -// it keeps a per-content-block carry buffer and only emits text that is far -// enough from the tail that any placeholder starting in it is guaranteed -// complete. The held-back tail is flushed when the content block stops. -type sseRestorer struct { +// streamCodec restores masked PII inside one provider's SSE token stream. The +// engine below (streamSSEResponse) owns the transport-level framing and event +// splitting; a codec only knows how to rewrite a single, complete SSE event for +// its provider's grammar. transformEvent receives the raw lines of one event +// (up to and including the blank terminator) and returns the bytes to write. +type streamCodec interface { + transformEvent(lines [][]byte) []byte + // restore replaces every masked (dummy) value with its original. Exposed so + // the caller can restore the accumulated audit text after the stream ends. + restore(text string) string + // maskedOutput returns the raw model output (pre-restore) accumulated across + // the stream, for audit logging. + maskedOutput() string +} + +// restoreCore holds the provider-agnostic restore state shared by every codec: +// the dummy→original mapping, the hold-back length, and the pre-restore model +// output accumulated for audit logging. Codecs embed it so they only implement +// their own event grammar. 
+type restoreCore struct { mapping map[string]string keep int // bytes to hold back = longest dummy length - 1 - carry map[int]string // un-emitted tail per content-block index - masked strings.Builder // raw model text (pre-restore), accumulated for logging + masked strings.Builder // raw model output (pre-restore), accumulated for logging } -func newSSERestorer(mapping map[string]string) *sseRestorer { +func newRestoreCore(mapping map[string]string) restoreCore { keep := 0 for masked := range mapping { if len(masked) > keep { @@ -63,19 +106,23 @@ func newSSERestorer(mapping map[string]string) *sseRestorer { if keep > 0 { keep-- } - return &sseRestorer{mapping: mapping, keep: keep, carry: map[int]string{}} + return restoreCore{mapping: mapping, keep: keep} } // restore replaces every masked (dummy) value with its original. Replacing // already-restored text is idempotent provided an original value does not // contain a dummy placeholder as a substring. -func (s *sseRestorer) restore(text string) string { - for masked, original := range s.mapping { +func (c *restoreCore) restore(text string) string { + for masked, original := range c.mapping { text = strings.ReplaceAll(text, masked, original) } return text } +func (c *restoreCore) maskedOutput() string { + return c.masked.String() +} + // splitSafe returns the prefix that is safe to emit now and the tail that must // be held back so a placeholder straddling the boundary can still complete. func splitSafe(s string, keep int) (emit, hold string) { @@ -85,61 +132,6 @@ func splitSafe(s string, keep int) (emit, hold string) { return s[:len(s)-keep], s[len(s)-keep:] } -// transformEvent rewrites a single, complete SSE event (the raw lines up to and -// including the blank terminator). 
For a content_block_delta text delta it -// restores PII and rewrites only the JSON on the data: line, leaving the -// surrounding event:/blank framing intact so the event is always well formed — -// even when the entire delta is held back (an empty text delta is emitted). Any -// held-back tail is flushed as a synthetic delta immediately before the -// matching content_block_stop. Every other event passes through byte-for-byte. -func (s *sseRestorer) transformEvent(lines [][]byte) []byte { - dataIdx := -1 - var evt struct { - Type string `json:"type"` - Index int `json:"index"` - Delta struct { - Type string `json:"type"` - Text string `json:"text"` - } `json:"delta"` - } - for i, ln := range lines { - t := bytes.TrimRight(ln, "\r\n") - if bytes.HasPrefix(t, []byte("data: ")) { - if json.Unmarshal(t[len("data: "):], &evt) == nil { - dataIdx = i - } - break - } - } - if dataIdx == -1 { - return concatLines(lines) // no recognisable data line: pass through - } - - switch { - case evt.Type == "content_block_delta" && evt.Delta.Type == "text_delta": - s.masked.WriteString(evt.Delta.Text) // raw model text, for audit logging - buf := s.carry[evt.Index] + evt.Delta.Text - emit, hold := splitSafe(s.restore(buf), s.keep) - s.carry[evt.Index] = hold - // Rewrite only the data: line; keep the original event:/blank lines. - out := make([][]byte, len(lines)) - copy(out, lines) - out[dataIdx] = textDeltaDataLine(evt.Index, emit) - return concatLines(out) - - case evt.Type == "content_block_stop": - var out []byte - if tail := s.carry[evt.Index]; tail != "" { - out = append(out, textDeltaEvent(evt.Index, tail)...) - delete(s.carry, evt.Index) - } - return append(out, concatLines(lines)...) - - default: - return concatLines(lines) - } -} - func concatLines(lines [][]byte) []byte { var b []byte for _, ln := range lines { @@ -148,25 +140,6 @@ func concatLines(lines [][]byte) []byte { return b } -// textDeltaDataLine builds just the `data: {...}\n` line for a text delta. 
-func textDeltaDataLine(index int, text string) []byte { - payload, _ := json.Marshal(map[string]interface{}{ - "type": "content_block_delta", - "index": index, - "delta": map[string]interface{}{"type": "text_delta", "text": text}, - }) - out := append([]byte("data: "), payload...) - return append(out, '\n') -} - -// textDeltaEvent builds a complete content_block_delta SSE event, including its -// trailing blank line, used to flush a held-back tail. -func textDeltaEvent(index int, text string) []byte { - out := []byte("event: content_block_delta\n") - out = append(out, textDeltaDataLine(index, text)...) - return append(out, '\n') -} - // isBlankLine reports whether a raw line is an SSE event terminator. func isBlankLine(line []byte) bool { return len(bytes.TrimRight(line, "\r\n")) == 0 @@ -176,8 +149,9 @@ func isBlankLine(line []byte) bool { // restoring masked PII incrementally and flushing after every event so the // client receives tokens as they arrive. Events are buffered only until their // blank-line terminator, never the whole body, and chunked transfer encoding is -// used instead of Content-Length. -func streamSSEResponse(conn net.Conn, resp *http.Response, restorer *sseRestorer) error { +// used instead of Content-Length. The provider-specific rewriting is delegated +// to codec. +func streamSSEResponse(conn net.Conn, resp *http.Response, codec streamCodec) error { bw := bufio.NewWriter(conn) // Status line. 
@@ -220,13 +194,18 @@ func streamSSEResponse(conn net.Conn, resp *http.Response, restorer *sseRestorer return bw.Flush() // flush each event so the client streams in real time } - reader := bufio.NewReader(resp.Body) + var src io.Reader = resp.Body + if cap := newSSECapture(); cap != nil { + defer cap.Close() + src = io.TeeReader(resp.Body, cap) // mirror raw upstream bytes for debugging + } + reader := bufio.NewReader(src) var event [][]byte flush := func() error { if len(event) == 0 { return nil } - out := restorer.transformEvent(event) + out := codec.transformEvent(event) event = event[:0] return writeChunk(out) } @@ -258,3 +237,14 @@ func streamSSEResponse(conn net.Conn, resp *http.Response, restorer *sseRestorer } return bw.Flush() } + +// codecForProvider selects the SSE codec matching the upstream provider's stream +// grammar. OpenAI (incl. ChatGPT-login Codex on chatgpt.com) speaks the +// Responses-API event shape; everything else uses the Anthropic grammar, which +// is also the safe default. +func codecForProvider(provider *providers.Provider, mapping map[string]string) streamCodec { + if provider != nil && (*provider).GetType() == providers.ProviderTypeOpenAI { + return newOpenAICodec(mapping) + } + return newAnthropicCodec(mapping) +} diff --git a/src/backend/proxy/transparent.go b/src/backend/proxy/transparent.go index d64dfb80..c1fcb410 100644 --- a/src/backend/proxy/transparent.go +++ b/src/backend/proxy/transparent.go @@ -468,14 +468,14 @@ func (tp *TransparentProxy) interceptHTTPOverTLS(conn net.Conn, r *http.Request, // upstream timeout fires. 
if wantStream && isEventStream(resp) { log.Printf("[TransparentProxy] Streaming SSE response for %s", r.URL.Path) - restorer := newSSERestorer(processed.MaskedToOriginal) - if streamErr := streamSSEResponse(conn, resp, restorer); streamErr != nil { + codec := codecForProvider(provider, processed.MaskedToOriginal) + if streamErr := streamSSEResponse(conn, resp, codec); streamErr != nil { log.Printf("[TransparentProxy] ❌ Failed to stream SSE response: %v", streamErr) } // Record the streamed response for audit: masked = the text the model // actually returned (pre-restore), restored = what the client received. - maskedText := restorer.masked.String() - tp.handler.LogStreamedResponse(ctx, processed.TransactionID, maskedText, restorer.restore(maskedText)) + maskedText := codec.maskedOutput() + tp.handler.LogStreamedResponse(ctx, processed.TransactionID, maskedText, codec.restore(maskedText)) log.Printf("[TransparentProxy] Streamed %s %s - Status: %d", r.Method, r.URL.Path, resp.StatusCode) return } From 8d40826e4151fa3aaae05417a9690bdae23bbc08 Mon Sep 17 00:00:00 2001 From: Hannes Hapke Date: Sat, 13 Jun 2026 10:33:26 -0700 Subject: [PATCH 3/4] documentation --- README.md | 2 + docs/09-coding-agents.md | 173 +++++++++++++++++++++++++++++++++++++++ docs/README.md | 22 +++++ 3 files changed, 197 insertions(+) create mode 100644 docs/09-coding-agents.md diff --git a/README.md b/README.md index 73513e4b..287e5e2b 100644 --- a/README.md +++ b/README.md @@ -217,6 +217,7 @@ Complete documentation is available in [docs/README.md](docs/README.md): - **[Chrome Extension](docs/06-chrome-extension.md)** - Building, configuring, and publishing the PII Guard extension - **[Customizing the PII Model](docs/07-customizing-pii-model.md)** - Training a model with your own entity types - **[Masking Controls & Review](docs/08-masking-controls.md)** - Disable entity types, custom regex, mapping review +- **[Coding Agents (Codex & Claude Code)](docs/09-coding-agents.md)** - Route terminal 
coding agents through the proxy **Quick Links:** - [Installation Guide](docs/01-getting-started.md#quick-installation) @@ -225,6 +226,7 @@ Complete documentation is available in [docs/README.md](docs/README.md): - [Build for macOS](docs/03-building-deployment.md#building-for-macos) - [Build for Linux](docs/03-building-deployment.md#building-for-linux) - [Masking Controls](docs/08-masking-controls.md) - disable entities, custom regex, review mappings +- [Coding Agents Setup](docs/09-coding-agents.md) - Codex & Claude Code via the proxy --- diff --git a/docs/09-coding-agents.md b/docs/09-coding-agents.md new file mode 100644 index 00000000..5ce2e9ab --- /dev/null +++ b/docs/09-coding-agents.md @@ -0,0 +1,173 @@ +# Chapter 9: Coding Agents (Codex & Claude Code) + +Route terminal coding agents — OpenAI **Codex** and Anthropic **Claude Code** — through Kiji Privacy Proxy so that PII in your prompts, files, and tool calls is masked before it reaches the model, and restored in the model's replies. Streaming responses are restored token-by-token, so the agent still feels live. + +This chapter focuses on **what to set up on the client side**. For how the proxy itself works, see [Advanced Topics](05-advanced-topics.md#transparent-proxy--mitm). + +## How it works + +Coding agents talk to their provider over HTTPS. The proxy runs in **transparent (MITM) mode** (port `8081`): it intercepts traffic to known provider hosts, masks PII in the outgoing request, forwards it, then restores PII in the response — buffered or streamed (SSE). + +Hosts the proxy intercepts for coding agents: + +| Agent | Host(s) | Notes | +|-------|---------|-------| +| Claude Code | `api.anthropic.com` | | +| Codex (API key) | `api.openai.com` | `/v1/responses` and `/v1/chat/completions` | +| Codex (ChatGPT login) | `chatgpt.com` | `/backend-api/codex/responses` | + +For **any** agent, two things must be true: + +1. 
**The agent sends its HTTPS traffic through the proxy** — via `HTTP_PROXY` / `HTTPS_PROXY`. The macOS PAC auto-configuration only routes **browsers**; command-line agents must be pointed at the proxy explicitly. +2. **The agent trusts the proxy's CA** — so the MITM TLS handshake is accepted. Each agent reads its trusted CA from a different place (see below). + +## Prerequisites + +- Kiji Privacy Proxy is **running** (desktop app on macOS, or the standalone backend on Linux). See [Getting Started](01-getting-started.md). +- You know the path to the proxy CA certificate: + + | Platform | CA certificate path | + |----------|---------------------| + | macOS | `$HOME/Library/Application Support/Kiji Privacy Proxy/certs/ca.crt` | + | Linux | `~/.kiji-proxy/certs/ca.crt` | + +Throughout this chapter the proxy endpoint is `http://127.0.0.1:8081` (the transparent proxy port). Adjust if you changed `proxy_port`. + +## Claude Code + +Claude Code is a Node.js application, so it uses the standard Node proxy and CA variables. + +### Environment variables + +```bash +export HTTP_PROXY=http://127.0.0.1:8081 +export HTTPS_PROXY=http://127.0.0.1:8081 + +# macOS +export NODE_EXTRA_CA_CERTS="$HOME/Library/Application Support/Kiji Privacy Proxy/certs/ca.crt" +# Linux +export NODE_EXTRA_CA_CERTS="$HOME/.kiji-proxy/certs/ca.crt" +``` + +Then run `claude` in the same shell. Requests to `api.anthropic.com` now flow through the proxy. + +### Making it persistent + +Instead of exporting in every shell, set the variables in Claude Code's settings file so they apply to every session. Add an `env` block to `~/.claude/settings.json`: + +```json +{ + "env": { + "HTTP_PROXY": "http://127.0.0.1:8081", + "HTTPS_PROXY": "http://127.0.0.1:8081", + "NODE_EXTRA_CA_CERTS": "/Users/you/Library/Application Support/Kiji Privacy Proxy/certs/ca.crt" + } +} +``` + +(The path may contain spaces — that's fine inside the JSON string. Use the absolute path; `~`/`$HOME` are not expanded here.) 
+ +## Codex + +Codex (`codex-cli`) is a **native Rust binary that uses rustls** for TLS, not Node and not the macOS keychain. Two consequences: + +- `NODE_EXTRA_CA_CERTS` is a Node concept — but Codex's CA loader happens to honor it as a fallback, so it still works (see below). +- Adding the CA to the macOS **System keychain alone is not enough**, because rustls uses its own root store. You must point Codex at the CA **file** via an environment variable. + +### Environment variables + +```bash +export HTTP_PROXY=http://127.0.0.1:8081 +export HTTPS_PROXY=http://127.0.0.1:8081 + +# macOS +export CODEX_CA_CERTIFICATE="$HOME/Library/Application Support/Kiji Privacy Proxy/certs/ca.crt" +# Linux +export CODEX_CA_CERTIFICATE="$HOME/.kiji-proxy/certs/ca.crt" +``` + +Then run `codex`. `CODEX_CA_CERTIFICATE` is Codex's native variable. If it is unset, Codex falls back — in order — to these standard CA-bundle variables, so any of them works too: + +``` +CODEX_CA_CERTIFICATE → SSL_CERT_FILE → REQUESTS_CA_BUNDLE → CURL_CA_BUNDLE + → NODE_EXTRA_CA_CERTS → GIT_SSL_CAINFO → BUNDLE_SSL_CA_CERT +``` + +This means if you already export `NODE_EXTRA_CA_CERTS` globally for Claude Code, Codex will pick up the same CA automatically — but setting `CODEX_CA_CERTIFICATE` explicitly is clearest. + +### API-key vs ChatGPT-login Codex + +- **API-key Codex** (`OPENAI_API_KEY` set) talks to `api.openai.com`. Your API key is forwarded untouched. +- **ChatGPT-login Codex** (signed in with `codex login`) talks to `chatgpt.com/backend-api/codex/responses` with an OAuth bearer token. The proxy leaves the `Authorization` header untouched and only masks/restores content, so your session keeps working. + +Both are intercepted with the same setup above; no extra configuration is needed to switch between them. 
+ +## A shared snippet for both agents + +Drop this in your shell profile (`~/.zshrc` / `~/.bashrc`) to cover both agents at once on macOS: + +```bash +KIJI_CA="$HOME/Library/Application Support/Kiji Privacy Proxy/certs/ca.crt" +export HTTP_PROXY=http://127.0.0.1:8081 +export HTTPS_PROXY=http://127.0.0.1:8081 +export NODE_EXTRA_CA_CERTS="$KIJI_CA" # Claude Code (and Codex fallback) +export CODEX_CA_CERTIFICATE="$KIJI_CA" # Codex (explicit) +``` + +## What gets masked and restored + +| Direction | Covered | +|-----------|---------| +| Request → model | Chat `messages`; Responses-API `input` (string or message/part arrays), `instructions` (system prompt), tool-result `output`, and tool-call `arguments` | +| Model → response | Assistant text and tool-call `arguments`, for both **streaming** (SSE) and **buffered** replies | + +Every interception is recorded in the proxy's request log (visible in the desktop app), with the masked text the model actually saw and the restored text the agent received. See [Masking Controls & Review](08-masking-controls.md) to tune what gets masked and to review or delete recorded mappings. + +## Verifying interception + +1. **Check the proxy log.** Run a prompt in the agent, then open the desktop app's request log (or the standalone audit log). You should see an entry for the provider host with masked/restored content. +2. **Capture the raw stream (debugging).** Set `KIJI_SSE_CAPTURE_DIR` in the environment of the proxy process before starting it to mirror each upstream SSE stream to a file — useful for confirming exactly what the provider sent (the capture is taken before PII restoration, so it still contains the masked values): + + ```bash + mkdir -p /tmp/agent-sse + export KIJI_SSE_CAPTURE_DIR=/tmp/agent-sse + # start the proxy from this shell, run one agent request, then inspect: + grep -o '"type":"[^"]*"' /tmp/agent-sse/sse-*.log | sort -u + ``` + + Leave `KIJI_SSE_CAPTURE_DIR` unset in normal use; capture is off by default.
+ +## Troubleshooting + +**TLS / certificate error (`unable to get local issuer`, `invalid peer certificate`, handshake refused)** +- *Cause:* the agent doesn't trust the proxy CA. +- *Fix:* confirm the CA variable points at a file that exists and is readable. For Codex, use `CODEX_CA_CERTIFICATE` (a **file path**, not a directory); the macOS keychain alone won't satisfy rustls. For Claude Code, use `NODE_EXTRA_CA_CERTS`. Quote paths that contain spaces. + +**Traffic isn't being intercepted (no log entries)** +- *Cause:* the agent isn't using the proxy. +- *Fix:* ensure `HTTP_PROXY` and `HTTPS_PROXY` are exported in the **same shell/process** that runs the agent. Check that `NO_PROXY`/`no_proxy` doesn't list `openai.com`, `chatgpt.com`, or `anthropic.com`. Confirm the proxy is listening on the port you set. + +**ChatGPT-login Codex still fails after setup** +- *Cause:* the proxy build doesn't intercept `chatgpt.com`, or the CA isn't trusted on that host. +- *Fix:* verify `chatgpt.com` is in the proxy's intercept domains (it is added automatically when the OpenAI provider is configured) and that `CODEX_CA_CERTIFICATE` is set. As a last resort for diagnosis you can test with API-key Codex against `api.openai.com` to isolate whether the issue is host-specific. + +**Streaming feels stuck or arrives all at once** +- *Cause:* a buffering layer between the agent and the proxy. +- *Fix:* the proxy streams SSE using chunked transfer encoding and flushes after every event. Make sure no additional proxy sits between the agent and Kiji, and that you point the agent directly at `127.0.0.1`. + +## Alternative: forward proxy without CA trust + +If you'd rather not install/trust the CA, agents that let you override the base URL can use the **forward proxy** (port `8080`) instead. The client talks plain HTTP to the proxy, which makes the upstream TLS connection itself — so no client-side CA trust is needed.
+ +```bash +# Claude Code → forward proxy (no CA needed) +export ANTHROPIC_BASE_URL=http://127.0.0.1:8080 +``` + +This works for API-key clients with a configurable endpoint. It does **not** work for **ChatGPT-login Codex**, whose `chatgpt.com` endpoint is fixed — that path requires the transparent proxy + CA trust described above. + +## See also + +- [Getting Started](01-getting-started.md) — installing the proxy and CA certificate +- [Advanced Topics](05-advanced-topics.md#transparent-proxy--mitm) — MITM architecture, CA management, CORS +- [Masking Controls & Review](08-masking-controls.md) — what gets masked, reviewing mappings diff --git a/docs/README.md b/docs/README.md index 88e35338..64a7c2eb 100644 --- a/docs/README.md +++ b/docs/README.md @@ -146,6 +146,22 @@ Control what gets masked and review what already has been, from the desktop app. --- +### [Chapter 9: Coding Agents (Codex & Claude Code)](09-coding-agents.md) + +Route terminal coding agents through the proxy so PII in prompts, code, and tool calls is masked before it reaches the model and restored in replies. + +**Topics:** +- How agents are intercepted (hosts, masking, streaming restore) +- Claude Code setup (`HTTP_PROXY`/`HTTPS_PROXY`, `NODE_EXTRA_CA_CERTS`, `settings.json`) +- Codex setup (rustls CA trust via `CODEX_CA_CERTIFICATE`, API-key vs ChatGPT-login) +- A shared shell snippet for both agents +- Verifying interception and troubleshooting TLS/proxy issues +- Forward-proxy alternative without CA trust + +**Start here if you're:** Using OpenAI Codex or Claude Code and want their traffic masked by Kiji. + +--- + ## Quick Links ### Getting Started @@ -187,6 +203,12 @@ Control what gets masked and review what already has been, from the desktop app. 
- [Custom Regex Patterns](08-masking-controls.md#custom-regex-patterns) - [Review & Delete Mappings](08-masking-controls.md#reviewing-and-deleting-masked-entities) +### Coding Agents +- [Claude Code Setup](09-coding-agents.md#claude-code) +- [Codex Setup](09-coding-agents.md#codex) +- [Shared Shell Snippet](09-coding-agents.md#a-shared-snippet-for-both-agents) +- [Troubleshooting](09-coding-agents.md#troubleshooting) + ## Document Status These documents consolidate and supersede the following original files: From 55d6640a739fc0dd45125af7b1254e88b709a977 Mon Sep 17 00:00:00 2001 From: Hannes Hapke Date: Sat, 13 Jun 2026 10:59:17 -0700 Subject: [PATCH 4/4] linter fixes --- src/backend/proxy/codec_anthropic.go | 22 ++++++++++++++-------- src/backend/proxy/codec_openai.go | 6 +++--- src/backend/proxy/handler.go | 10 ++++++++-- 3 files changed, 25 insertions(+), 13 deletions(-) diff --git a/src/backend/proxy/codec_anthropic.go b/src/backend/proxy/codec_anthropic.go index 8336fa75..2181c827 100644 --- a/src/backend/proxy/codec_anthropic.go +++ b/src/backend/proxy/codec_anthropic.go @@ -5,6 +5,12 @@ import ( "encoding/json" ) +// Anthropic content_block_delta payload types. +const ( + deltaTypeText = "text_delta" + deltaTypeInputJSON = "input_json_delta" +) + // anthropicCodec restores masked PII inside an Anthropic Messages-API SSE // stream. Placeholder (dummy) values can be split across consecutive // content_block_delta events, so it keeps a per-content-block carry buffer and @@ -57,12 +63,12 @@ func (s *anthropicCodec) transformEvent(lines [][]byte) []byte { } switch { - case evt.Type == "content_block_delta" && (evt.Delta.Type == "text_delta" || evt.Delta.Type == "input_json_delta"): + case evt.Type == "content_block_delta" && (evt.Delta.Type == deltaTypeText || evt.Delta.Type == deltaTypeInputJSON): // text_delta carries assistant text; input_json_delta carries a fragment // of a tool call's JSON arguments. 
PII can hide in either, and can be // split across consecutive deltas, so both go through the carry buffer. raw := evt.Delta.Text - if evt.Delta.Type == "input_json_delta" { + if evt.Delta.Type == deltaTypeInputJSON { raw = evt.Delta.PartialJSON } s.masked.WriteString(raw) // raw model output (text or tool args), for audit @@ -92,19 +98,19 @@ func (s *anthropicCodec) transformEvent(lines [][]byte) []byte { // deltaField returns the JSON field that carries the payload for a delta type. func deltaField(deltaType string) string { - if deltaType == "input_json_delta" { + if deltaType == deltaTypeInputJSON { return "partial_json" } - return "text" + return jsonKeyText } // deltaDataLine builds the `data: {...}\n` line for a text_delta or // input_json_delta carrying the given (restored) payload. func deltaDataLine(index int, deltaType, payload string) []byte { b, _ := json.Marshal(map[string]interface{}{ - "type": "content_block_delta", - "index": index, - "delta": map[string]interface{}{"type": deltaType, deltaField(deltaType): payload}, + jsonKeyType: "content_block_delta", + "index": index, + "delta": map[string]interface{}{jsonKeyType: deltaType, deltaField(deltaType): payload}, }) out := append([]byte("data: "), b...) return append(out, '\n') @@ -114,7 +120,7 @@ func deltaDataLine(index int, deltaType, payload string) []byte { // blank line) used to flush a held-back tail. Defaults to a text delta. func deltaEvent(index int, deltaType, payload string) []byte { if deltaType == "" { - deltaType = "text_delta" + deltaType = deltaTypeText } out := []byte("event: content_block_delta\n") out = append(out, deltaDataLine(index, deltaType, payload)...) 
diff --git a/src/backend/proxy/codec_openai.go b/src/backend/proxy/codec_openai.go index a85d0275..7696ebb3 100644 --- a/src/backend/proxy/codec_openai.go +++ b/src/backend/proxy/codec_openai.go @@ -37,7 +37,7 @@ func newOpenAICodec(mapping map[string]string) *openaiCodec { // doneFields are the fields a Responses `*.done` event uses to carry the full // value, by event kind: output text, tool-call arguments, and refusals. -var doneFields = []string{"text", "arguments", "refusal"} +var doneFields = []string{jsonKeyText, "arguments", "refusal"} func (c *openaiCodec) transformEvent(lines [][]byte) []byte { dataIdx := -1 @@ -58,7 +58,7 @@ func (c *openaiCodec) transformEvent(lines [][]byte) []byte { if err := json.Unmarshal(raw, &obj); err != nil { return concatLines(lines) // non-JSON payload (e.g. "[DONE]"): pass through } - typ, _ := obj["type"].(string) + typ, _ := obj[jsonKeyType].(string) key := channelKey(obj) switch { @@ -100,7 +100,7 @@ func (c *openaiCodec) transformEvent(lines [][]byte) []byte { // as-is (the carry buffer stores post-restore text). func (c *openaiCodec) tailDeltaEvent(doneType string, obj map[string]interface{}, tail string) []byte { deltaType := strings.TrimSuffix(doneType, ".done") + ".delta" - d := map[string]interface{}{"type": deltaType, "delta": tail} + d := map[string]interface{}{jsonKeyType: deltaType, "delta": tail} for _, f := range []string{"item_id", "output_index", "content_index"} { if v, ok := obj[f]; ok { d[f] = v diff --git a/src/backend/proxy/handler.go b/src/backend/proxy/handler.go index 51fa035f..475d3781 100644 --- a/src/backend/proxy/handler.go +++ b/src/backend/proxy/handler.go @@ -25,6 +25,12 @@ import ( const paramLimit = "limit" +// JSON field names reused across the response wrappers and the SSE codecs. 
+const ( + jsonKeyType = "type" + jsonKeyText = "text" +) + // Handler handles HTTP requests and proxies them to LLM provider type Handler struct { client *http.Client @@ -259,7 +265,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } piiEntities = append(piiEntities, map[string]interface{}{ - "text": entity.Text, + jsonKeyText: entity.Text, "masked_text": maskedText, "label": entity.Label, "confidence": entity.Confidence, @@ -505,7 +511,7 @@ func (h *Handler) LogStreamedResponse(ctx context.Context, transactionID, masked wrap := func(text string) string { b, err := json.Marshal(map[string]interface{}{ "streamed": true, - "content": []map[string]interface{}{{"type": "text", "text": text}}, + "content": []map[string]interface{}{{jsonKeyType: jsonKeyText, jsonKeyText: text}}, }) if err != nil { return text