-
-
Notifications
You must be signed in to change notification settings - Fork 3.9k
feat(codex-encryp-cont-retry): add a feature in api error handling #2581
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -416,6 +416,45 @@ pub async fn handle_chat_completions( | |
| process_response(response, &ctx, &state, &OPENAI_PARSER_CONFIG).await | ||
| } | ||
|
|
||
| /// Remove all encrypted_content from the request body. | ||
| /// | ||
| /// Two places must be cleaned: | ||
| /// 1. `include` array — prevents the API from returning NEW encrypted_content | ||
| /// 2. `input` array (reasoning/compaction items) — removes OLD encrypted_content | ||
| /// that was stored in Codex session files from previous turns and can no | ||
| /// longer be decrypted after the proxy rotated API keys. | ||
| /// | ||
| /// Returns `true` if anything was actually stripped. | ||
| fn strip_all_encrypted_content(body: &mut Value) -> bool { | ||
| let mut stripped = false; | ||
|
|
||
| // 1. Prevent new encrypted_content from being returned. | ||
| if let Some(include) = body.get_mut("include").and_then(|v| v.as_array_mut()) { | ||
| let before = include.len(); | ||
| include.retain(|v| v.as_str() != Some("reasoning.encrypted_content")); | ||
| stripped |= before != include.len(); | ||
| } | ||
|
|
||
| // 2. Remove old encrypted_content from input items so the API | ||
| // doesn't try (and fail) to decrypt blobs from a different key. | ||
| if let Some(input) = body.get_mut("input").and_then(|v| v.as_array_mut()) { | ||
| for item in input.iter_mut() { | ||
| if let Some(obj) = item.as_object_mut() { | ||
| match obj.get("type").and_then(|v| v.as_str()) { | ||
| Some("reasoning") | Some("compaction") => { | ||
| if obj.remove("encrypted_content").is_some() { | ||
| stripped = true; | ||
| } | ||
| } | ||
| _ => {} | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| stripped | ||
| } | ||
|
|
||
| /// 处理 /v1/responses 请求(OpenAI Responses API - Codex CLI 透传) | ||
| pub async fn handle_responses( | ||
| State(state): State<ProxyState>, | ||
|
|
@@ -430,9 +469,27 @@ pub async fn handle_responses( | |
| .await | ||
| .map_err(|e| ProxyError::Internal(format!("Failed to read request body: {e}")))? | ||
| .to_bytes(); | ||
| let body: Value = serde_json::from_slice(&body_bytes) | ||
| let mut body: Value = serde_json::from_slice(&body_bytes) | ||
| .map_err(|e| ProxyError::Internal(format!("Failed to parse request body: {e}")))?; | ||
|
|
||
| let smart_enabled = state | ||
| .db | ||
| .get_setting("strip_encrypted_content_enabled") | ||
| .unwrap_or(None) | ||
| .map(|v| v != "false" && v != "0") | ||
| .unwrap_or(true); | ||
|
|
||
| // Keep a clone for potential retry with stripping after error detection. | ||
| let original_body = body.clone(); | ||
|
|
||
| // Proactive strip: if any provider has a recent encrypted_content error, | ||
| // strip ahead of time to avoid hitting the error at all. | ||
| let proactively_stripped = if smart_enabled && state.has_recent_encrypted_error() { | ||
| strip_all_encrypted_content(&mut body) | ||
| } else { | ||
| false | ||
| }; | ||
|
|
||
| let mut ctx = | ||
| RequestContext::new(&state, &body, &headers, AppType::Codex, "Codex", "codex").await?; | ||
| let endpoint = endpoint_with_query(&uri, "/responses"); | ||
|
|
@@ -443,31 +500,228 @@ pub async fn handle_responses( | |
| .unwrap_or(false); | ||
|
|
||
| let forwarder = ctx.create_forwarder(&state); | ||
| let result = match forwarder | ||
| let (fwd_response, provider) = match forwarder | ||
| .forward_with_retry( | ||
| &AppType::Codex, | ||
| &endpoint, | ||
| body, | ||
| headers, | ||
| extensions, | ||
| headers.clone(), | ||
| extensions.clone(), | ||
| ctx.get_providers(), | ||
| ) | ||
| .await | ||
| { | ||
| Ok(result) => result, | ||
| Ok(result) => (result.response, result.provider), | ||
| Err(mut err) => { | ||
| if let Some(provider) = err.provider.take() { | ||
| ctx.provider = provider; | ||
| let provider = err.provider.clone(); | ||
| if let Some(ref p) = provider { | ||
| ctx.provider = p.clone(); | ||
| } | ||
|
|
||
| // Reactive detection from forwarder error: forward_with_retry | ||
| // converts non-2xx responses to Err(UpstreamError), so the Ok-branch | ||
| // reactive check below never fires. Detect here instead. | ||
| if smart_enabled && !proactively_stripped { | ||
| let is_encrypted_error = matches!( | ||
| &err.error, | ||
| ProxyError::UpstreamError { body: Some(b), .. } | ||
| if b.contains("invalid_encrypted_content") | ||
| ); | ||
| if is_encrypted_error { | ||
| if let Some(ref p) = provider { | ||
| state.record_encrypted_error(&p.id); | ||
| } | ||
| log::warn!( | ||
| "[Codex] 检测到 invalid_encrypted_content (provider={}), 剥离后重试", | ||
| provider.as_ref().map(|p| p.id.as_str()).unwrap_or("unknown") | ||
| ); | ||
|
|
||
| let mut retry_body = original_body; | ||
| strip_all_encrypted_content(&mut retry_body); | ||
|
|
||
| let retry_ctx = RequestContext::new( | ||
| &state, &retry_body, &headers, AppType::Codex, "Codex", "codex", | ||
| ) | ||
| .await?; | ||
| let retry_forwarder = retry_ctx.create_forwarder(&state); | ||
|
|
||
| match retry_forwarder | ||
| .forward_with_retry( | ||
| &AppType::Codex, | ||
| &endpoint, | ||
| retry_body, | ||
| headers, | ||
| extensions, | ||
| retry_ctx.get_providers(), | ||
| ) | ||
| .await | ||
| { | ||
| Ok(retry_result) => { | ||
| log::info!( | ||
| "[Codex] 剥离 encrypted_content 后重试成功 (provider={})", | ||
| retry_result.provider.id | ||
| ); | ||
| ctx.provider = retry_result.provider; | ||
| return process_response( | ||
| retry_result.response, | ||
| &retry_ctx, | ||
| &state, | ||
| &CODEX_PARSER_CONFIG, | ||
| ) | ||
| .await; | ||
| } | ||
| Err(retry_err) => { | ||
| log::error!( | ||
| "[Codex] 剥离 encrypted_content 后重试仍失败: {}", | ||
| retry_err.error | ||
| ); | ||
| let wrapped = ProxyError::Internal(format!( | ||
| "[CC Switch] Auto-detected invalid_encrypted_content, stripped encrypted_content from request (include + input items), and retried — but the retry also failed.\nOriginal error: {}\n\nTip: If you see this repeatedly, try restarting your Codex session with --resume to clear session state.", | ||
| retry_err.error | ||
| )); | ||
| log_forward_error(&state, &ctx, is_stream, &wrapped); | ||
| return Err(wrapped); | ||
|
Comment on lines
+578
to
+583
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
If the strip-and-retry attempt fails, this path wraps the failure into Useful? React with 👍 / 👎. |
||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| log_forward_error(&state, &ctx, is_stream, &err.error); | ||
| return Err(err.error); | ||
| } | ||
| }; | ||
|
|
||
| ctx.provider = result.provider; | ||
| let response = result.response; | ||
| // Reactive detection: if smart mode is on, we didn't already strip, and the | ||
| // response is a non-SSE error — check whether it's invalid_encrypted_content. | ||
| // Note: most error responses are converted to Err by forward(), so this branch | ||
| // is rarely reached. The primary detection lives in the Err branch above. | ||
| let error_status = fwd_response.status(); | ||
| if smart_enabled && !proactively_stripped && !error_status.is_success() && !fwd_response.is_sse() { | ||
| let error_headers = fwd_response.headers().clone(); | ||
| let error_bytes = fwd_response.bytes().await?; | ||
| let error_str = String::from_utf8_lossy(&error_bytes); | ||
|
|
||
| if error_str.contains("invalid_encrypted_content") { | ||
| state.record_encrypted_error(&provider.id); | ||
| log::warn!( | ||
| "[Codex] 检测到 invalid_encrypted_content (provider={}), 剥离后重试", | ||
| provider.id | ||
| ); | ||
|
|
||
| let mut retry_body = original_body; | ||
| strip_all_encrypted_content(&mut retry_body); | ||
|
|
||
| let retry_ctx = RequestContext::new( | ||
| &state, &retry_body, &headers, AppType::Codex, "Codex", "codex", | ||
| ) | ||
| .await?; | ||
| let retry_forwarder = retry_ctx.create_forwarder(&state); | ||
|
|
||
| match retry_forwarder | ||
| .forward_with_retry( | ||
| &AppType::Codex, | ||
| &endpoint, | ||
| retry_body, | ||
| headers, | ||
| extensions, | ||
| retry_ctx.get_providers(), | ||
| ) | ||
| .await | ||
| { | ||
| Ok(retry_result) => { | ||
| let retry_response = retry_result.response; | ||
| if !retry_response.status().is_success() && !retry_response.is_sse() { | ||
| let hdrs = retry_response.headers().clone(); | ||
| let st = retry_response.status(); | ||
| let body_bytes = retry_response.bytes().await?; | ||
| let mut body_json: Value = | ||
| serde_json::from_slice(&body_bytes).unwrap_or_default(); | ||
| if let Some(obj) = body_json.as_object_mut() { | ||
| if let Some(error) = obj.get_mut("error").and_then(|v| v.as_object_mut()) | ||
| { | ||
| if let Some(msg) = | ||
| error.get("message").and_then(|v| v.as_str()).map(|s| s.to_string()) | ||
| { | ||
| error.insert( | ||
| "message".to_string(), | ||
| Value::String(format!( | ||
| "[CC Switch] Auto-detected invalid_encrypted_content, stripped encrypted_content from include + input items, and retried — but the retry also failed.\nOriginal API error: {msg}" | ||
| )), | ||
| ); | ||
| } | ||
| error.insert( | ||
| "cc_switch_note".to_string(), | ||
| Value::String( | ||
| "encrypted_content was automatically stripped from this request by CC Switch. If this error persists, try restarting your Codex session with --resume, or disable this feature in proxy settings." | ||
| .to_string(), | ||
| ), | ||
| ); | ||
| } | ||
| } | ||
| let body_str = serde_json::to_string(&body_json).unwrap_or_default(); | ||
| let mut axum_resp = axum::response::Response::builder().status(st); | ||
| if let Some(h) = axum_resp.headers_mut() { | ||
| for (k, v) in hdrs.iter() { | ||
| h.insert(k, v.clone()); | ||
| } | ||
| } | ||
| log::warn!( | ||
| "[Codex] 剥离后重试仍返回错误 (provider={}): {}", | ||
| retry_result.provider.id, | ||
| body_str | ||
| ); | ||
| return Ok(axum_resp | ||
| .body(axum::body::Body::from(body_str)) | ||
| .unwrap()); | ||
| } | ||
|
|
||
| log::info!( | ||
| "[Codex] 剥离 encrypted_content 后重试成功 (provider={})", | ||
| retry_result.provider.id | ||
| ); | ||
| ctx.provider = retry_result.provider; | ||
| return process_response( | ||
| retry_response, | ||
| &retry_ctx, | ||
| &state, | ||
| &CODEX_PARSER_CONFIG, | ||
| ) | ||
| .await; | ||
| } | ||
| Err(mut err) => { | ||
| if let Some(p) = err.provider.take() { | ||
| ctx.provider = p; | ||
| } | ||
| log::error!( | ||
| "[Codex] 剥离 encrypted_content 后重试仍失败: {}", | ||
| err.error | ||
| ); | ||
| let wrapped = ProxyError::Internal(format!( | ||
| "[CC Switch] Auto-detected invalid_encrypted_content, stripped encrypted_content from request (include + input items), and retried — but the retry also failed.\nOriginal error: {}\n\nTip: If you see this repeatedly, try restarting your Codex session with --resume to clear session state.", | ||
| err.error | ||
| )); | ||
| log_forward_error(&state, &ctx, is_stream, &wrapped); | ||
| return Err(wrapped); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| process_response(response, &ctx, &state, &CODEX_PARSER_CONFIG).await | ||
| // Not an encrypted_content error — rebuild the response and return as-is. | ||
| let mut axum_resp = axum::response::Response::builder() | ||
| .status(error_status); | ||
| { | ||
| let h = axum_resp.headers_mut().unwrap(); | ||
| for (k, v) in error_headers.iter() { | ||
| h.insert(k, v.clone()); | ||
| } | ||
| } | ||
| return Ok(axum_resp | ||
| .body(axum::body::Body::from(error_bytes.to_vec())) | ||
| .unwrap()); | ||
| } | ||
|
|
||
| ctx.provider = provider; | ||
| process_response(fwd_response, &ctx, &state, &CODEX_PARSER_CONFIG).await | ||
| } | ||
|
|
||
| /// 处理 /v1/responses/compact 请求(OpenAI Responses Compact API - Codex CLI 透传) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On successful retry, the code assigns
ctx.provider = retry_result.providerbut then callsprocess_responsewith&retry_ctx, whoseproviderwas never updated. In this handler,process_responseusesctx.providerfor usage/accounting and provider attribution, so failover retries can be logged against the wrong provider whenever the retry lands on a different backend.Useful? React with 👍 / 👎.