From 8ce45485692ffe6cbea98e12b7036a9396e4788f Mon Sep 17 00:00:00 2001 From: zackees Date: Mon, 29 Jun 2026 12:37:47 -0700 Subject: [PATCH] fix(async): close audit gaps from #813 sub-issues #815/#817/#818 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three parallel fix-ups completing the whole-app tokio runtime migration audit: #815 fbuild-daemon — emulator handler fs/wraps: - avr8js_deploy.rs: 5 std::fs::* calls → tokio::fs::*.await - avr8js_web.rs: 2 std::fs::read_to_string → tokio::fs::*.await; load_session_manifest flipped sync → async; in-file callers updated - qemu_deploy.rs: std::fs::create_dir_all → tokio::fs::*.await - avr8js_npm.rs: ensure_avr8js_npm_in inner create_dir_all converted; prepare_avr8js_cache_for_install kept sync (test-callable) and its sole async caller wraps it in spawn_blocking - export_artifacts_bundle (3 call sites in build.rs + deploy.rs) wrapped in spawn_blocking so its sync std::fs::* I/O doesn't run on the axum worker #817 fbuild-python — 7 sync helpers folded into block_on(async_version): - serial_monitor.rs: reset_device now reuses post_reset_request_async - daemon.rs: verify_broker_daemon_cache_identity, ensure_running_via_broker, ensure_running, stop, status all share async helpers - outcome.rs: send_op now folds into send_op_async - ~30-line duplicated sync HTTP blocks collapsed to 1-2 lines each; no more reqwest::blocking::* in fbuild-python/src/ #818 cross-cutting — BuildLog channel flipped to tokio: - BuildLog.sender / BuildParams.log_sender: Option<std::sync::mpsc::Sender<String>> → Option<tokio::sync::mpsc::UnboundedSender<String>> - daemon build handler bridge task (sync-mpsc → tokio-mpsc forwarder on spawn_blocking) deleted; replaced with a direct tokio::spawn forwarder that awaits log_rx.recv() - Test fixtures updated to use tokio::sync::mpsc::unbounded_channel Closes #815, #816, #817, #818, #819, #820, #821 (all sub-issues of #813 confirmed addressed; #816/#819/#820/#821 had no remaining work by verification, #815/#817/#818 fixed here). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/fbuild-build/src/build_output.rs | 8 +- crates/fbuild-build/src/lib.rs | 8 +- crates/fbuild-core/src/build_log.rs | 19 +- .../src/handlers/emulator/avr8js_deploy.rs | 14 +- .../src/handlers/emulator/avr8js_npm.rs | 20 +- .../src/handlers/emulator/avr8js_web.rs | 12 +- .../src/handlers/emulator/qemu_deploy.rs | 2 +- .../src/handlers/operations/build.rs | 90 +++-- .../src/handlers/operations/deploy.rs | 53 ++- .../fbuild-python/src/async_serial_monitor.rs | 76 ++-- crates/fbuild-python/src/daemon.rs | 348 +++++++----------- crates/fbuild-python/src/outcome.rs | 48 +-- crates/fbuild-python/src/serial_monitor.rs | 60 ++- 13 files changed, 373 insertions(+), 385 deletions(-) diff --git a/crates/fbuild-build/src/build_output.rs b/crates/fbuild-build/src/build_output.rs index 75d1e5c3..f6fd12fa 100644 --- a/crates/fbuild-build/src/build_output.rs +++ b/crates/fbuild-build/src/build_output.rs @@ -10,7 +10,9 @@ use std::time::Instant; use fbuild_core::{BuildLog, MemoryRegion, SizeInfo, SymbolMap}; /// Create a [`BuildLog`], optionally wired to a real-time streaming sender. -pub fn create_build_log(sender: Option>) -> BuildLog { +pub fn create_build_log( + sender: Option>, +) -> BuildLog { match sender { Some(s) => BuildLog::with_sender(s), None => BuildLog::new(), @@ -19,7 +21,7 @@ pub fn create_build_log(sender: Option>) -> Buil /// Create a [`BuildLog`] with elapsed-time prefixes from the given epoch. 
pub fn create_build_log_with_epoch( - sender: Option>, + sender: Option>, epoch: Instant, ) -> BuildLog { match sender { @@ -327,7 +329,7 @@ mod tests { #[test] fn test_create_build_log_with_sender() { - let (tx, rx) = std::sync::mpsc::channel(); + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); let mut log = create_build_log(Some(tx)); log.push("test"); assert_eq!(rx.try_recv().unwrap(), "test"); diff --git a/crates/fbuild-build/src/lib.rs b/crates/fbuild-build/src/lib.rs index 221fc1ab..f623cfe6 100644 --- a/crates/fbuild-build/src/lib.rs +++ b/crates/fbuild-build/src/lib.rs @@ -165,7 +165,13 @@ pub struct BuildParams { /// Used by IWYU and clang-tidy to avoid building framework core files. pub compiledb_only: bool, /// Optional sender for streaming build log lines in real-time. - pub log_sender: Option>, + /// + /// Uses `tokio::sync::mpsc::UnboundedSender` so the orchestrator (running + /// on a tokio runtime) and the WebSocket forwarder can share one channel + /// without a sync→async bridge — `UnboundedSender::send` is sync and safe + /// to call from blocking code, while the receive side is awaited from the + /// async daemon handler (fbuild#818). + pub log_sender: Option>, /// When true, run symbol-level memory analysis after linking. pub symbol_analysis: bool, /// Optional path to write the symbol analysis report to. diff --git a/crates/fbuild-core/src/build_log.rs b/crates/fbuild-core/src/build_log.rs index 26e6cb26..d2b425e1 100644 --- a/crates/fbuild-core/src/build_log.rs +++ b/crates/fbuild-core/src/build_log.rs @@ -8,17 +8,22 @@ //! the elapsed time since that epoch (e.g. `" 0.46 compiling foo.cpp"`). use std::collections::VecDeque; -use std::sync::mpsc::Sender; use std::time::Instant; +use tokio::sync::mpsc::UnboundedSender; /// Centralized build output log. /// /// Accumulates build output lines (compilation steps, warnings, size info, etc.) 
/// and optionally streams each line through a channel for real-time delivery /// from the daemon to the CLI. +/// +/// The streaming sender is a `tokio::sync::mpsc::UnboundedSender` so that +/// push-from-sync-code and recv-from-async-code share one channel without an +/// intermediate `spawn_blocking` bridge — `UnboundedSender::send` is sync and +/// callable from any context (see fbuild#818 async-audit follow-up). pub struct BuildLog { lines: VecDeque, - sender: Option>, + sender: Option>, epoch: Option, } @@ -33,7 +38,7 @@ impl BuildLog { } /// Create a log that streams each line through the given sender (no timestamps). - pub fn with_sender(sender: Sender) -> Self { + pub fn with_sender(sender: UnboundedSender) -> Self { Self { lines: VecDeque::new(), sender: Some(sender), @@ -51,7 +56,7 @@ impl BuildLog { } /// Create a log that streams each line and prefixes with elapsed time. - pub fn with_sender_and_epoch(sender: Sender, epoch: Instant) -> Self { + pub fn with_sender_and_epoch(sender: UnboundedSender, epoch: Instant) -> Self { Self { lines: VecDeque::new(), sender: Some(sender), @@ -116,7 +121,7 @@ mod tests { #[test] fn streams_through_sender() { - let (tx, rx) = std::sync::mpsc::channel(); + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); let mut log = BuildLog::with_sender(tx); log.push("hello"); log.push("world"); @@ -129,7 +134,7 @@ mod tests { #[test] fn sender_dropped_does_not_panic() { - let (tx, rx) = std::sync::mpsc::channel(); + let (tx, rx) = tokio::sync::mpsc::unbounded_channel::(); drop(rx); let mut log = BuildLog::with_sender(tx); // Should not panic even though receiver is gone @@ -160,7 +165,7 @@ mod tests { #[test] fn with_sender_and_epoch_streams_prefixed() { - let (tx, rx) = std::sync::mpsc::channel(); + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); let epoch = Instant::now(); let mut log = BuildLog::with_sender_and_epoch(tx, epoch); log.push("test"); diff --git 
a/crates/fbuild-daemon/src/handlers/emulator/avr8js_deploy.rs b/crates/fbuild-daemon/src/handlers/emulator/avr8js_deploy.rs index 62052620..dd4ed905 100644 --- a/crates/fbuild-daemon/src/handlers/emulator/avr8js_deploy.rs +++ b/crates/fbuild-daemon/src/handlers/emulator/avr8js_deploy.rs @@ -128,7 +128,7 @@ pub async fn deploy_avr8js( .join("avr8js") .join(&env_name) .join(&session_id); - if let Err(e) = std::fs::create_dir_all(&session_dir) { + if let Err(e) = tokio::fs::create_dir_all(&session_dir).await { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(OperationResponse::fail( @@ -139,7 +139,7 @@ pub async fn deploy_avr8js( } let staged_hex = session_dir.join("firmware.hex"); - if let Err(e) = std::fs::copy(&firmware_path, &staged_hex) { + if let Err(e) = tokio::fs::copy(&firmware_path, &staged_hex).await { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(OperationResponse::fail( @@ -151,7 +151,7 @@ pub async fn deploy_avr8js( let staged_elf = if let Some(ref elf) = elf_path { let dest = session_dir.join("firmware.elf"); - match std::fs::copy(elf, &dest) { + match tokio::fs::copy(elf, &dest).await { Ok(_) => Some(dest), Err(_) => None, } @@ -176,10 +176,12 @@ pub async fn deploy_avr8js( created_at_unix: now_unix(), }; let manifest_path = session_dir.join("session.json"); - if let Err(e) = std::fs::write( + if let Err(e) = tokio::fs::write( &manifest_path, serde_json::to_vec_pretty(&manifest).unwrap_or_default(), - ) { + ) + .await + { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(OperationResponse::fail( @@ -221,7 +223,7 @@ pub async fn deploy_avr8js( // The per-session firmware.hex and session.json continue to live // under session_dir. See FastLED/fbuild#291. 
let script_path = avr8js_cache.join("headless.mjs"); - if let Err(e) = std::fs::write(&script_path, AVR8JS_HEADLESS_MJS) { + if let Err(e) = tokio::fs::write(&script_path, AVR8JS_HEADLESS_MJS).await { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(OperationResponse::fail( diff --git a/crates/fbuild-daemon/src/handlers/emulator/avr8js_npm.rs b/crates/fbuild-daemon/src/handlers/emulator/avr8js_npm.rs index bc8ae7f9..e5999a98 100644 --- a/crates/fbuild-daemon/src/handlers/emulator/avr8js_npm.rs +++ b/crates/fbuild-daemon/src/handlers/emulator/avr8js_npm.rs @@ -122,12 +122,26 @@ pub(crate) async fn ensure_avr8js_npm_in( cache_dir: &Path, force_refresh: bool, ) -> fbuild_core::Result<()> { - if prepare_avr8js_cache_for_install(cache_dir, force_refresh) == Avr8jsCachePrep::AlreadyIntact - { + // `prepare_avr8js_cache_for_install` is sync (tested from sync contexts) + // and may perform `remove_dir_all` on a large `node_modules/` tree. + // Push the blocking I/O off the async runtime via `spawn_blocking` so + // we don't stall other handlers while a corrupt cache is wiped. 
+ let cache_dir_owned = cache_dir.to_path_buf(); + let prep = tokio::task::spawn_blocking(move || { + prepare_avr8js_cache_for_install(&cache_dir_owned, force_refresh) + }) + .await + .map_err(|e| { + fbuild_core::FbuildError::DeployFailed(format!( + "avr8js cache prep task failed to join: {}", + e + )) + })?; + if prep == Avr8jsCachePrep::AlreadyIntact { return Ok(()); } - std::fs::create_dir_all(cache_dir).map_err(|e| { + tokio::fs::create_dir_all(cache_dir).await.map_err(|e| { fbuild_core::FbuildError::DeployFailed(format!( "failed to create avr8js cache dir at {}: {}", cache_dir.display(), diff --git a/crates/fbuild-daemon/src/handlers/emulator/avr8js_web.rs b/crates/fbuild-daemon/src/handlers/emulator/avr8js_web.rs index c20d45c9..ae03dc3d 100644 --- a/crates/fbuild-daemon/src/handlers/emulator/avr8js_web.rs +++ b/crates/fbuild-daemon/src/handlers/emulator/avr8js_web.rs @@ -158,7 +158,7 @@ pub(crate) fn render_page(session_id: &str) -> String { ) } -pub(crate) fn load_session_manifest( +pub(crate) async fn load_session_manifest( ctx: &DaemonContext, session_id: &str, ) -> fbuild_core::Result { @@ -169,7 +169,7 @@ pub(crate) fn load_session_manifest( .ok_or_else(|| { fbuild_core::FbuildError::Other(format!("unknown AVR8js session '{}'", session_id)) })?; - let raw = std::fs::read_to_string(&manifest_path)?; + let raw = tokio::fs::read_to_string(&manifest_path).await?; serde_json::from_str(&raw).map_err(|e| { fbuild_core::FbuildError::Other(format!("failed to parse AVR8js session manifest: {}", e)) }) @@ -179,7 +179,7 @@ pub async fn avr8js_page( AxumPath(session_id): AxumPath, State(ctx): State>, ) -> impl IntoResponse { - match load_session_manifest(&ctx, &session_id) { + match load_session_manifest(&ctx, &session_id).await { Ok(_) => Html(render_page(&session_id)).into_response(), Err(e) => (StatusCode::NOT_FOUND, e.to_string()).into_response(), } @@ -199,7 +199,7 @@ pub async fn avr8js_session_json( AxumPath(session_id): AxumPath, State(ctx): State>, ) -> 
impl IntoResponse { - match load_session_manifest(&ctx, &session_id) { + match load_session_manifest(&ctx, &session_id).await { Ok(manifest) => ( StatusCode::OK, Json(Avr8jsSessionResponse { @@ -221,8 +221,8 @@ pub async fn avr8js_firmware_hex( AxumPath(session_id): AxumPath, State(ctx): State>, ) -> impl IntoResponse { - match load_session_manifest(&ctx, &session_id) { - Ok(manifest) => match std::fs::read_to_string(&manifest.firmware_hex) { + match load_session_manifest(&ctx, &session_id).await { + Ok(manifest) => match tokio::fs::read_to_string(&manifest.firmware_hex).await { Ok(hex) => ( [( header::CONTENT_TYPE, diff --git a/crates/fbuild-daemon/src/handlers/emulator/qemu_deploy.rs b/crates/fbuild-daemon/src/handlers/emulator/qemu_deploy.rs index 335407b6..c786d44e 100644 --- a/crates/fbuild-daemon/src/handlers/emulator/qemu_deploy.rs +++ b/crates/fbuild-daemon/src/handlers/emulator/qemu_deploy.rs @@ -215,7 +215,7 @@ pub async fn deploy_qemu( }; let session_dir = qemu_session_dir(&project_dir, &env_name); - if let Err(e) = std::fs::create_dir_all(&session_dir) { + if let Err(e) = tokio::fs::create_dir_all(&session_dir).await { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(OperationResponse::fail( diff --git a/crates/fbuild-daemon/src/handlers/operations/build.rs b/crates/fbuild-daemon/src/handlers/operations/build.rs index b5a72693..8cec678e 100644 --- a/crates/fbuild-daemon/src/handlers/operations/build.rs +++ b/crates/fbuild-daemon/src/handlers/operations/build.rs @@ -198,7 +198,16 @@ pub async fn build( if stream { // --- STREAMING PATH --- // Build runs in a background task; log lines stream to client as NDJSON. - let (sync_tx, sync_rx) = std::sync::mpsc::channel::(); + // + // fbuild#818 async-audit follow-up: `BuildLog`'s sender is now a + // `tokio::sync::mpsc::UnboundedSender`, so the orchestrator (which + // may push from blocking compile workers via `spawn_blocking`) and + // this async forwarder share a single tokio channel. 
The previous + // `std::sync::mpsc::channel` + `spawn_blocking` recv-bridge has + // been removed — `UnboundedSender::send` is sync and callable from + // any context, and `UnboundedReceiver::recv` is awaited directly + // from the async forwarder task. + let (log_tx, mut log_rx) = tokio::sync::mpsc::unbounded_channel::(); let (async_tx, async_rx) = tokio::sync::mpsc::unbounded_channel::(); let params = fbuild_build::BuildParams { @@ -211,7 +220,7 @@ pub async fn build( jobs: req.jobs, generate_compiledb, compiledb_only: req.compiledb_only, - log_sender: Some(sync_tx), + log_sender: Some(log_tx), symbol_analysis: req.symbol_analysis, symbol_analysis_path: resolved_symbol_analysis_path.clone(), no_timestamp: req.no_timestamp, @@ -309,14 +318,17 @@ pub async fn build( lock_wait.as_millis() ); - // Bridge: sync log lines → async NDJSON chunks - let bridge_tx = async_tx.clone(); - let bridge = tokio::task::spawn_blocking(move || { - for line in sync_rx { + // Forwarder: log lines → async NDJSON chunks. + // fbuild#818: both endpoints are now tokio channels, so the + // earlier `spawn_blocking` sync→async bridge is gone — this + // task just awaits on the same runtime. + let forwarder_tx = async_tx.clone(); + let forwarder = tokio::spawn(async move { + while let Some(line) = log_rx.recv().await { let event = serde_json::json!({"type": "log", "message": line}); let mut chunk = event.to_string(); chunk.push('\n'); - if bridge_tx.send(bytes::Bytes::from(chunk)).is_err() { + if forwarder_tx.send(bytes::Bytes::from(chunk)).is_err() { break; } } @@ -366,20 +378,36 @@ pub async fn build( Ok(Ok(br)) => { let exported = if br.success { if let Some(ref out_dir) = resolved_output_dir { - Some(export_artifacts_bundle( - out_dir, - platform, - &env_name, - br.firmware_path.as_deref(), - br.elf_path.as_deref(), - )) + // fbuild#815: export_artifacts_bundle does sync + // std::fs I/O — move it off the axum worker. 
+ let out_dir_owned = out_dir.clone(); + let env_name_owned = env_name.clone(); + let firmware_path_owned = br.firmware_path.clone(); + let elf_path_owned = br.elf_path.clone(); + let join_result = tokio::task::spawn_blocking(move || { + export_artifacts_bundle( + &out_dir_owned, + platform, + &env_name_owned, + firmware_path_owned.as_deref(), + elf_path_owned.as_deref(), + ) + }) + .await; + Some(match join_result { + Ok(inner) => inner, + Err(join_err) => Err(fbuild_core::FbuildError::Other(format!( + "artifact export task panicked: {}", + join_err + ))), + }) } else { None } } else { None }; - let _lines = br.build_log.into_lines(); // drop sender + let _lines = br.build_log.into_lines(); // drop sender so forwarder exits let summary = if br.success { let size_str = br .size_info @@ -448,7 +476,7 @@ pub async fn build( ), }; - let _ = bridge.await; + let _ = forwarder.await; if !success && !msg.is_empty() { let log_event = serde_json::json!({ @@ -531,13 +559,29 @@ pub async fn build( Ok(build_result) => { let exported = if build_result.success { if let Some(ref out_dir) = resolved_output_dir { - Some(export_artifacts_bundle( - out_dir, - platform, - &env_name, - build_result.firmware_path.as_deref(), - build_result.elf_path.as_deref(), - )) + // fbuild#815: export_artifacts_bundle does sync + // std::fs I/O — move it off the axum worker. 
+ let out_dir_owned = out_dir.clone(); + let env_name_owned = env_name.clone(); + let firmware_path_owned = build_result.firmware_path.clone(); + let elf_path_owned = build_result.elf_path.clone(); + let join_result = tokio::task::spawn_blocking(move || { + export_artifacts_bundle( + &out_dir_owned, + platform, + &env_name_owned, + firmware_path_owned.as_deref(), + elf_path_owned.as_deref(), + ) + }) + .await; + Some(match join_result { + Ok(inner) => inner, + Err(join_err) => Err(fbuild_core::FbuildError::Other(format!( + "artifact export task panicked: {}", + join_err + ))), + }) } else { None } diff --git a/crates/fbuild-daemon/src/handlers/operations/deploy.rs b/crates/fbuild-daemon/src/handlers/operations/deploy.rs index 602e5de0..5b719b71 100644 --- a/crates/fbuild-daemon/src/handlers/operations/deploy.rs +++ b/crates/fbuild-daemon/src/handlers/operations/deploy.rs @@ -272,24 +272,43 @@ pub async fn deploy( }; let artifact_export = match resolved_output_dir.as_ref() { - Some(out_dir) => match export_artifacts_bundle( - out_dir, - platform, - &env_name, - Some(&firmware_path), - elf_path.as_deref(), - ) { - Ok(result) => Some(result), - Err(e) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(OperationResponse::fail( - request_id, - format!("failed to export artifacts: {}", e), - )), - ); + Some(out_dir) => { + // fbuild#815: export_artifacts_bundle does sync std::fs I/O — + // move it off the axum worker thread. 
+ let out_dir_owned = out_dir.clone(); + let env_name_owned = env_name.clone(); + let firmware_path_owned = firmware_path.clone(); + let elf_path_owned = elf_path.clone(); + let join_result = tokio::task::spawn_blocking(move || { + export_artifacts_bundle( + &out_dir_owned, + platform, + &env_name_owned, + Some(&firmware_path_owned), + elf_path_owned.as_deref(), + ) + }) + .await; + let export_result = match join_result { + Ok(inner) => inner, + Err(join_err) => Err(fbuild_core::FbuildError::Other(format!( + "artifact export task panicked: {}", + join_err + ))), + }; + match export_result { + Ok(result) => Some(result), + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(OperationResponse::fail( + request_id, + format!("failed to export artifacts: {}", e), + )), + ); + } } - }, + } None => None, }; diff --git a/crates/fbuild-python/src/async_serial_monitor.rs b/crates/fbuild-python/src/async_serial_monitor.rs index f461f152..cce939bc 100644 --- a/crates/fbuild-python/src/async_serial_monitor.rs +++ b/crates/fbuild-python/src/async_serial_monitor.rs @@ -281,43 +281,53 @@ impl AsyncSerialMonitor { py: Python<'py>, board: Option, ) -> PyResult> { - let url = format!("{}/api/reset", fbuild_paths::get_daemon_url()); let port = self.port.clone(); - pyo3_async_runtimes::tokio::future_into_py(py, async move { - #[derive(Serialize)] - struct ResetPayload { - port: String, - #[serde(skip_serializing_if = "Option::is_none")] - board: Option, - } + post_reset_request_async(port, board).await + }) + } +} - let payload = ResetPayload { port, board }; +/// Issue the daemon's `POST /api/reset` and return whether the daemon +/// reported success. Shared between `AsyncSerialMonitor::reset_device` and +/// the sync `SerialMonitor::reset_device` (FastLED/fbuild#817) so the HTTP +/// transport is implemented exactly once. 
+pub(crate) async fn post_reset_request_async( + port: String, + board: Option, +) -> PyResult { + #[derive(Serialize)] + struct ResetPayload { + port: String, + #[serde(skip_serializing_if = "Option::is_none")] + board: Option, + } - let resp = reqwest::Client::new() - .post(&url) - .json(&payload) - .timeout(std::time::Duration::from_secs(10)) - .send() - .await - .map_err(|e| { - pyo3::exceptions::PyConnectionError::new_err(format!( - "failed to send reset request to daemon: {}", - e - )) - })?; + let url = format!("{}/api/reset", fbuild_paths::get_daemon_url()); + let payload = ResetPayload { port, board }; - let body: serde_json::Value = resp.json().await.map_err(|e| { - pyo3::exceptions::PyRuntimeError::new_err(format!( - "failed to parse reset response: {}", - e - )) - })?; + let resp = reqwest::Client::new() + .post(&url) + .json(&payload) + .timeout(std::time::Duration::from_secs(10)) + .send() + .await + .map_err(|e| { + pyo3::exceptions::PyConnectionError::new_err(format!( + "failed to send reset request to daemon: {}", + e + )) + })?; - Ok(body - .get("success") - .and_then(|v| v.as_bool()) - .unwrap_or(false)) - }) - } + let body: serde_json::Value = resp.json().await.map_err(|e| { + pyo3::exceptions::PyRuntimeError::new_err(format!( + "failed to parse reset response: {}", + e + )) + })?; + + Ok(body + .get("success") + .and_then(|v| v.as_bool()) + .unwrap_or(false)) } diff --git a/crates/fbuild-python/src/daemon.rs b/crates/fbuild-python/src/daemon.rs index 9586ab36..c06dd1a4 100644 --- a/crates/fbuild-python/src/daemon.rs +++ b/crates/fbuild-python/src/daemon.rs @@ -3,10 +3,8 @@ use std::path::{Path, PathBuf}; use pyo3::prelude::*; -use running_process::broker::adopt::{ - AdoptError, AsyncBrokerSession, BrokerSession, OwnedConnectRequest, -}; -use running_process::broker::client::{ConnectBackendRequest, RefusalKind}; +use running_process::broker::adopt::{AdoptError, AsyncBrokerSession, OwnedConnectRequest}; +use 
running_process::broker::client::RefusalKind; /// Filename of the daemon binary on this platform. /// @@ -138,21 +136,14 @@ fn daemon_cache_identity_error(info: &serde_json::Value) -> Option { None } -fn verify_broker_daemon_cache_identity_blocking() -> Result<(), String> { - let client = reqwest::blocking::Client::builder() - .timeout(std::time::Duration::from_secs(5)) +/// Build a single-thread tokio runtime for one-shot sync helpers. Returns a +/// stringly-typed error so callers can plumb it through `Result<_, String>` +/// without taking on a tokio import. FastLED/fbuild#817. +fn one_shot_runtime() -> Result { + tokio::runtime::Builder::new_current_thread() + .enable_all() .build() - .map_err(|e| format!("failed to build HTTP client: {e}"))?; - let info: serde_json::Value = client - .get(direct_info_url()) - .send() - .map_err(|e| format!("daemon info request failed: {e}"))? - .json() - .map_err(|e| format!("daemon info response was invalid JSON: {e}"))?; - if let Some(err) = daemon_cache_identity_error(&info) { - return Err(err); - } - Ok(()) + .map_err(|e| format!("failed to build tokio runtime: {e}")) } async fn verify_broker_daemon_cache_identity_async() -> Result<(), String> { @@ -178,47 +169,6 @@ fn broker_endpoint() -> Option { running_process::broker::doctor::default_broker_endpoint().ok() } -fn ensure_running_via_broker_blocking(url: &str) -> Result { - let Some(endpoint) = broker_endpoint() else { - return Ok(false); - }; - let request = ConnectBackendRequest::new( - &endpoint, - fbuild_paths::running_process::SERVICE_NAME, - env!("CARGO_PKG_VERSION"), - env!("CARGO_PKG_VERSION"), - ); - match BrokerSession::adopt(request) { - Ok(_session) => { - for _ in 0..100 { - std::thread::sleep(std::time::Duration::from_millis(100)); - if let Ok(resp) = reqwest::blocking::get(url) { - if resp.status().is_success() { - verify_broker_daemon_cache_identity_blocking()?; - return Ok(true); - } - } - } - Err( - "broker negotiated fbuild-daemon, but its HTTP endpoint 
did not become healthy" - .to_string(), - ) - } - Err(AdoptError::BrokerDisabled) => Ok(false), - Err(AdoptError::DisableEnv(err)) => Err(err.to_string()), - Err(AdoptError::Connect(err)) => { - if broker_refusal_is_fatal(err.refusal_kind()) { - Err(format!( - "running-process broker refused fbuild daemon version: {err}" - )) - } else { - Ok(false) - } - } - Err(AdoptError::AsyncJoin(_)) => Ok(false), - } -} - async fn ensure_running_via_broker_async(url: &str) -> Result { let Some(endpoint) = broker_endpoint() else { return Ok(false); @@ -273,89 +223,147 @@ fn broker_refusal_is_fatal(kind: Option) -> bool { ) } -/// Python-visible Daemon class (high-level API). -#[pyclass] -pub(crate) struct Daemon; +/// Shared async implementation of `ensure_running`. Used by both the sync +/// `Daemon::ensure_running` (via a one-shot tokio runtime) and the async +/// `AsyncDaemon::ensure_running` (via `pyo3_async_runtimes::tokio`). The +/// process spawn must be resolved against the venv before this is called. +/// FastLED/fbuild#817. +async fn ensure_running_async_impl(url: &str, spawn_target: Option, dev_mode: bool) -> bool { + match ensure_running_via_broker_async(url).await { + Ok(true) => return true, + Ok(false) => {} + Err(_) => return false, + } -#[pymethods] -impl Daemon { - #[staticmethod] - fn ensure_running() -> bool { - let url = direct_health_url(); - match ensure_running_via_broker_blocking(&url) { - Ok(true) => return true, - Ok(false) => {} - Err(_) => return false, + let client = reqwest::Client::new(); + + // Fast path: daemon is already up. + if let Ok(resp) = client + .get(url) + .timeout(std::time::Duration::from_secs(5)) + .send() + .await + { + if resp.status().is_success() { + return true; } + } - if let Ok(resp) = reqwest::blocking::get(&url) { + // INTENTIONALLY DETACHED (FastLED/fbuild#32): the Python host spawns + // the daemon and the Python interpreter may exit — the daemon must + // survive. 
This PyO3 binding runs inside the Python interpreter + // process, which has no global containment group, so `spawn()` is + // already uncontained; see the matching comment in + // `fbuild-cli/src/daemon_client.rs`. + // + // Prefer the daemon binary sitting next to `sys.executable` + // (FastLED/fbuild#275) so a venv install never gets shadowed by a + // stale user-level daemon on PATH. + // allow-direct-spawn: daemon must outlive the Python interpreter. + let mut cmd = match spawn_target { + // allow-direct-spawn: daemon must outlive the Python interpreter. + Some(path) => std::process::Command::new(path), + // allow-direct-spawn: daemon must outlive the Python interpreter. + None => std::process::Command::new(DAEMON_BIN_NAME), + }; + if dev_mode { + cmd.arg("--dev"); + } + cmd.stdin(std::process::Stdio::null()) + .stdout(std::process::Stdio::null()) + .stderr(std::process::Stdio::null()); + + if cmd.spawn().is_err() { + return false; + } + + for _ in 0..100 { + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + if let Ok(resp) = client + .get(url) + .timeout(std::time::Duration::from_secs(5)) + .send() + .await + { if resp.status().is_success() { return true; } } + } + false +} - // INTENTIONALLY DETACHED (FastLED/fbuild#32): the Python host - // spawns the daemon and then the Python interpreter may exit — - // the daemon must survive. This PyO3 binding runs inside the - // Python interpreter process, which has no global containment - // group, so `spawn()` is already uncontained; see the matching - // comment in fbuild-cli/src/daemon_client.rs. - // allow-direct-spawn: daemon must outlive the Python interpreter. - // - // Prefer the daemon binary sitting next to `sys.executable` - // (FastLED/fbuild#275) so a venv install never gets shadowed by a - // stale user-level daemon on PATH. - let mut cmd = match daemon_spawn_target() { - // allow-direct-spawn: daemon must outlive the Python interpreter. 
- Some(path) => std::process::Command::new(path), - // allow-direct-spawn: daemon must outlive the Python interpreter. - None => std::process::Command::new(DAEMON_BIN_NAME), - }; - if fbuild_paths::is_dev_mode() { - cmd.arg("--dev"); - } - cmd.stdin(std::process::Stdio::null()) - .stdout(std::process::Stdio::null()) - .stderr(std::process::Stdio::null()); +/// Shared async implementation of `stop`. Returns `true` iff the daemon +/// responded 2xx to the shutdown POST. FastLED/fbuild#817. +async fn stop_async_impl() -> bool { + let url = format!("{}/api/daemon/shutdown", fbuild_paths::get_daemon_url()); + reqwest::Client::new() + .post(&url) + .headers(shutdown_caller_headers()) + .timeout(std::time::Duration::from_secs(10)) + .send() + .await + .map(|r| r.status().is_success()) + .unwrap_or(false) +} - if cmd.spawn().is_err() { - return false; - } +/// Shared async implementation of `status`. Returns the response body text +/// (the caller decodes JSON with the GIL held). FastLED/fbuild#817. +async fn status_async_impl() -> PyResult { + let url = format!("{}/api/daemon/info", fbuild_paths::get_daemon_url()); + let resp = reqwest::Client::new() + .get(&url) + .timeout(std::time::Duration::from_secs(10)) + .send() + .await + .map_err(|e| { + pyo3::exceptions::PyConnectionError::new_err(format!( + "failed to connect to daemon: {}", + e + )) + })?; - for _ in 0..100 { - std::thread::sleep(std::time::Duration::from_millis(100)); - if let Ok(resp) = reqwest::blocking::get(&url) { - if resp.status().is_success() { - return true; - } - } - } - false + resp.text().await.map_err(|e| { + pyo3::exceptions::PyRuntimeError::new_err(format!("failed to read response: {}", e)) + }) +} + +/// Python-visible Daemon class (high-level API). +#[pyclass] +pub(crate) struct Daemon; + +#[pymethods] +impl Daemon { + #[staticmethod] + fn ensure_running() -> bool { + // FastLED/fbuild#817: sync wrapper around the shared async impl. 
+ // Resolve `sys.executable` BEFORE entering the runtime — the + // venv-adjacent lookup needs the GIL and must not be held across + // `.await`. + let url = direct_health_url(); + let dev_mode = fbuild_paths::is_dev_mode(); + let spawn_target = daemon_spawn_target(); + let Ok(rt) = one_shot_runtime() else { + return false; + }; + rt.block_on(ensure_running_async_impl(&url, spawn_target, dev_mode)) } #[staticmethod] fn stop() -> bool { - let url = format!("{}/api/daemon/shutdown", fbuild_paths::get_daemon_url()); - reqwest::blocking::Client::new() - .post(&url) - .headers(shutdown_caller_headers()) - .send() - .map(|r| r.status().is_success()) - .unwrap_or(false) + // FastLED/fbuild#817: sync wrapper around `stop_async_impl`. + let Ok(rt) = one_shot_runtime() else { + return false; + }; + rt.block_on(stop_async_impl()) } #[staticmethod] fn status(py: Python<'_>) -> PyResult { - let url = format!("{}/api/daemon/info", fbuild_paths::get_daemon_url()); - let resp = reqwest::blocking::get(&url).map_err(|e| { - pyo3::exceptions::PyConnectionError::new_err(format!( - "failed to connect to daemon: {}", - e - )) - })?; - let text = resp.text().map_err(|e| { - pyo3::exceptions::PyRuntimeError::new_err(format!("failed to read response: {}", e)) - })?; + // FastLED/fbuild#817: sync wrapper around `status_async_impl`. + let rt = one_shot_runtime() + .map_err(pyo3::exceptions::PyRuntimeError::new_err)?; + let text = rt.block_on(status_async_impl())?; let json_module = py.import_bound("json")?; let result = json_module.call_method1("loads", (text,))?; Ok(result.to_object(py)) @@ -388,28 +396,8 @@ impl AsyncDaemon { /// ConnectionError/RuntimeError. 
#[staticmethod] fn status(py: Python<'_>) -> PyResult> { - let url = format!("{}/api/daemon/info", fbuild_paths::get_daemon_url()); - pyo3_async_runtimes::tokio::future_into_py(py, async move { - let resp = reqwest::Client::new() - .get(&url) - .timeout(std::time::Duration::from_secs(10)) - .send() - .await - .map_err(|e| { - pyo3::exceptions::PyConnectionError::new_err(format!( - "failed to connect to daemon: {}", - e - )) - })?; - - let text = resp.text().await.map_err(|e| { - pyo3::exceptions::PyRuntimeError::new_err(format!( - "failed to read daemon response: {}", - e - )) - })?; - + let text = status_async_impl().await?; Python::with_gil(|py| { let json_module = py.import_bound("json")?; let parsed = json_module.call_method1("loads", (text,))?; @@ -439,63 +427,7 @@ impl AsyncDaemon { let spawn_target = daemon_spawn_target(); pyo3_async_runtimes::tokio::future_into_py(py, async move { - match ensure_running_via_broker_async(&url).await { - Ok(true) => return Ok(true), - Ok(false) => {} - Err(_) => return Ok(false), - } - - let client = reqwest::Client::new(); - - // Fast path: daemon is already up. - if let Ok(resp) = client - .get(&url) - .timeout(std::time::Duration::from_secs(5)) - .send() - .await - { - if resp.status().is_success() { - return Ok(true); - } - } - - // INTENTIONALLY DETACHED (FastLED/fbuild#32): see the - // matching comment in `Daemon::ensure_running` above. - // Venv-adjacent preference (FastLED/fbuild#275): resolved - // synchronously above the future so we never touch the GIL - // from inside this async block. - // allow-direct-spawn: daemon must outlive the Python interpreter. - let mut cmd = match spawn_target { - // allow-direct-spawn: daemon must outlive the Python interpreter. - Some(path) => std::process::Command::new(path), - // allow-direct-spawn: daemon must outlive the Python interpreter. 
- None => std::process::Command::new(DAEMON_BIN_NAME), - }; - if dev_mode { - cmd.arg("--dev"); - } - cmd.stdin(std::process::Stdio::null()) - .stdout(std::process::Stdio::null()) - .stderr(std::process::Stdio::null()); - - if cmd.spawn().is_err() { - return Ok(false); - } - - for _ in 0..100 { - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - if let Ok(resp) = client - .get(&url) - .timeout(std::time::Duration::from_secs(5)) - .send() - .await - { - if resp.status().is_success() { - return Ok(true); - } - } - } - Ok(false) + Ok(ensure_running_async_impl(&url, spawn_target, dev_mode).await) }) } @@ -503,18 +435,8 @@ impl AsyncDaemon { /// Returns `True` if the daemon acknowledged with a 2xx response. #[staticmethod] fn stop(py: Python<'_>) -> PyResult> { - let url = format!("{}/api/daemon/shutdown", fbuild_paths::get_daemon_url()); - pyo3_async_runtimes::tokio::future_into_py(py, async move { - let ok = reqwest::Client::new() - .post(&url) - .headers(shutdown_caller_headers()) - .timeout(std::time::Duration::from_secs(10)) - .send() - .await - .map(|r| r.status().is_success()) - .unwrap_or(false); - Ok(ok) + Ok(stop_async_impl().await) }) } } diff --git a/crates/fbuild-python/src/outcome.rs b/crates/fbuild-python/src/outcome.rs index 58fc2258..25788e54 100644 --- a/crates/fbuild-python/src/outcome.rs +++ b/crates/fbuild-python/src/outcome.rs @@ -119,48 +119,26 @@ pub(crate) fn parse_outcome(body: &serde_json::Value) -> OperationOutcome { } pub(crate) fn send_op(url: &str, req: &OpRequest, timeout: f64) -> OperationOutcome { - let client = reqwest::blocking::Client::new(); - match client - .post(url) - .json(req) - .timeout(std::time::Duration::from_secs_f64(timeout)) - .send() + // Sync wrapper around `send_op_async` (FastLED/fbuild#817): builds a + // current-thread tokio runtime per call so we don't duplicate the HTTP + // transport code. If the runtime fails to build we surface a structured + // failure outcome identical to a network error. 
+ let rt = match tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() { - Ok(resp) => match resp.json::() { - Ok(body) => { - let outcome = parse_outcome(&body); - if !outcome.success { - if let Some(ref msg) = outcome.message { - eprintln!("[fbuild] operation failed: {}", msg); - } - if let Some(ref stderr) = outcome.stderr { - if !stderr.is_empty() { - eprintln!("[fbuild] stderr:\n{}", stderr); - } - } - } - outcome - } - Err(e) => { - let msg = format!("failed to parse daemon response: {}", e); - eprintln!("[fbuild] {}", msg); - OperationOutcome { - success: false, - message: Some(msg), - ..Default::default() - } - } - }, + Ok(rt) => rt, Err(e) => { - let msg = format!("request failed: {}", e); + let msg = format!("failed to build tokio runtime: {}", e); eprintln!("[fbuild] {}", msg); - OperationOutcome { + return OperationOutcome { success: false, message: Some(msg), ..Default::default() - } + }; } - } + }; + rt.block_on(send_op_async(url.to_string(), req.clone(), timeout)) } /// Native-async counterpart to `send_op`. Issues the same HTTP POST against diff --git a/crates/fbuild-python/src/serial_monitor.rs b/crates/fbuild-python/src/serial_monitor.rs index 42c37dd2..4c77fe18 100644 --- a/crates/fbuild-python/src/serial_monitor.rs +++ b/crates/fbuild-python/src/serial_monitor.rs @@ -3,7 +3,6 @@ use base64::Engine; use futures::{SinkExt, StreamExt}; use pyo3::prelude::*; -use serde::Serialize; use std::sync::Mutex; use tokio::runtime::Runtime; use tokio_tungstenite::tungstenite; @@ -462,44 +461,31 @@ impl SerialMonitor { wait_for_output: bool, timeout: f64, ) -> PyResult { - let url = format!("{}/api/reset", fbuild_paths::get_daemon_url()); - - #[derive(Serialize)] - struct ResetPayload { - port: String, - #[serde(skip_serializing_if = "Option::is_none")] - board: Option, - } - - let payload = ResetPayload { - port: self.port.clone(), - board, + // FastLED/fbuild#817: delegate the HTTP transport to the shared + // async helper. 
`reset_device` may be called WITHOUT `__enter__` + // (no WebSocket session), so `self.runtime` may be `None` — fall + // back to a one-shot current-thread runtime in that case. + let port = self.port.clone(); + let success = match self.runtime.as_ref() { + Some(rt) => rt.block_on(crate::async_serial_monitor::post_reset_request_async( + port, board, + ))?, + None => { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .map_err(|e| { + pyo3::exceptions::PyRuntimeError::new_err(format!( + "failed to build tokio runtime: {}", + e + )) + })?; + rt.block_on(crate::async_serial_monitor::post_reset_request_async( + port, board, + ))? + } }; - let resp = reqwest::blocking::Client::new() - .post(&url) - .json(&payload) - .timeout(std::time::Duration::from_secs(10)) - .send() - .map_err(|e| { - pyo3::exceptions::PyConnectionError::new_err(format!( - "failed to send reset request to daemon: {}", - e - )) - })?; - - let body: serde_json::Value = resp.json().map_err(|e| { - pyo3::exceptions::PyRuntimeError::new_err(format!( - "failed to parse reset response: {}", - e - )) - })?; - - let success = body - .get("success") - .and_then(|v| v.as_bool()) - .unwrap_or(false); - let was_connected = self.runtime.is_some() && self.ws_write.is_some() && self.ws_read.is_some(); if was_connected {