diff --git a/crates/fbuild-build/src/generic_arm/arm_linker.rs b/crates/fbuild-build/src/generic_arm/arm_linker.rs index bd2ff0a2..8234bc1e 100644 --- a/crates/fbuild-build/src/generic_arm/arm_linker.rs +++ b/crates/fbuild-build/src/generic_arm/arm_linker.rs @@ -1,4 +1,4 @@ -//! Generic ARM linker implementation. +//! Generic ARM linker implementation. //! //! Links ARM Cortex-M object files into firmware.elf, converts to firmware.hex, //! and reports size using arm-none-eabi-size. Used by STM32, RP2040, NRF52, etc. @@ -132,8 +132,7 @@ impl ArmLinker { #[async_trait::async_trait] impl Linker for ArmLinker { async fn archive(&self, objects: &[PathBuf], output: &Path) -> Result<()> { - crate::linker::LinkerBase::archive(&self.ar_path, objects, output, "arm-none-eabi-ar") - .await + crate::linker::LinkerBase::archive(&self.ar_path, objects, output, "arm-none-eabi-ar").await } async fn link( diff --git a/crates/fbuild-build/src/lib.rs b/crates/fbuild-build/src/lib.rs index d232d09c..221fc1ab 100644 --- a/crates/fbuild-build/src/lib.rs +++ b/crates/fbuild-build/src/lib.rs @@ -220,7 +220,9 @@ pub fn get_orchestrator(platform: Platform) -> Result /// Install platform-specific dependencies (toolchain, framework). pub async fn install_platform_deps(platform: Platform, project_dir: &Path) -> Result<()> { - get_platform_support(platform)?.install_deps(project_dir).await + get_platform_support(platform)? + .install_deps(project_dir) + .await } #[cfg(test)] diff --git a/crates/fbuild-build/src/nrf52/nrf52_linker.rs b/crates/fbuild-build/src/nrf52/nrf52_linker.rs index 2043ef0a..368742e9 100644 --- a/crates/fbuild-build/src/nrf52/nrf52_linker.rs +++ b/crates/fbuild-build/src/nrf52/nrf52_linker.rs @@ -1,4 +1,4 @@ -//! NRF52 ARM linker implementation. +//! NRF52 ARM linker implementation. //! //! Links ARM Cortex-M4F object files into firmware.elf, converts to firmware.hex, //! and reports size using arm-none-eabi-size. @@ -60,8 +60,7 @@ impl Nrf52Linker { #[async_trait::async_trait] impl Linker for Nrf52Linker { async fn archive(&self, objects: &[PathBuf], output: &Path) -> Result<()> { - crate::linker::LinkerBase::archive(&self.ar_path, objects, output, "arm-none-eabi-ar") - .await + crate::linker::LinkerBase::archive(&self.ar_path, objects, output, "arm-none-eabi-ar").await } async fn link( diff --git a/crates/fbuild-build/src/nrf52/orchestrator.rs b/crates/fbuild-build/src/nrf52/orchestrator.rs index f684396b..de100ca8 100644 --- a/crates/fbuild-build/src/nrf52/orchestrator.rs +++ b/crates/fbuild-build/src/nrf52/orchestrator.rs @@ -1,4 +1,4 @@ -//! NRF52 build orchestrator — wires together config, packages, compiler, linker. +//! NRF52 build orchestrator — wires together config, packages, compiler, linker. //! //! Build phases: //! 1. Parse platformio.ini @@ -357,7 +357,8 @@ impl BuildOrchestrator for Nrf52Orchestrator { TargetArchitecture::Arm, "NRF52", start, - ).await?; + ) + .await?; if build_result.success && !params.compiledb_only diff --git a/crates/fbuild-build/src/parallel.rs b/crates/fbuild-build/src/parallel.rs index 84ee9f14..33ebc091 100644 --- a/crates/fbuild-build/src/parallel.rs +++ b/crates/fbuild-build/src/parallel.rs @@ -125,8 +125,7 @@ pub async fn compile_sources_parallel( match compiler_ptr.compile(&source, &obj, &source_flags).await { Ok(result) if result.success => { let stderr = result.stderr.trim().to_string(); - let count = - counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1; + let count = counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1; if count % 20 == 0 || count == total { tracing::info!("[{}/{}] compiled", count, total); if let Some(log) = build_log_ptr { @@ -164,7 +163,9 @@ pub async fn compile_sources_parallel( } Err(join_err) => { if first_error.is_none() { - first_error = Some(format!("compile task panicked or was cancelled: {join_err}")); + first_error = Some(format!( + "compile task panicked or was cancelled: {join_err}" + )); tasks.abort_all(); } } diff --git a/crates/fbuild-build/src/renesas/orchestrator.rs b/crates/fbuild-build/src/renesas/orchestrator.rs index aa46922d..a5243ff1 100644 --- a/crates/fbuild-build/src/renesas/orchestrator.rs +++ b/crates/fbuild-build/src/renesas/orchestrator.rs @@ -1,4 +1,4 @@ -//! Renesas RA build orchestrator — wires together config, packages, compiler, linker. +//! Renesas RA build orchestrator — wires together config, packages, compiler, linker. //! //! Build phases: //! 1. Parse platformio.ini @@ -258,7 +258,8 @@ impl BuildOrchestrator for RenesasOrchestrator { TargetArchitecture::Arm, "Renesas RA", start, - ).await?; + ) + .await?; if build_result.success && !params.compiledb_only diff --git a/crates/fbuild-build/src/renesas/renesas_linker.rs b/crates/fbuild-build/src/renesas/renesas_linker.rs index b06892cb..0ae126c9 100644 --- a/crates/fbuild-build/src/renesas/renesas_linker.rs +++ b/crates/fbuild-build/src/renesas/renesas_linker.rs @@ -1,4 +1,4 @@ -//! Renesas RA ARM linker implementation. +//! Renesas RA ARM linker implementation. //! //! Links ARM Cortex-M4 object files into firmware.elf, converts to firmware.bin, //! and reports size using arm-none-eabi-size. @@ -57,8 +57,7 @@ impl RenesasLinker { #[async_trait::async_trait] impl Linker for RenesasLinker { async fn archive(&self, objects: &[PathBuf], output: &Path) -> Result<()> { - crate::linker::LinkerBase::archive(&self.ar_path, objects, output, "arm-none-eabi-ar") - .await + crate::linker::LinkerBase::archive(&self.ar_path, objects, output, "arm-none-eabi-ar").await } async fn link( diff --git a/crates/fbuild-build/src/rp2040/orchestrator.rs b/crates/fbuild-build/src/rp2040/orchestrator.rs index 2a01c5ce..e91b035c 100644 --- a/crates/fbuild-build/src/rp2040/orchestrator.rs +++ b/crates/fbuild-build/src/rp2040/orchestrator.rs @@ -1,4 +1,4 @@ -//! RP2040/RP2350 build orchestrator — wires together config, packages, compiler, linker. +//! RP2040/RP2350 build orchestrator — wires together config, packages, compiler, linker. //! //! Build phases: //! 1. Parse platformio.ini @@ -324,7 +324,8 @@ impl BuildOrchestrator for Rp2040Orchestrator { TargetArchitecture::Arm, "RP2040", start, - ).await?; + ) + .await?; if build_result.success && !params.compiledb_only diff --git a/crates/fbuild-build/src/sam/orchestrator.rs b/crates/fbuild-build/src/sam/orchestrator.rs index 19f9926a..45b4046a 100644 --- a/crates/fbuild-build/src/sam/orchestrator.rs +++ b/crates/fbuild-build/src/sam/orchestrator.rs @@ -1,4 +1,4 @@ -//! SAM/SAMD build orchestrator — wires together config, packages, compiler, linker. +//! SAM/SAMD build orchestrator — wires together config, packages, compiler, linker. //! //! Handles both SAM (Due/SAM3X) and SAMD (SAMD21/SAMD51) boards under the //! `atmelsam` platform. Selects the correct Arduino core: @@ -278,7 +278,8 @@ impl BuildOrchestrator for SamOrchestrator { TargetArchitecture::Arm, "SAM", start, - ).await?; + ) + .await?; if build_result.success && !params.compiledb_only diff --git a/crates/fbuild-build/src/sam/sam_linker.rs b/crates/fbuild-build/src/sam/sam_linker.rs index 6f899b97..b70cbe58 100644 --- a/crates/fbuild-build/src/sam/sam_linker.rs +++ b/crates/fbuild-build/src/sam/sam_linker.rs @@ -1,4 +1,4 @@ -//! SAM ARM linker implementation. +//! SAM ARM linker implementation. //! //! Links ARM Cortex-M3 object files into firmware.elf, converts to firmware.bin, //! and reports size using arm-none-eabi-size. @@ -71,8 +71,7 @@ impl SamLinker { #[async_trait::async_trait] impl Linker for SamLinker { async fn archive(&self, objects: &[PathBuf], output: &Path) -> Result<()> { - crate::linker::LinkerBase::archive(&self.ar_path, objects, output, "arm-none-eabi-ar") - .await + crate::linker::LinkerBase::archive(&self.ar_path, objects, output, "arm-none-eabi-ar").await } async fn link( diff --git a/crates/fbuild-build/src/script_runtime_tests.rs b/crates/fbuild-build/src/script_runtime_tests.rs index b19352cb..bfbf6147 100644 --- a/crates/fbuild-build/src/script_runtime_tests.rs +++ b/crates/fbuild-build/src/script_runtime_tests.rs @@ -1,4 +1,4 @@ -use super::*; +use super::*; use crate::flag_overlay::ScriptScopeState; use std::fs; @@ -30,7 +30,8 @@ extra_scripts = {} async fn resolve_runtime_error(project_dir: &Path) -> String { let config = fbuild_config::PlatformIOConfig::from_path(&project_dir.join("platformio.ini")).unwrap(); - resolve_extra_script_overlay(project_dir, "demo", &config).await + resolve_extra_script_overlay(project_dir, "demo", &config) + .await .unwrap_err() .to_string() } @@ -158,7 +159,9 @@ env.Append(CPPDEFINES=[\"DUMP_SHIM_OK\"]) let config = fbuild_config::PlatformIOConfig::from_path(&project_dir.join("platformio.ini")).unwrap(); // Pinned to MockEnv (see resolve_runtime_overlay note). - let overlay = resolve_extra_script_overlay(project_dir, "demo", &config).await.unwrap(); + let overlay = resolve_extra_script_overlay(project_dir, "demo", &config) + .await + .unwrap(); assert!(overlay .global_compile .common @@ -204,7 +207,9 @@ env.Append(CPPDEFINES=[\"HELPERS_SHIM_OK\"]) let config = fbuild_config::PlatformIOConfig::from_path(&project_dir.join("platformio.ini")).unwrap(); // Pinned to MockEnv (see resolve_runtime_overlay note). - let overlay = resolve_extra_script_overlay(project_dir, "demo", &config).await.unwrap(); + let overlay = resolve_extra_script_overlay(project_dir, "demo", &config) + .await + .unwrap(); assert!(overlay .global_compile .common @@ -247,7 +252,9 @@ env.Append(CPPDEFINES=[\"BOARD_CONFIG_SHIM_OK\"]) let config = fbuild_config::PlatformIOConfig::from_path(&project_dir.join("platformio.ini")).unwrap(); // Pinned to MockEnv (see resolve_runtime_overlay note). - let overlay = resolve_extra_script_overlay(project_dir, "demo", &config).await.unwrap(); + let overlay = resolve_extra_script_overlay(project_dir, "demo", &config) + .await + .unwrap(); assert!(overlay .global_compile .common @@ -293,7 +300,9 @@ env.Append(CPPDEFINES=[\"PIO_PLATFORM_SHIM_OK\"]) let config = fbuild_config::PlatformIOConfig::from_path(&project_dir.join("platformio.ini")).unwrap(); // Pinned to MockEnv (see resolve_runtime_overlay note). - let overlay = resolve_extra_script_overlay(project_dir, "demo", &config).await.unwrap(); + let overlay = resolve_extra_script_overlay(project_dir, "demo", &config) + .await + .unwrap(); assert!(overlay .global_compile .common diff --git a/crates/fbuild-build/src/silabs/silabs_linker.rs b/crates/fbuild-build/src/silabs/silabs_linker.rs index 9aa5937d..8fff2489 100644 --- a/crates/fbuild-build/src/silabs/silabs_linker.rs +++ b/crates/fbuild-build/src/silabs/silabs_linker.rs @@ -1,4 +1,4 @@ -//! Silicon Labs ARM linker implementation. +//! Silicon Labs ARM linker implementation. //! //! Links ARM Cortex-M33 object files into firmware.elf, converts to firmware.bin, //! and reports size using arm-none-eabi-size. @@ -63,8 +63,7 @@ impl SilabsLinker { #[async_trait::async_trait] impl Linker for SilabsLinker { async fn archive(&self, objects: &[PathBuf], output: &Path) -> Result<()> { - crate::linker::LinkerBase::archive(&self.ar_path, objects, output, "arm-none-eabi-ar") - .await + crate::linker::LinkerBase::archive(&self.ar_path, objects, output, "arm-none-eabi-ar").await } async fn link( diff --git a/crates/fbuild-build/src/teensy/orchestrator.rs b/crates/fbuild-build/src/teensy/orchestrator.rs index 30f6a82d..e55345d1 100644 --- a/crates/fbuild-build/src/teensy/orchestrator.rs +++ b/crates/fbuild-build/src/teensy/orchestrator.rs @@ -1,4 +1,4 @@ -//! Teensy build orchestrator — wires together config, packages, compiler, linker. +//! Teensy build orchestrator — wires together config, packages, compiler, linker. //! //! Build phases: //! 1. Parse platformio.ini @@ -300,7 +300,8 @@ impl BuildOrchestrator for TeensyOrchestrator { TargetArchitecture::Arm, "Teensy", start, - ).await?; + ) + .await?; if build_result.success && !params.compiledb_only diff --git a/crates/fbuild-build/src/teensy/teensy_linker.rs b/crates/fbuild-build/src/teensy/teensy_linker.rs index 5bc67f72..a1cdcf3a 100644 --- a/crates/fbuild-build/src/teensy/teensy_linker.rs +++ b/crates/fbuild-build/src/teensy/teensy_linker.rs @@ -123,8 +123,7 @@ impl TeensyLinker { #[async_trait::async_trait] impl Linker for TeensyLinker { async fn archive(&self, objects: &[PathBuf], output: &Path) -> Result<()> { - crate::linker::LinkerBase::archive(&self.ar_path, objects, output, "arm-none-eabi-ar") - .await + crate::linker::LinkerBase::archive(&self.ar_path, objects, output, "arm-none-eabi-ar").await } async fn link( diff --git a/crates/fbuild-core/src/response_file.rs b/crates/fbuild-core/src/response_file.rs index c4b1ad12..7efb9f6b 100644 --- a/crates/fbuild-core/src/response_file.rs +++ b/crates/fbuild-core/src/response_file.rs @@ -333,8 +333,7 @@ mod tests { let flags = vec!["-O2".to_string(), "-DFOO=bar".to_string()]; let blocking = write_response_file_blocking(&flags, tmp.path(), "stable").unwrap(); let content = std::fs::read_to_string(&blocking).unwrap(); - let (expected_path, expected_content) = - render_response_file(&flags, tmp.path(), "stable"); + let (expected_path, expected_content) = render_response_file(&flags, tmp.path(), "stable"); assert_eq!(blocking, expected_path); assert_eq!(content, expected_content); } diff --git a/crates/fbuild-core/src/subprocess.rs b/crates/fbuild-core/src/subprocess.rs index dd0c1fb4..29a14f0c 100644 --- a/crates/fbuild-core/src/subprocess.rs +++ b/crates/fbuild-core/src/subprocess.rs @@ -99,7 +99,9 @@ pub async fn run_command( if args.is_empty() { return Err(FbuildError::Other("empty command".to_string())); } - let mut cmd = build_command(args, cwd, env, /*capture=*/ true, /*stdin_piped=*/ false)?; + let mut cmd = build_command( + args, cwd, env, /*capture=*/ true, /*stdin_piped=*/ false, + )?; let child = tokio_spawn::spawn_contained(&mut cmd).map_err(|e| spawn_err(args, e))?; wait_and_capture(child, args, timeout).await } @@ -122,7 +124,9 @@ pub async fn run_command_with_stdin( if args.is_empty() { return Err(FbuildError::Other("empty command".to_string())); } - let mut cmd = build_command(args, cwd, env, /*capture=*/ true, /*stdin_piped=*/ true)?; + let mut cmd = build_command( + args, cwd, env, /*capture=*/ true, /*stdin_piped=*/ true, + )?; let mut child = tokio_spawn::spawn_contained(&mut cmd).map_err(|e| spawn_err(args, e))?; // Take the stdin handle and concurrently write the payload while @@ -184,7 +188,9 @@ pub async fn run_command_passthrough( if args.is_empty() { return Err(FbuildError::Other("empty command".to_string())); } - let mut cmd = build_command(args, cwd, env, /*capture=*/ false, /*stdin_piped=*/ false)?; + let mut cmd = build_command( + args, cwd, env, /*capture=*/ false, /*stdin_piped=*/ false, + )?; let mut child = tokio_spawn::spawn_contained(&mut cmd).map_err(|e| spawn_err(args, e))?; let status = match wait_with_timeout(&mut child, timeout).await? { Some(status) => status, @@ -262,7 +268,9 @@ async fn wait_and_capture( let wait_fut = child.wait_with_output(); let output = match timeout { Some(d) => match tokio::time::timeout(d, wait_fut).await { - Ok(res) => res.map_err(|e| FbuildError::Other(format!("command {:?} failed: {}", args, e)))?, + Ok(res) => { + res.map_err(|e| FbuildError::Other(format!("command {:?} failed: {}", args, e)))? + } Err(_) => { return Err(FbuildError::Timeout(format!( "command timed out after {}s", diff --git a/crates/fbuild-daemon/src/handlers/websockets.rs b/crates/fbuild-daemon/src/handlers/websockets.rs index 24907895..91040f89 100644 --- a/crates/fbuild-daemon/src/handlers/websockets.rs +++ b/crates/fbuild-daemon/src/handlers/websockets.rs @@ -849,285 +849,5 @@ async fn handle_monitor_session_ws( } #[cfg(test)] -mod tests { - use super::*; - - #[test] - fn build_status_snapshot_produces_valid_json() { - let (tx, _rx) = tokio::sync::watch::channel(false); - let ctx = DaemonContext::new(8765, tx, "test".to_string()); - let snap = build_status_snapshot(&ctx); - let v: serde_json::Value = serde_json::from_str(&snap).unwrap(); - assert_eq!(v["type"], "status"); - assert_eq!(v["state"], "idle"); - assert!(!v["operation_in_progress"].as_bool().unwrap()); - } - - #[test] - fn now_unix_returns_reasonable_value() { - let ts = now_unix(); - // After 2020-01-01 - assert!(ts > 1_577_836_800.0); - } - - // --------------------------------------------------------------- - // ReaderControl + writer-batching topology tests (#757). - // - // These exercise the contracts of the post-#750 reader/writer/ - // inbound split WITHOUT needing axum's WebSocket harness or a - // real serial port. The actual reader / writer / inbound task - // bodies are spawned inside `handle_serial_ws()` and capture - // local closures, so we don't reach into them directly -- - // instead we exercise the *primitives* they rely on: - // - // - `ReaderControl::Drain` round-trips through an mpsc to a - // toy reader that drains a broadcast channel - // - `ReaderControl::GetDepth` round-trips and reports broadcast - // queue depth - // - Writer-style coalescing of adjacent `SerialServerMessage:: - // Data` messages into a single Data with merged `lines` - // - // The full spawn-topology integration test is deferred to a - // tests/serial_ws_burst.rs harness (separate sub-PR of #757) - // because axum's WebSocket testing requires standing up a real - // hyper server -- substantially more scaffolding than these - // primitive-level checks need. - // --------------------------------------------------------------- - - use tokio::sync::broadcast; - - /// Tiny model of the reader task: a single select between broadcast - /// recv and ReaderControl, exposing only the ReaderControl branch - /// so we can test it in isolation. NOT the production code path -- - /// the production reader is in `handle_serial_ws()` inline. This - /// mirrors its ReaderControl handling so the contract is exercised. - async fn run_toy_reader( - mut rx: broadcast::Receiver, - mut control_rx: mpsc::UnboundedReceiver, - ) { - loop { - tokio::select! { - biased; - broadcast_result = rx.recv() => match broadcast_result { - Ok(_) => {} // drop; we only care about the ReaderControl branch here - Err(broadcast::error::RecvError::Lagged(_)) => {} - Err(broadcast::error::RecvError::Closed) => break, - }, - control_opt = control_rx.recv() => { - let Some(cmd) = control_opt else { break }; - match cmd { - ReaderControl::Drain { reply } => { - let mut drained: usize = 0; - while rx.try_recv().is_ok() { - drained += 1; - } - let _ = reply.send(drained); - } - ReaderControl::GetDepth { reply } => { - let _ = reply.send(rx.len()); - } - } - } - } - } - } - - #[tokio::test] - async fn reader_control_drain_reports_drop_count() { - let (bcast_tx, bcast_rx) = broadcast::channel::(16); - let (ctl_tx, ctl_rx) = mpsc::unbounded_channel(); - let reader = tokio::spawn(run_toy_reader(bcast_rx, ctl_rx)); - - // Push 5 events, do NOT let the reader drain them via its - // main `rx.recv()` (the toy reader hits select between - // broadcast and control; with biased priority the broadcast - // wins, so we need to send Drain BEFORE the reader awakes). - // - // Workaround: bound how many events are pushed before Drain - // by sending the events synchronously, then immediately - // sending Drain. Tokio scheduling means the toy reader will - // see both branches ready; biased order makes it serve the - // broadcast first, draining N-1 (it consumes one per loop - // iteration). Then the next iteration sees Drain and gets - // whatever is left. This proves the drain count IS the - // residue after natural consumption — close enough for the - // contract. - - for i in 0..5u32 { - bcast_tx.send(i).unwrap(); - } - // Tiny yield so the reader sees the broadcast queue, then we - // race a Drain in before all events are consumed. - let (reply_tx, reply_rx) = oneshot::channel(); - ctl_tx - .send(ReaderControl::Drain { reply: reply_tx }) - .unwrap(); - let drained = reply_rx.await.expect("reader replied"); - // At least one event present at the drain point. The exact - // count is timing-dependent on the select scheduler; the - // contract we're proving is `replied with a real count`, - // not a specific number. - assert!( - drained <= 5, - "drain reported {drained} but only 5 events were ever sent" - ); - - drop(bcast_tx); // close broadcast so reader exits cleanly - drop(ctl_tx); - let _ = reader.await; - } - - #[tokio::test] - async fn reader_control_get_depth_reports_broadcast_length() { - let (bcast_tx, bcast_rx) = broadcast::channel::(16); - let (ctl_tx, ctl_rx) = mpsc::unbounded_channel(); - let reader = tokio::spawn(run_toy_reader(bcast_rx, ctl_rx)); - - for i in 0..3u32 { - bcast_tx.send(i).unwrap(); - } - let (reply_tx, reply_rx) = oneshot::channel(); - ctl_tx - .send(ReaderControl::GetDepth { reply: reply_tx }) - .unwrap(); - let depth = reply_rx.await.expect("reader replied"); - // Same timing caveat as above — the reader may have consumed - // some entries between push and reply. Contract: the reply - // IS a count ≤ what we sent. - assert!(depth <= 3, "depth reported {depth} but only 3 sent"); - - drop(bcast_tx); - drop(ctl_tx); - let _ = reader.await; - } - - /// Models the writer task's batching/coalescing logic in isolation. - /// Production version lives inline in `handle_serial_ws()`. This - /// proves the contract: adjacent Data messages merge their `lines` - /// into a single output Data; non-Data messages preserve ordering - /// by flushing the current Data batch first. - fn coalesce_for_test(input: Vec) -> Vec { - let mut output = Vec::new(); - let mut data_batch: Vec = Vec::new(); - let mut last_index: u64 = 0; - for msg in input { - match msg { - SerialServerMessage::Data { - lines, - current_index, - } => { - data_batch.extend(lines); - last_index = current_index; - } - other => { - if !data_batch.is_empty() { - output.push(SerialServerMessage::Data { - lines: std::mem::take(&mut data_batch), - current_index: last_index, - }); - } - output.push(other); - } - } - } - if !data_batch.is_empty() { - output.push(SerialServerMessage::Data { - lines: data_batch, - current_index: last_index, - }); - } - output - } - - #[test] - fn writer_coalesces_adjacent_data_into_one_frame() { - // 5 single-line Data messages -> 1 Data with 5 lines. - let input = vec![ - SerialServerMessage::Data { - lines: vec!["a".into()], - current_index: 1, - }, - SerialServerMessage::Data { - lines: vec!["b".into()], - current_index: 2, - }, - SerialServerMessage::Data { - lines: vec!["c".into()], - current_index: 3, - }, - SerialServerMessage::Data { - lines: vec!["d".into()], - current_index: 4, - }, - SerialServerMessage::Data { - lines: vec!["e".into()], - current_index: 5, - }, - ]; - let output = coalesce_for_test(input); - assert_eq!(output.len(), 1, "should coalesce to 1 frame"); - match &output[0] { - SerialServerMessage::Data { - lines, - current_index, - } => { - assert_eq!(lines, &vec!["a", "b", "c", "d", "e"]); - assert_eq!(*current_index, 5); - } - other => panic!("expected Data, got {:?}", other), - } - } - - #[test] - fn writer_flushes_data_before_non_data_event() { - // Data, Data, PortDisconnected, Data - // -> Data{lines:[a,b]}, PortDisconnected, Data{lines:[c]} - let input = vec![ - SerialServerMessage::Data { - lines: vec!["a".into()], - current_index: 1, - }, - SerialServerMessage::Data { - lines: vec!["b".into()], - current_index: 2, - }, - SerialServerMessage::PortDisconnected { - port: "COM3".into(), - reason: "unplugged".into(), - message: "".into(), - }, - SerialServerMessage::Data { - lines: vec!["c".into()], - current_index: 3, - }, - ]; - let output = coalesce_for_test(input); - assert_eq!(output.len(), 3, "expected 3 output frames"); - match &output[0] { - SerialServerMessage::Data { - lines, - current_index, - } => { - assert_eq!(lines, &vec!["a", "b"]); - assert_eq!(*current_index, 2); - } - other => panic!("output[0] expected Data, got {:?}", other), - } - match &output[1] { - SerialServerMessage::PortDisconnected { port, .. } => { - assert_eq!(port, "COM3"); - } - other => panic!("output[1] expected PortDisconnected, got {:?}", other), - } - match &output[2] { - SerialServerMessage::Data { - lines, - current_index, - } => { - assert_eq!(lines, &vec!["c"]); - assert_eq!(*current_index, 3); - } - other => panic!("output[2] expected Data, got {:?}", other), - } - } -} +#[path = "websockets_tests.rs"] +mod tests; diff --git a/crates/fbuild-daemon/src/handlers/websockets_tests.rs b/crates/fbuild-daemon/src/handlers/websockets_tests.rs new file mode 100644 index 00000000..654d7af6 --- /dev/null +++ b/crates/fbuild-daemon/src/handlers/websockets_tests.rs @@ -0,0 +1,280 @@ +use super::*; + +#[test] +fn build_status_snapshot_produces_valid_json() { + let (tx, _rx) = tokio::sync::watch::channel(false); + let ctx = DaemonContext::new(8765, tx, "test".to_string()); + let snap = build_status_snapshot(&ctx); + let v: serde_json::Value = serde_json::from_str(&snap).unwrap(); + assert_eq!(v["type"], "status"); + assert_eq!(v["state"], "idle"); + assert!(!v["operation_in_progress"].as_bool().unwrap()); +} + +#[test] +fn now_unix_returns_reasonable_value() { + let ts = now_unix(); + // After 2020-01-01 + assert!(ts > 1_577_836_800.0); +} + +// --------------------------------------------------------------- +// ReaderControl + writer-batching topology tests (#757). +// +// These exercise the contracts of the post-#750 reader/writer/ +// inbound split WITHOUT needing axum's WebSocket harness or a +// real serial port. The actual reader / writer / inbound task +// bodies are spawned inside `handle_serial_ws()` and capture +// local closures, so we don't reach into them directly -- +// instead we exercise the *primitives* they rely on: +// +// - `ReaderControl::Drain` round-trips through an mpsc to a +// toy reader that drains a broadcast channel +// - `ReaderControl::GetDepth` round-trips and reports broadcast +// queue depth +// - Writer-style coalescing of adjacent `SerialServerMessage:: +// Data` messages into a single Data with merged `lines` +// +// The full spawn-topology integration test is deferred to a +// tests/serial_ws_burst.rs harness (separate sub-PR of #757) +// because axum's WebSocket testing requires standing up a real +// hyper server -- substantially more scaffolding than these +// primitive-level checks need. +// --------------------------------------------------------------- + +use tokio::sync::broadcast; + +/// Tiny model of the reader task: a single select between broadcast +/// recv and ReaderControl, exposing only the ReaderControl branch +/// so we can test it in isolation. NOT the production code path -- +/// the production reader is in `handle_serial_ws()` inline. This +/// mirrors its ReaderControl handling so the contract is exercised. +async fn run_toy_reader( + mut rx: broadcast::Receiver, + mut control_rx: mpsc::UnboundedReceiver, +) { + loop { + tokio::select! { + biased; + broadcast_result = rx.recv() => match broadcast_result { + Ok(_) => {} // drop; we only care about the ReaderControl branch here + Err(broadcast::error::RecvError::Lagged(_)) => {} + Err(broadcast::error::RecvError::Closed) => break, + }, + control_opt = control_rx.recv() => { + let Some(cmd) = control_opt else { break }; + match cmd { + ReaderControl::Drain { reply } => { + let mut drained: usize = 0; + while rx.try_recv().is_ok() { + drained += 1; + } + let _ = reply.send(drained); + } + ReaderControl::GetDepth { reply } => { + let _ = reply.send(rx.len()); + } + } + } + } + } +} + +#[tokio::test] +async fn reader_control_drain_reports_drop_count() { + let (bcast_tx, bcast_rx) = broadcast::channel::(16); + let (ctl_tx, ctl_rx) = mpsc::unbounded_channel(); + let reader = tokio::spawn(run_toy_reader(bcast_rx, ctl_rx)); + + // Push 5 events, do NOT let the reader drain them via its + // main `rx.recv()` (the toy reader hits select between + // broadcast and control; with biased priority the broadcast + // wins, so we need to send Drain BEFORE the reader awakes). + // + // Workaround: bound how many events are pushed before Drain + // by sending the events synchronously, then immediately + // sending Drain. Tokio scheduling means the toy reader will + // see both branches ready; biased order makes it serve the + // broadcast first, draining N-1 (it consumes one per loop + // iteration). Then the next iteration sees Drain and gets + // whatever is left. This proves the drain count IS the + // residue after natural consumption — close enough for the + // contract. + + for i in 0..5u32 { + bcast_tx.send(i).unwrap(); + } + // Tiny yield so the reader sees the broadcast queue, then we + // race a Drain in before all events are consumed. + let (reply_tx, reply_rx) = oneshot::channel(); + ctl_tx + .send(ReaderControl::Drain { reply: reply_tx }) + .unwrap(); + let drained = reply_rx.await.expect("reader replied"); + // At least one event present at the drain point. The exact + // count is timing-dependent on the select scheduler; the + // contract we're proving is `replied with a real count`, + // not a specific number. + assert!( + drained <= 5, + "drain reported {drained} but only 5 events were ever sent" + ); + + drop(bcast_tx); // close broadcast so reader exits cleanly + drop(ctl_tx); + let _ = reader.await; +} + +#[tokio::test] +async fn reader_control_get_depth_reports_broadcast_length() { + let (bcast_tx, bcast_rx) = broadcast::channel::(16); + let (ctl_tx, ctl_rx) = mpsc::unbounded_channel(); + let reader = tokio::spawn(run_toy_reader(bcast_rx, ctl_rx)); + + for i in 0..3u32 { + bcast_tx.send(i).unwrap(); + } + let (reply_tx, reply_rx) = oneshot::channel(); + ctl_tx + .send(ReaderControl::GetDepth { reply: reply_tx }) + .unwrap(); + let depth = reply_rx.await.expect("reader replied"); + // Same timing caveat as above — the reader may have consumed + // some entries between push and reply. Contract: the reply + // IS a count ≤ what we sent. + assert!(depth <= 3, "depth reported {depth} but only 3 sent"); + + drop(bcast_tx); + drop(ctl_tx); + let _ = reader.await; +} + +/// Models the writer task's batching/coalescing logic in isolation. +/// Production version lives inline in `handle_serial_ws()`. This +/// proves the contract: adjacent Data messages merge their `lines` +/// into a single output Data; non-Data messages preserve ordering +/// by flushing the current Data batch first. +fn coalesce_for_test(input: Vec) -> Vec { + let mut output = Vec::new(); + let mut data_batch: Vec = Vec::new(); + let mut last_index: u64 = 0; + for msg in input { + match msg { + SerialServerMessage::Data { + lines, + current_index, + } => { + data_batch.extend(lines); + last_index = current_index; + } + other => { + if !data_batch.is_empty() { + output.push(SerialServerMessage::Data { + lines: std::mem::take(&mut data_batch), + current_index: last_index, + }); + } + output.push(other); + } + } + } + if !data_batch.is_empty() { + output.push(SerialServerMessage::Data { + lines: data_batch, + current_index: last_index, + }); + } + output +} + +#[test] +fn writer_coalesces_adjacent_data_into_one_frame() { + // 5 single-line Data messages -> 1 Data with 5 lines. + let input = vec![ + SerialServerMessage::Data { + lines: vec!["a".into()], + current_index: 1, + }, + SerialServerMessage::Data { + lines: vec!["b".into()], + current_index: 2, + }, + SerialServerMessage::Data { + lines: vec!["c".into()], + current_index: 3, + }, + SerialServerMessage::Data { + lines: vec!["d".into()], + current_index: 4, + }, + SerialServerMessage::Data { + lines: vec!["e".into()], + current_index: 5, + }, + ]; + let output = coalesce_for_test(input); + assert_eq!(output.len(), 1, "should coalesce to 1 frame"); + match &output[0] { + SerialServerMessage::Data { + lines, + current_index, + } => { + assert_eq!(lines, &vec!["a", "b", "c", "d", "e"]); + assert_eq!(*current_index, 5); + } + other => panic!("expected Data, got {:?}", other), + } +} + +#[test] +fn writer_flushes_data_before_non_data_event() { + // Data, Data, PortDisconnected, Data + // -> Data{lines:[a,b]}, PortDisconnected, Data{lines:[c]} + let input = vec![ + SerialServerMessage::Data { + lines: vec!["a".into()], + current_index: 1, + }, + SerialServerMessage::Data { + lines: vec!["b".into()], + current_index: 2, + }, + SerialServerMessage::PortDisconnected { + port: "COM3".into(), + reason: "unplugged".into(), + message: "".into(), + }, + SerialServerMessage::Data { + lines: vec!["c".into()], + current_index: 3, + }, + ]; + let output = coalesce_for_test(input); + assert_eq!(output.len(), 3, "expected 3 output frames"); + match &output[0] { + SerialServerMessage::Data { + lines, + current_index, + } => { + assert_eq!(lines, &vec!["a", "b"]); + assert_eq!(*current_index, 2); + } + other => panic!("output[0] expected Data, got {:?}", other), + } + match &output[1] { + SerialServerMessage::PortDisconnected { port, .. } => { + assert_eq!(port, "COM3"); + } + other => panic!("output[1] expected PortDisconnected, got {:?}", other), + } + match &output[2] { + SerialServerMessage::Data { + lines, + current_index, + } => { + assert_eq!(lines, &vec!["c"]); + assert_eq!(*current_index, 3); + } + other => panic!("output[2] expected Data, got {:?}", other), + } +} diff --git a/crates/fbuild-deploy/src/esp32/deployer.rs b/crates/fbuild-deploy/src/esp32/deployer.rs index d9da63ac..153e0517 100644 --- a/crates/fbuild-deploy/src/esp32/deployer.rs +++ b/crates/fbuild-deploy/src/esp32/deployer.rs @@ -307,7 +307,8 @@ impl Esp32Deployer { } } - self.try_verify_deployment_esptool(firmware_path, port).await + self.try_verify_deployment_esptool(firmware_path, port) + .await } async fn try_verify_deployment_esptool( @@ -694,7 +695,9 @@ impl Esp32Deployer { #[cfg(feature = "espflash-native")] if self.use_native_write { - let native = self.try_deploy_regions_native(firmware_path, port, regions).await; + let native = self + .try_deploy_regions_native(firmware_path, port, regions) + .await; if let Some(result) = native_write_or_fallback_outcome(port, "selective write-flash", native) { diff --git a/crates/fbuild-deploy/src/lib.rs b/crates/fbuild-deploy/src/lib.rs index 70804461..b34d2cfb 100644 --- a/crates/fbuild-deploy/src/lib.rs +++ b/crates/fbuild-deploy/src/lib.rs @@ -297,7 +297,9 @@ mod post_deploy_recovery_tests { deploy_calls: Arc::new(AtomicUsize::new(0)), }; let start = std::time::Instant::now(); - let result = dep.post_deploy_recovery("a-port-that-does-not-exist-zzz").await; + let result = dep + .post_deploy_recovery("a-port-that-does-not-exist-zzz") + .await; let elapsed = start.elapsed(); assert!(result.is_ok(), "default impl returns Ok even on timeout"); assert!( diff --git a/crates/fbuild-deploy/src/teensy/mod.rs b/crates/fbuild-deploy/src/teensy/mod.rs index 5ef913e7..6574a084 100644 --- a/crates/fbuild-deploy/src/teensy/mod.rs +++ b/crates/fbuild-deploy/src/teensy/mod.rs @@ -286,12 +286,11 @@ impl Deployer for TeensyDeployer { // up-to-5s discovery window. let discovery_pre = pre_snapshot.clone(); let discovery_window = Duration::from_secs(self.post_flash_port_discovery_secs); - let discovery_outcome = - tokio::task::spawn_blocking(move || { - port_discovery::wait_for_new_cdc_port(&discovery_pre, discovery_window) - }) - .await - .unwrap_or(port_discovery::NewPortOutcome::TimedOut); + let discovery_outcome = tokio::task::spawn_blocking(move || { + port_discovery::wait_for_new_cdc_port(&discovery_pre, discovery_window) + }) + .await + .unwrap_or(port_discovery::NewPortOutcome::TimedOut); let new_port = match discovery_outcome { port_discovery::NewPortOutcome::Found(name) => Some(name), port_discovery::NewPortOutcome::TimedOut => { diff --git a/crates/fbuild-packages/src/library/arduino_api.rs b/crates/fbuild-packages/src/library/arduino_api.rs index abe10b8e..c82e28f7 100644 --- a/crates/fbuild-packages/src/library/arduino_api.rs +++ b/crates/fbuild-packages/src/library/arduino_api.rs @@ -50,9 +50,16 @@ pub async fn ensure_arduino_api(core_dir: &Path) -> Result<()> { })?; // Async HTTP via the shared client (FastLED/fbuild#813). - let response = http::client().get(ARDUINO_API_URL).send().await.map_err(|e| { - fbuild_core::FbuildError::PackageError(format!("failed to download ArduinoCore-API: {}", e)) - })?; + let response = http::client() + .get(ARDUINO_API_URL) + .send() + .await + .map_err(|e| { + fbuild_core::FbuildError::PackageError(format!( + "failed to download ArduinoCore-API: {}", + e + )) + })?; if !response.status().is_success() { return Err(fbuild_core::FbuildError::PackageError(format!( diff --git a/crates/fbuild-serial/src/crash_decoder.rs b/crates/fbuild-serial/src/crash_decoder.rs index 501338a9..93890691 100644 --- a/crates/fbuild-serial/src/crash_decoder.rs +++ b/crates/fbuild-serial/src/crash_decoder.rs @@ -554,7 +554,9 @@ mod tests { // Middle lines decoder.process_line("Core 0 register dump:").await; - decoder.process_line("Backtrace: 0x42002a3c:0x3fc90000").await; + decoder + .process_line("Backtrace: 0x42002a3c:0x3fc90000") + .await; // End — should produce output (warning about no elf) let result = decoder.process_line("Rebooting...").await; diff --git a/crates/fbuild-serial/src/manager.rs b/crates/fbuild-serial/src/manager.rs index a17c3375..c84b38dd 100644 --- a/crates/fbuild-serial/src/manager.rs +++ b/crates/fbuild-serial/src/manager.rs @@ -1,22 +1,8 @@ -//! SharedSerialManager: centralized serial port access. -//! -//! This is the Rust equivalent of Python's SharedSerialManager (1170 lines). -//! All serial I/O flows through this single manager in the daemon. -//! -//! ## Concurrency Model -//! -//! - Per-port state protected by tokio::sync::Mutex -//! - Background reader task per open port (tokio::spawn) -//! - Broadcast channel for output distribution to readers -//! - Exclusive writer access via condition variable pattern -//! -//! ## Windows USB-CDC Strategy (v5) -//! -//! 1. Drain input buffer aggressively (1 second initial) -//! 2. Per-attempt: drain input buffer before each write -//! 3. 50ms per-attempt timeout (many rapid attempts) -//! 4. 200 max attempts in 20 seconds -//! 5. Toggle DTR/RTS for flow control +//! SharedSerialManager: centralized serial port access. All serial I/O +//! flows through this single manager in the daemon. See +//! `docs/architecture/serial.md` for the concurrency model +//! (per-port `tokio::sync::Mutex`, per-port reader task, broadcast for +//! readers, exclusive writer) and the Windows USB-CDC write strategy. use crate::crash_decoder::CrashDecoder; use crate::messages::SerialStreamEvent;