Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions crates/fbuild-build/src/build_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ use std::time::Instant;
use fbuild_core::{BuildLog, MemoryRegion, SizeInfo, SymbolMap};

/// Create a [`BuildLog`], optionally wired to a real-time streaming sender.
pub fn create_build_log(sender: Option<std::sync::mpsc::Sender<String>>) -> BuildLog {
pub fn create_build_log(
sender: Option<tokio::sync::mpsc::UnboundedSender<String>>,
) -> BuildLog {
match sender {
Some(s) => BuildLog::with_sender(s),
None => BuildLog::new(),
Expand All @@ -19,7 +21,7 @@ pub fn create_build_log(sender: Option<std::sync::mpsc::Sender<String>>) -> Buil

/// Create a [`BuildLog`] with elapsed-time prefixes from the given epoch.
pub fn create_build_log_with_epoch(
sender: Option<std::sync::mpsc::Sender<String>>,
sender: Option<tokio::sync::mpsc::UnboundedSender<String>>,
epoch: Instant,
) -> BuildLog {
match sender {
Expand Down Expand Up @@ -327,7 +329,7 @@ mod tests {

#[test]
fn test_create_build_log_with_sender() {
let (tx, rx) = std::sync::mpsc::channel();
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let mut log = create_build_log(Some(tx));
log.push("test");
assert_eq!(rx.try_recv().unwrap(), "test");
Expand Down
8 changes: 7 additions & 1 deletion crates/fbuild-build/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,13 @@ pub struct BuildParams {
/// Used by IWYU and clang-tidy to avoid building framework core files.
pub compiledb_only: bool,
/// Optional sender for streaming build log lines in real-time.
pub log_sender: Option<std::sync::mpsc::Sender<String>>,
///
/// Uses `tokio::sync::mpsc::UnboundedSender` so the orchestrator (running
/// on a tokio runtime) and the WebSocket forwarder can share one channel
/// without a sync→async bridge — `UnboundedSender::send` is sync and safe
/// to call from blocking code, while the receive side is awaited from the
/// async daemon handler (fbuild#818).
pub log_sender: Option<tokio::sync::mpsc::UnboundedSender<String>>,
/// When true, run symbol-level memory analysis after linking.
pub symbol_analysis: bool,
/// Optional path to write the symbol analysis report to.
Expand Down
19 changes: 12 additions & 7 deletions crates/fbuild-core/src/build_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,22 @@
//! the elapsed time since that epoch (e.g. `" 0.46 compiling foo.cpp"`).

use std::collections::VecDeque;
use std::sync::mpsc::Sender;
use std::time::Instant;
use tokio::sync::mpsc::UnboundedSender;

/// Centralized build output log.
///
/// Accumulates build output lines (compilation steps, warnings, size info, etc.)
/// and optionally streams each line through a channel for real-time delivery
/// from the daemon to the CLI.
///
/// The streaming sender is a `tokio::sync::mpsc::UnboundedSender` so that
/// push-from-sync-code and recv-from-async-code share one channel without an
/// intermediate `spawn_blocking` bridge — `UnboundedSender::send` is sync and
/// callable from any context (see fbuild#818 async-audit follow-up).
pub struct BuildLog {
lines: VecDeque<String>,
sender: Option<Sender<String>>,
sender: Option<UnboundedSender<String>>,
epoch: Option<Instant>,
}

Expand All @@ -33,7 +38,7 @@ impl BuildLog {
}

/// Create a log that streams each line through the given sender (no timestamps).
pub fn with_sender(sender: Sender<String>) -> Self {
pub fn with_sender(sender: UnboundedSender<String>) -> Self {
Self {
lines: VecDeque::new(),
sender: Some(sender),
Expand All @@ -51,7 +56,7 @@ impl BuildLog {
}

/// Create a log that streams each line and prefixes with elapsed time.
pub fn with_sender_and_epoch(sender: Sender<String>, epoch: Instant) -> Self {
pub fn with_sender_and_epoch(sender: UnboundedSender<String>, epoch: Instant) -> Self {
Self {
lines: VecDeque::new(),
sender: Some(sender),
Expand Down Expand Up @@ -116,7 +121,7 @@ mod tests {

#[test]
fn streams_through_sender() {
let (tx, rx) = std::sync::mpsc::channel();
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let mut log = BuildLog::with_sender(tx);
log.push("hello");
log.push("world");
Expand All @@ -129,7 +134,7 @@ mod tests {

#[test]
fn sender_dropped_does_not_panic() {
let (tx, rx) = std::sync::mpsc::channel();
let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<String>();
drop(rx);
let mut log = BuildLog::with_sender(tx);
// Should not panic even though receiver is gone
Expand Down Expand Up @@ -160,7 +165,7 @@ mod tests {

#[test]
fn with_sender_and_epoch_streams_prefixed() {
let (tx, rx) = std::sync::mpsc::channel();
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let epoch = Instant::now();
let mut log = BuildLog::with_sender_and_epoch(tx, epoch);
log.push("test");
Expand Down
14 changes: 8 additions & 6 deletions crates/fbuild-daemon/src/handlers/emulator/avr8js_deploy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ pub async fn deploy_avr8js(
.join("avr8js")
.join(&env_name)
.join(&session_id);
if let Err(e) = std::fs::create_dir_all(&session_dir) {
if let Err(e) = tokio::fs::create_dir_all(&session_dir).await {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(OperationResponse::fail(
Expand All @@ -139,7 +139,7 @@ pub async fn deploy_avr8js(
}

let staged_hex = session_dir.join("firmware.hex");
if let Err(e) = std::fs::copy(&firmware_path, &staged_hex) {
if let Err(e) = tokio::fs::copy(&firmware_path, &staged_hex).await {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(OperationResponse::fail(
Expand All @@ -151,7 +151,7 @@ pub async fn deploy_avr8js(

let staged_elf = if let Some(ref elf) = elf_path {
let dest = session_dir.join("firmware.elf");
match std::fs::copy(elf, &dest) {
match tokio::fs::copy(elf, &dest).await {
Ok(_) => Some(dest),
Err(_) => None,
}
Expand All @@ -176,10 +176,12 @@ pub async fn deploy_avr8js(
created_at_unix: now_unix(),
};
let manifest_path = session_dir.join("session.json");
if let Err(e) = std::fs::write(
if let Err(e) = tokio::fs::write(
&manifest_path,
serde_json::to_vec_pretty(&manifest).unwrap_or_default(),
) {
)
.await
{
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(OperationResponse::fail(
Expand Down Expand Up @@ -221,7 +223,7 @@ pub async fn deploy_avr8js(
// The per-session firmware.hex and session.json continue to live
// under session_dir. See FastLED/fbuild#291.
let script_path = avr8js_cache.join("headless.mjs");
if let Err(e) = std::fs::write(&script_path, AVR8JS_HEADLESS_MJS) {
if let Err(e) = tokio::fs::write(&script_path, AVR8JS_HEADLESS_MJS).await {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(OperationResponse::fail(
Expand Down
20 changes: 17 additions & 3 deletions crates/fbuild-daemon/src/handlers/emulator/avr8js_npm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,26 @@ pub(crate) async fn ensure_avr8js_npm_in(
cache_dir: &Path,
force_refresh: bool,
) -> fbuild_core::Result<()> {
if prepare_avr8js_cache_for_install(cache_dir, force_refresh) == Avr8jsCachePrep::AlreadyIntact
{
// `prepare_avr8js_cache_for_install` is sync (tested from sync contexts)
// and may perform `remove_dir_all` on a large `node_modules/` tree.
// Push the blocking I/O off the async runtime via `spawn_blocking` so
// we don't stall other handlers while a corrupt cache is wiped.
let cache_dir_owned = cache_dir.to_path_buf();
let prep = tokio::task::spawn_blocking(move || {
prepare_avr8js_cache_for_install(&cache_dir_owned, force_refresh)
})
.await
.map_err(|e| {
fbuild_core::FbuildError::DeployFailed(format!(
"avr8js cache prep task failed to join: {}",
e
))
})?;
if prep == Avr8jsCachePrep::AlreadyIntact {
return Ok(());
}

std::fs::create_dir_all(cache_dir).map_err(|e| {
tokio::fs::create_dir_all(cache_dir).await.map_err(|e| {
fbuild_core::FbuildError::DeployFailed(format!(
"failed to create avr8js cache dir at {}: {}",
cache_dir.display(),
Expand Down
12 changes: 6 additions & 6 deletions crates/fbuild-daemon/src/handlers/emulator/avr8js_web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ pub(crate) fn render_page(session_id: &str) -> String {
)
}

pub(crate) fn load_session_manifest(
pub(crate) async fn load_session_manifest(
ctx: &DaemonContext,
session_id: &str,
) -> fbuild_core::Result<Avr8jsSessionManifest> {
Expand All @@ -169,7 +169,7 @@ pub(crate) fn load_session_manifest(
.ok_or_else(|| {
fbuild_core::FbuildError::Other(format!("unknown AVR8js session '{}'", session_id))
})?;
let raw = std::fs::read_to_string(&manifest_path)?;
let raw = tokio::fs::read_to_string(&manifest_path).await?;
serde_json::from_str(&raw).map_err(|e| {
fbuild_core::FbuildError::Other(format!("failed to parse AVR8js session manifest: {}", e))
})
Expand All @@ -179,7 +179,7 @@ pub async fn avr8js_page(
AxumPath(session_id): AxumPath<String>,
State(ctx): State<Arc<DaemonContext>>,
) -> impl IntoResponse {
match load_session_manifest(&ctx, &session_id) {
match load_session_manifest(&ctx, &session_id).await {
Ok(_) => Html(render_page(&session_id)).into_response(),
Err(e) => (StatusCode::NOT_FOUND, e.to_string()).into_response(),
}
Expand All @@ -199,7 +199,7 @@ pub async fn avr8js_session_json(
AxumPath(session_id): AxumPath<String>,
State(ctx): State<Arc<DaemonContext>>,
) -> impl IntoResponse {
match load_session_manifest(&ctx, &session_id) {
match load_session_manifest(&ctx, &session_id).await {
Ok(manifest) => (
StatusCode::OK,
Json(Avr8jsSessionResponse {
Expand All @@ -221,8 +221,8 @@ pub async fn avr8js_firmware_hex(
AxumPath(session_id): AxumPath<String>,
State(ctx): State<Arc<DaemonContext>>,
) -> impl IntoResponse {
match load_session_manifest(&ctx, &session_id) {
Ok(manifest) => match std::fs::read_to_string(&manifest.firmware_hex) {
match load_session_manifest(&ctx, &session_id).await {
Ok(manifest) => match tokio::fs::read_to_string(&manifest.firmware_hex).await {
Ok(hex) => (
[(
header::CONTENT_TYPE,
Expand Down
2 changes: 1 addition & 1 deletion crates/fbuild-daemon/src/handlers/emulator/qemu_deploy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ pub async fn deploy_qemu(
};

let session_dir = qemu_session_dir(&project_dir, &env_name);
if let Err(e) = std::fs::create_dir_all(&session_dir) {
if let Err(e) = tokio::fs::create_dir_all(&session_dir).await {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(OperationResponse::fail(
Expand Down
90 changes: 67 additions & 23 deletions crates/fbuild-daemon/src/handlers/operations/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,16 @@ pub async fn build(
if stream {
// --- STREAMING PATH ---
// Build runs in a background task; log lines stream to client as NDJSON.
let (sync_tx, sync_rx) = std::sync::mpsc::channel::<String>();
//
// fbuild#818 async-audit follow-up: `BuildLog`'s sender is now a
// `tokio::sync::mpsc::UnboundedSender`, so the orchestrator (which
// may push from blocking compile workers via `spawn_blocking`) and
// this async forwarder share a single tokio channel. The previous
// `std::sync::mpsc::channel` + `spawn_blocking` recv-bridge has
// been removed — `UnboundedSender::send` is sync and callable from
// any context, and `UnboundedReceiver::recv` is awaited directly
// from the async forwarder task.
let (log_tx, mut log_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
let (async_tx, async_rx) = tokio::sync::mpsc::unbounded_channel::<bytes::Bytes>();

let params = fbuild_build::BuildParams {
Expand All @@ -211,7 +220,7 @@ pub async fn build(
jobs: req.jobs,
generate_compiledb,
compiledb_only: req.compiledb_only,
log_sender: Some(sync_tx),
log_sender: Some(log_tx),
symbol_analysis: req.symbol_analysis,
symbol_analysis_path: resolved_symbol_analysis_path.clone(),
no_timestamp: req.no_timestamp,
Expand Down Expand Up @@ -309,14 +318,17 @@ pub async fn build(
lock_wait.as_millis()
);

// Bridge: sync log lines → async NDJSON chunks
let bridge_tx = async_tx.clone();
let bridge = tokio::task::spawn_blocking(move || {
for line in sync_rx {
// Forwarder: log lines → async NDJSON chunks.
// fbuild#818: both endpoints are now tokio channels, so the
// earlier `spawn_blocking` sync→async bridge is gone — this
// task just awaits on the same runtime.
let forwarder_tx = async_tx.clone();
let forwarder = tokio::spawn(async move {
while let Some(line) = log_rx.recv().await {
let event = serde_json::json!({"type": "log", "message": line});
let mut chunk = event.to_string();
chunk.push('\n');
if bridge_tx.send(bytes::Bytes::from(chunk)).is_err() {
if forwarder_tx.send(bytes::Bytes::from(chunk)).is_err() {
break;
}
}
Expand Down Expand Up @@ -366,20 +378,36 @@ pub async fn build(
Ok(Ok(br)) => {
let exported = if br.success {
if let Some(ref out_dir) = resolved_output_dir {
Some(export_artifacts_bundle(
out_dir,
platform,
&env_name,
br.firmware_path.as_deref(),
br.elf_path.as_deref(),
))
// fbuild#815: export_artifacts_bundle does sync
// std::fs I/O — move it off the axum worker.
let out_dir_owned = out_dir.clone();
let env_name_owned = env_name.clone();
let firmware_path_owned = br.firmware_path.clone();
let elf_path_owned = br.elf_path.clone();
let join_result = tokio::task::spawn_blocking(move || {
export_artifacts_bundle(
&out_dir_owned,
platform,
&env_name_owned,
firmware_path_owned.as_deref(),
elf_path_owned.as_deref(),
)
})
.await;
Some(match join_result {
Ok(inner) => inner,
Err(join_err) => Err(fbuild_core::FbuildError::Other(format!(
"artifact export task panicked: {}",
join_err
))),
})
} else {
None
}
} else {
None
};
let _lines = br.build_log.into_lines(); // drop sender
let _lines = br.build_log.into_lines(); // drop sender so forwarder exits
let summary = if br.success {
let size_str = br
.size_info
Expand Down Expand Up @@ -448,7 +476,7 @@ pub async fn build(
),
};

let _ = bridge.await;
let _ = forwarder.await;

if !success && !msg.is_empty() {
let log_event = serde_json::json!({
Expand Down Expand Up @@ -531,13 +559,29 @@ pub async fn build(
Ok(build_result) => {
let exported = if build_result.success {
if let Some(ref out_dir) = resolved_output_dir {
Some(export_artifacts_bundle(
out_dir,
platform,
&env_name,
build_result.firmware_path.as_deref(),
build_result.elf_path.as_deref(),
))
// fbuild#815: export_artifacts_bundle does sync
// std::fs I/O — move it off the axum worker.
let out_dir_owned = out_dir.clone();
let env_name_owned = env_name.clone();
let firmware_path_owned = build_result.firmware_path.clone();
let elf_path_owned = build_result.elf_path.clone();
let join_result = tokio::task::spawn_blocking(move || {
export_artifacts_bundle(
&out_dir_owned,
platform,
&env_name_owned,
firmware_path_owned.as_deref(),
elf_path_owned.as_deref(),
)
})
.await;
Some(match join_result {
Ok(inner) => inner,
Err(join_err) => Err(fbuild_core::FbuildError::Other(format!(
"artifact export task panicked: {}",
join_err
))),
})
} else {
None
}
Expand Down
Loading
Loading