Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 49 additions & 1 deletion apps/codex-plus-launcher/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,50 @@ async fn main() -> Result<()> {
});
let hooks = LauncherHooks::default();
let handle = launch_and_inject_with_hooks(options, &hooks).await?;
handle.wait_for_codex_exit().await?;
let _ = codex_plus_core::diagnostic_log::append_diagnostic_log(
"main.wait_for_codex_exit_start",
json!({
"launch_type": format!("{:?}", handle.launch),
"debug_port": handle.debug_port,
"helper_port": handle.helper_port,
}),
);
// FIX: wait_for_codex_exit 可能因权限或进程模型问题快速返回(如 Windows Store 回退时
// Codex.exe 作为 broker 启动后自身退出,或 OpenProcess 权限不足),
// 此时不应让 main 退出,否则 Helper 被 shutdown,Codex++ 后端断开。
// 改为等待 Ctrl+C 信号保持程序存活。
let exit_result = handle.wait_for_codex_exit().await;
let _ = codex_plus_core::diagnostic_log::append_diagnostic_log(
"main.wait_for_codex_exit_result",
json!({
"ok": exit_result.is_ok(),
"error": exit_result.as_ref().err().map(|e| e.to_string()),
}),
);
// FIX: wait_for_codex_exit 内部会调用 shutdown_helper 关闭 HTTP 服务器,
// 但 Codex 可能还在运行(broker 退出导致 wait 返回),所以重新启动 Helper
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let _ = hooks.start_helper(handle.helper_port).await;
let _ = codex_plus_core::diagnostic_log::append_diagnostic_log(
"main.helper_restarted",
json!({"helper_port": handle.helper_port, "debug_port": handle.debug_port}),
);
// 循环检测 CDP 存活,持续等待直到 CDP 断开(Codex 真正退出)
loop {
tokio::time::sleep(std::time::Duration::from_secs(15)).await;
if !is_cdp_alive(handle.debug_port).await {
// CDP 不可达,等待片刻再确认(防止误判)
tokio::time::sleep(std::time::Duration::from_secs(3)).await;
if !is_cdp_alive(handle.debug_port).await {
// 两次检测都不可达,说明 Codex 真正退出了
let _ = codex_plus_core::diagnostic_log::append_diagnostic_log(
"main.codex_exit_confirmed",
json!({"debug_port": handle.debug_port}),
);
break;
}
}
}
Ok(())
}

Expand Down Expand Up @@ -675,6 +718,11 @@ fn default_user_script_manager() -> UserScriptManager {
)
}

/// 检查 Codex 的 CDP 端口是否可达(用于判断 Codex 是否仍在运行)
async fn is_cdp_alive(debug_port: u16) -> bool {
codex_plus_core::cdp::list_targets(debug_port).await.is_ok()
}

fn default_user_scripts_config_dir() -> PathBuf {
if cfg!(windows) {
if let Some(roaming) = std::env::var_os("APPDATA") {
Expand Down
10 changes: 4 additions & 6 deletions crates/codex-plus-core/src/app_paths.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,10 @@ pub fn packaged_app_user_model_id(app_dir: &Path) -> Option<String> {
if !package_name.starts_with("OpenAI.Codex_") || !package_name.contains("__") {
return None;
}
let identity_name = package_name.split_once('_')?.0;
let publisher_id = package_name.rsplit_once("__")?.1;
if publisher_id.is_empty() {
return None;
}
Some(format!("{identity_name}_{publisher_id}!App"))
// FIX: MSIX 包的 AUMID 格式就是 PackageFullName!App
// 之前错误地只取了 identity_name + publisher_id,丢失了版本号部分
// 例如:OpenAI.Codex_26.601.2237.0_x64__2p2nqsd0c76g0!App
Some(format!("{}!App", package_name))
}

fn package_name_from_app_dir(app_dir: &Path) -> Option<String> {
Expand Down
17 changes: 16 additions & 1 deletion crates/codex-plus-core/src/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,20 @@ pub fn proxied_client(user_agent: &str) -> anyhow::Result<reqwest::Client> {
} else {
user_agent.trim().to_string()
};
Ok(reqwest::Client::builder().user_agent(ua).build()?)
// FIX: 为协议代理的流式请求配置合理的超时和连接池
// - 连接超时 30s,防止慢连接卡死
// - 读写超时 300s (5分钟),流式 SSE 响应可能很长,避免中途断开
// - 启用 TCP keepalive,防止防火墙/代理主动断开长连接
// - 增加连接池大小,支持并发请求
//
// 注意:reqwest 0.12 不支持 http2_keep_alive_* 配置,这些在 hyper 0.x 中才有。
// 对于 HTTP/1.1 流式响应,tcp_keepalive + 大超时已经足够。
Ok(reqwest::Client::builder()
.user_agent(ua)
.connect_timeout(std::time::Duration::from_secs(30))
.timeout(std::time::Duration::from_secs(300))
.tcp_keepalive(std::time::Duration::from_secs(60))
.pool_idle_timeout(std::time::Duration::from_secs(120))
.pool_max_idle_per_host(32)
.build()?)
}
123 changes: 107 additions & 16 deletions crates/codex-plus-core/src/launcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,19 +457,28 @@ impl LaunchHooks for DefaultLaunchHooks {
else {
unreachable!();
};
let process_id = activate_packaged_app(app_user_model_id, arguments).await?;
return Ok(match activation {
CodexLaunch::PackagedActivation {
app_user_model_id,
arguments,
..
} => CodexLaunch::PackagedActivation {
app_user_model_id,
arguments,
process_id: Some(process_id),
},
CodexLaunch::Process { .. } => unreachable!(),
});
match activate_packaged_app(app_user_model_id, arguments).await {
Ok(process_id) => return Ok(match activation {
CodexLaunch::PackagedActivation {
app_user_model_id,
arguments,
..
} => CodexLaunch::PackagedActivation {
app_user_model_id,
arguments,
process_id: Some(process_id),
},
CodexLaunch::Process { .. } => unreachable!(),
}),
Err(e) => {
// FIX: AUMID 激活失败时回退到直接执行 Codex.exe
// 适用于 Windows Store / MSIX 包版本(如 OpenAI.Codex_26.601.2237.0_x64__2p2nqsd0c76g0)
eprintln!(
"[WARN] Packaged activation failed for AUMID {}: {}, falling back to direct execution",
app_user_model_id, e
);
}
}
}
}

Expand Down Expand Up @@ -647,28 +656,62 @@ async fn handle_helper_connection(
}),
);

// FIX: 所有代理处理函数中的 `?` 错误可能传播导致连接被粗暴关闭
// 使用 catch-all 确保任何内部错误都写入 HTTP 错误响应再关闭连接
if crate::protocol_proxy::is_responses_proxy_path(path) && method == "POST" {
return handle_protocol_proxy_connection(
let _ = crate::diagnostic_log::append_diagnostic_log(
"helper.proxy_route",
serde_json::json!({"path": path, "method": method, "route": "responses_proxy"}),
);
let result = handle_protocol_proxy_connection(
&mut stream,
request_body,
method,
path,
remote_addr_text,
)
.await;
if let Err(error) = result {
let _ = crate::diagnostic_log::append_diagnostic_log(
"helper.proxy_catch_error",
serde_json::json!({"path": path, "method": method, "error": error.to_string(), "route": "responses_proxy"}),
);
write_error_response_and_shutdown(&mut stream, "500 Internal Server Error", &error.to_string()).await;
}
return Ok(());
}
if crate::protocol_proxy::is_chat_completions_proxy_path(path) && method == "POST" {
return handle_chat_completions_proxy_connection(
let _ = crate::diagnostic_log::append_diagnostic_log(
"helper.proxy_route",
serde_json::json!({"path": path, "method": method, "route": "chat_completions_proxy"}),
);
let result = handle_chat_completions_proxy_connection(
&mut stream,
request_body,
method,
path,
remote_addr_text,
)
.await;
if let Err(error) = result {
let _ = crate::diagnostic_log::append_diagnostic_log(
"helper.proxy_catch_error",
serde_json::json!({"path": path, "method": method, "error": error.to_string(), "route": "chat_completions_proxy"}),
);
write_error_response_and_shutdown(&mut stream, "500 Internal Server Error", &error.to_string()).await;
}
return Ok(());
}
if crate::protocol_proxy::is_models_proxy_path(path) && matches!(method, "GET" | "OPTIONS") {
return handle_models_proxy_connection(&mut stream, method, path, remote_addr_text).await;
let result = handle_models_proxy_connection(&mut stream, method, path, remote_addr_text).await;
if let Err(error) = result {
let _ = crate::diagnostic_log::append_diagnostic_log(
"helper.proxy_catch_error",
serde_json::json!({"path": path, "method": method, "error": error.to_string(), "route": "models_proxy"}),
);
write_error_response_and_shutdown(&mut stream, "500 Internal Server Error", &error.to_string()).await;
}
return Ok(());
}

let (status, body, content_type, log_event) =
Expand Down Expand Up @@ -830,6 +873,34 @@ async fn handle_protocol_proxy_connection(
method: &str,
path: &str,
remote_addr_text: Option<String>,
) -> anyhow::Result<()> {
// FIX: 包装整个处理逻辑,确保所有错误都被捕获并写入 HTTP 错误响应
// 之前大量 `?` 运算符导致错误直接传播到 tokio task,TCP 连接被粗暴关闭
// Codex 客户端看到的就是 "stream disconnected before completion"
let result = handle_protocol_proxy_connection_inner(
stream,
request_body,
method,
path,
remote_addr_text,
)
.await;
if let Err(error) = result {
let _ = crate::diagnostic_log::append_diagnostic_log(
"helper.protocol_proxy_inner_error",
serde_json::json!({"method": method, "path": path, "error": error.to_string()}),
);
write_error_response_and_shutdown(stream, "500 Internal Server Error", &error.to_string()).await;
}
Ok(())
}

async fn handle_protocol_proxy_connection_inner(
stream: &mut tokio::net::TcpStream,
request_body: &str,
method: &str,
path: &str,
remote_addr_text: Option<String>,
) -> anyhow::Result<()> {
let request_json = serde_json::from_str::<serde_json::Value>(request_body).ok();
let upstream = match crate::protocol_proxy::open_responses_proxy_request(request_body).await {
Expand Down Expand Up @@ -924,6 +995,7 @@ async fn handle_protocol_proxy_connection(
"200 OK",
remote_addr_text,
);
// FIX: 流式响应已写入 [DONE]\n\n 标记,此时 shutdown() 让客户端感知到 EOF
stream.shutdown().await?;
return Ok(());
}
Expand Down Expand Up @@ -1003,6 +1075,7 @@ async fn handle_chat_completions_proxy_connection(
&status,
remote_addr_text,
);
// FIX: 流式响应已完整写入,shutdown() 让客户端感知到 EOF
stream.shutdown().await?;
return Ok(());
}
Expand Down Expand Up @@ -1051,6 +1124,24 @@ async fn write_http_stream_headers(
Ok(())
}

/// 写入 HTTP 错误响应并关闭连接,忽略所有错误(防止错误传播导致连接被粗暴关闭)
async fn write_error_response_and_shutdown(stream: &mut tokio::net::TcpStream, status: &str, message: &str) {
let body = serde_json::to_vec(&serde_json::json!({
"status": "failed",
"message": message
}))
.unwrap_or_default();
let response = format!(
"HTTP/1.1 {status}\r\nContent-Type: application/json; charset=utf-8\r\nAccess-Control-Allow-Origin: *\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
body.len()
);
let _ = stream.write_all(response.as_bytes()).await;
if !body.is_empty() {
let _ = stream.write_all(&body).await;
}
let _ = stream.shutdown().await;
}

fn log_helper_response(
event: &str,
method: &str,
Expand Down
49 changes: 42 additions & 7 deletions crates/codex-plus-core/src/protocol_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,17 @@ impl CodexToolContext {
}
}

/// 获取 relay 的上游 Base URL,使用 upstream_base_url 作为 base_url 的 fallback
/// 配置文件可能只设置了 upstreamBaseUrl 而未设置 baseUrl,反序列化后 base_url 为空
fn relay_base_url(relay: &crate::settings::RelayProfile) -> &str {
let url = relay.base_url.trim();
if url.is_empty() {
relay.upstream_base_url.trim()
} else {
url
}
}

pub fn local_responses_proxy_base_url(port: u16) -> String {
format!("http://127.0.0.1:{port}/v1")
}
Expand Down Expand Up @@ -426,10 +437,29 @@ pub fn is_models_proxy_path(path: &str) -> bool {
pub async fn open_responses_proxy_request(body: &str) -> anyhow::Result<UpstreamProxyResponse> {
let settings = SettingsStore::default().load().unwrap_or_default();
let relay = settings.active_relay_profile();
let _ = crate::diagnostic_log::append_diagnostic_log(
"proto.open_responses_proxy_state",
serde_json::json!({
"proto": format!("{:?}", relay.protocol),
"base_url": relay.base_url,
"upstream_base_url": relay.upstream_base_url,
"api_key_empty": relay.api_key.trim().is_empty(),
"active_relay_id": settings.active_relay_id,
"relay_profiles_count": settings.relay_profiles.len(),
}),
);
if relay.protocol != RelayProtocol::ChatCompletions {
anyhow::bail!("当前中转未启用 Chat Completions 协议代理");
}
if relay.base_url.trim().is_empty() {
// FIX: 使用 upstream_base_url 作为 base_url 的 fallback
// agnes-ai 等配置中只设置了 upstreamBaseUrl,没有设置 baseUrl
// 反序列化时 base_url 为空,导致协议代理无法获取上游地址
let base_url = relay_base_url(&relay);
let _ = crate::diagnostic_log::append_diagnostic_log(
"proto.resolved_base_url",
serde_json::json!({"base_url": base_url, "chat_completions_url": crate::protocol_proxy::chat_completions_url(base_url)}),
);
if base_url.trim().is_empty() {
anyhow::bail!("Chat Completions 上游 Base URL 不能为空");
}
if relay.api_key.trim().is_empty() {
Expand All @@ -444,7 +474,7 @@ pub async fn open_responses_proxy_request(body: &str) -> anyhow::Result<Upstream
let chat_request = responses_to_chat_completions(request_json.clone())?;
let client = crate::http_client::proxied_client(&relay.user_agent)?;
let upstream = client
.post(chat_completions_url(&relay.base_url))
.post(chat_completions_url(&base_url))
.bearer_auth(relay.api_key.trim())
.header(reqwest::header::CONTENT_TYPE, "application/json")
.json(&chat_request)
Expand Down Expand Up @@ -472,7 +502,8 @@ pub async fn open_models_proxy_request() -> anyhow::Result<UpstreamProxyResponse
if relay.protocol != RelayProtocol::ChatCompletions {
anyhow::bail!("当前中转未启用 Chat Completions 协议代理");
}
if relay.base_url.trim().is_empty() {
let base_url = relay_base_url(&relay);
if base_url.trim().is_empty() {
anyhow::bail!("Chat Completions 上游 Base URL 不能为空");
}
if relay.api_key.trim().is_empty() {
Expand All @@ -481,7 +512,7 @@ pub async fn open_models_proxy_request() -> anyhow::Result<UpstreamProxyResponse

let client = crate::http_client::proxied_client(&relay.user_agent)?;
let upstream = client
.get(models_url(&relay.base_url))
.get(models_url(&base_url))
.bearer_auth(relay.api_key.trim())
.send()
.await?;
Expand Down Expand Up @@ -509,7 +540,8 @@ pub async fn open_chat_completions_proxy_request(
if relay.protocol != RelayProtocol::ChatCompletions {
anyhow::bail!("当前中转未启用 Chat Completions 协议代理");
}
if relay.base_url.trim().is_empty() {
let base_url = relay_base_url(&relay);
if base_url.trim().is_empty() {
anyhow::bail!("Chat Completions 上游 Base URL 不能为空");
}
if relay.api_key.trim().is_empty() {
Expand All @@ -521,8 +553,11 @@ pub async fn open_chat_completions_proxy_request(
.get("stream")
.and_then(Value::as_bool)
.unwrap_or(false);
let upstream = reqwest::Client::new()
.post(chat_completions_url(&relay.base_url))
// FIX: 使用 proxied_client 而非裸 reqwest::Client::new(),确保有超时和连接池配置
// 旧代码使用 reqwest::Client::new() 导致没有超时配置,流式响应容易断开
let client = crate::http_client::proxied_client(&relay.user_agent)?;
let upstream = client
.post(chat_completions_url(&base_url))
.bearer_auth(relay.api_key.trim())
.header(reqwest::header::CONTENT_TYPE, "application/json")
.json(&request_json)
Expand Down
Loading