From e102ac11d76b87cb869eab1335bc67a42e11ee1a Mon Sep 17 00:00:00 2001 From: zackees Date: Thu, 18 Jun 2026 16:37:35 -0700 Subject: [PATCH] Rebind serial sessions after USB renumber --- crates/fbuild-daemon/src/context.rs | 53 +- crates/fbuild-daemon/src/handlers/devices.rs | 8 +- .../src/handlers/operations/deploy.rs | 5 +- .../src/handlers/operations/monitor.rs | 10 + .../fbuild-daemon/src/handlers/websockets.rs | 11 + crates/fbuild-python/src/json_rpc.rs | 1 + crates/fbuild-python/src/messages.rs | 10 + crates/fbuild-python/src/serial_monitor.rs | 2 + crates/fbuild-serial/src/manager.rs | 550 +++++++++++++++++- crates/fbuild-serial/src/messages.rs | 39 ++ 10 files changed, 642 insertions(+), 47 deletions(-) diff --git a/crates/fbuild-daemon/src/context.rs b/crates/fbuild-daemon/src/context.rs index 06b312fc..e6b001c4 100644 --- a/crates/fbuild-daemon/src/context.rs +++ b/crates/fbuild-daemon/src/context.rs @@ -378,27 +378,58 @@ impl DaemonContext { .clone() } - pub fn refresh_devices_and_broadcast_serial_moves(&self) { + pub async fn refresh_devices_and_broadcast_serial_moves(&self) { self.device_manager.refresh_devices(); - self.broadcast_recent_device_port_moves(); + self.rebind_recent_device_port_moves().await; } - pub fn refresh_devices_if_stale_and_broadcast_serial_moves(&self, max_age: Duration) -> bool { + pub async fn refresh_devices_if_stale_and_broadcast_serial_moves( + &self, + max_age: Duration, + ) -> bool { let refreshed = self.device_manager.refresh_devices_if_stale(max_age); if refreshed { - self.broadcast_recent_device_port_moves(); + self.rebind_recent_device_port_moves().await; } refreshed } - fn broadcast_recent_device_port_moves(&self) { + async fn rebind_recent_device_port_moves(&self) { for move_event in self.device_manager.take_recent_port_moves() { - self.serial_manager.notify_port_renumbered( - &move_event.previous_port, - &move_event.port, - "tracked_serial_move", - move_event.serial_number, - ); + match self + .serial_manager + .rebind_port_session( + &move_event.previous_port, + &move_event.port, + "tracked_serial_move", + move_event.serial_number.clone(), + ) + .await + { + Ok(true) => {} + Ok(false) => { + self.serial_manager.notify_port_renumbered( + &move_event.previous_port, + &move_event.port, + "tracked_serial_move", + move_event.serial_number, + ); + } + Err(err) => { + self.serial_manager.notify_port_rebind_failed( + &move_event.previous_port, + &move_event.port, + "open_failed", + err.to_string(), + ); + tracing::warn!( + previous_port = move_event.previous_port, + port = move_event.port, + "failed to rebind serial session after tracked port move: {}", + err + ); + } + } } } } diff --git a/crates/fbuild-daemon/src/handlers/devices.rs b/crates/fbuild-daemon/src/handlers/devices.rs index 308c81a8..5974cfec 100644 --- a/crates/fbuild-daemon/src/handlers/devices.rs +++ b/crates/fbuild-daemon/src/handlers/devices.rs @@ -15,7 +15,7 @@ use uuid::Uuid; /// POST /api/devices/list pub async fn list_devices(state: State>) -> Json { // Refresh device inventory - state.refresh_devices_and_broadcast_serial_moves(); + state.refresh_devices_and_broadcast_serial_moves().await; let all = state.device_manager.get_all_devices(); let devices = all.values().map(device_info).collect(); @@ -32,7 +32,7 @@ pub async fn device_status( Path(port): Path, ) -> Json { // Refresh to get latest state - state.refresh_devices_and_broadcast_serial_moves(); + state.refresh_devices_and_broadcast_serial_moves().await; match state.device_manager.get_device_status(&port) { Some(ds) => Json(device_status_response(ds)), @@ -99,7 +99,7 @@ pub async fn device_lease( let client_id = req.client_id.unwrap_or_else(|| Uuid::new_v4().to_string()); // Ensure devices are refreshed - state.refresh_devices_and_broadcast_serial_moves(); + state.refresh_devices_and_broadcast_serial_moves().await; let result = match req.lease_type.as_str() { "exclusive" => state.device_manager.acquire_exclusive( @@ -178,7 +178,7 @@ pub async fn device_preempt( let client_id = req.client_id.unwrap_or_else(|| Uuid::new_v4().to_string()); // Refresh first - state.refresh_devices_and_broadcast_serial_moves(); + state.refresh_devices_and_broadcast_serial_moves().await; match state .device_manager diff --git a/crates/fbuild-daemon/src/handlers/operations/deploy.rs b/crates/fbuild-daemon/src/handlers/operations/deploy.rs index 6fbc5c90..17b6af3e 100644 --- a/crates/fbuild-daemon/src/handlers/operations/deploy.rs +++ b/crates/fbuild-daemon/src/handlers/operations/deploy.rs @@ -364,7 +364,7 @@ pub async fn deploy( } let deploy_port_choice = if req.port.is_none() { - ctx.refresh_devices_and_broadcast_serial_moves(); + ctx.refresh_devices_and_broadcast_serial_moves().await; choose_deploy_port( None, platform, @@ -412,7 +412,8 @@ pub async fn deploy( // the trust-check still requires `is_connected == true` on the // cached DeviceState, which the most-recent refresh supplied. if trusted_hash_enabled { - ctx.refresh_devices_if_stale_and_broadcast_serial_moves(std::time::Duration::from_secs(2)); + ctx.refresh_devices_if_stale_and_broadcast_serial_moves(std::time::Duration::from_secs(2)) + .await; } let deploy_result = tokio::task::spawn_blocking(move || -> fbuild_core::Result<(Option>, fbuild_deploy::DeploymentResult)> { // Populated by the Espressif32 arm with (image_hash, port). diff --git a/crates/fbuild-daemon/src/handlers/operations/monitor.rs b/crates/fbuild-daemon/src/handlers/operations/monitor.rs index 3fdef8a4..d1a82825 100644 --- a/crates/fbuild-daemon/src/handlers/operations/monitor.rs +++ b/crates/fbuild-daemon/src/handlers/operations/monitor.rs @@ -203,6 +203,16 @@ pub(crate) async fn run_monitor_loop( } Ok(Ok(SerialStreamEvent::PortRenumbered { .. })) | Ok(Ok(SerialStreamEvent::PortReattached { .. })) => {} + Ok(Ok(SerialStreamEvent::PortRebindFailed { + port, + new_port, + reason, + message, + })) => { + return MonitorOutcome::Error(format!( + "serial port {port} failed to rebind to {new_port} ({reason}): {message}" + )); + } Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(n))) => { tracing::warn!("monitor lagged, skipped {} messages", n); } diff --git a/crates/fbuild-daemon/src/handlers/websockets.rs b/crates/fbuild-daemon/src/handlers/websockets.rs index 1289641e..16bdc1a0 100644 --- a/crates/fbuild-daemon/src/handlers/websockets.rs +++ b/crates/fbuild-daemon/src/handlers/websockets.rs @@ -235,6 +235,17 @@ async fn handle_serial_ws(mut socket: WebSocket, ctx: Arc) { break; } } + Ok(SerialStreamEvent::PortRebindFailed { port, new_port, reason, message }) => { + let msg = SerialServerMessage::PortRebindFailed { + port, + new_port, + reason, + message, + }; + if socket.send(Message::Text(serde_json::to_string(&msg).unwrap())).await.is_err() { + break; + } + } Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { tracing::warn!(client_id, port, n, "reader lagged, skipping lines"); } diff --git a/crates/fbuild-python/src/json_rpc.rs b/crates/fbuild-python/src/json_rpc.rs index 28a65e18..34f348cb 100644 --- a/crates/fbuild-python/src/json_rpc.rs +++ b/crates/fbuild-python/src/json_rpc.rs @@ -82,6 +82,7 @@ pub(crate) async fn read_lines_async( Ok(ServerMessage::Reconnected { .. }) => continue, Ok(ServerMessage::PortRenumbered { .. }) | Ok(ServerMessage::PortReattached { .. }) => continue, + Ok(ServerMessage::PortRebindFailed { .. }) => break, Ok(ServerMessage::PortDisconnected { .. }) => break, _ => continue, } diff --git a/crates/fbuild-python/src/messages.rs b/crates/fbuild-python/src/messages.rs index fbceba4e..e2b1edd9 100644 --- a/crates/fbuild-python/src/messages.rs +++ b/crates/fbuild-python/src/messages.rs @@ -72,6 +72,16 @@ pub(crate) enum ServerMessage { #[allow(dead_code)] previous_port: String, }, + PortRebindFailed { + #[allow(dead_code)] + port: String, + #[allow(dead_code)] + new_port: String, + #[allow(dead_code)] + reason: String, + #[allow(dead_code)] + message: String, + }, Error { message: String, }, diff --git a/crates/fbuild-python/src/serial_monitor.rs b/crates/fbuild-python/src/serial_monitor.rs index 5391bf8a..4d9775e8 100644 --- a/crates/fbuild-python/src/serial_monitor.rs +++ b/crates/fbuild-python/src/serial_monitor.rs @@ -215,6 +215,7 @@ impl SerialMonitor { } Ok(ServerMessage::PortRenumbered { .. }) | Ok(ServerMessage::PortReattached { .. }) => continue, + Ok(ServerMessage::PortRebindFailed { .. }) => break, Ok(ServerMessage::PortDisconnected { .. }) => break, _ => continue, } @@ -559,6 +560,7 @@ impl SerialMonitor { } Ok(ServerMessage::PortRenumbered { .. }) | Ok(ServerMessage::PortReattached { .. }) => continue, + Ok(ServerMessage::PortRebindFailed { .. }) => break, Ok(ServerMessage::PortDisconnected { .. }) => break, _ => continue, } diff --git a/crates/fbuild-serial/src/manager.rs b/crates/fbuild-serial/src/manager.rs index 8afc29ea..3eb6c06c 100644 --- a/crates/fbuild-serial/src/manager.rs +++ b/crates/fbuild-serial/src/manager.rs @@ -43,6 +43,9 @@ struct PortOutputBuffer { /// Central serial port manager. One instance per daemon. pub struct SharedSerialManager { sessions: DashMap, + /// Alias from an OS port observed after USB renumbering back to the + /// logical session key that existing clients attached to. + port_aliases: DashMap, /// Broadcast channels per port for output distribution. broadcasters: DashMap>, /// Monotonic per-port generation that invalidates delayed physical closes. @@ -58,6 +61,7 @@ impl SharedSerialManager { pub fn new() -> Self { Self { sessions: DashMap::new(), + port_aliases: DashMap::new(), broadcasters: DashMap::new(), close_generations: DashMap::new(), preemption: Arc::new(PreemptionTracker::new()), @@ -73,10 +77,11 @@ impl SharedSerialManager { baud_rate: u32, client_id: &str, ) -> fbuild_core::Result<()> { + let session_key = self.resolve_port_key(port); // If already open, just return Ok - if let Some(session) = self.sessions.get(port) { + if let Some(session) = self.sessions.get(&session_key) { if session.is_open { - self.bump_close_generation(port); + self.bump_close_generation(&session_key); tracing::info!(port, client_id, "port already open, reusing"); return Ok(()); } @@ -284,9 +289,10 @@ impl SharedSerialManager { data: &[u8], client_id: &str, ) -> fbuild_core::Result { + let session_key = self.resolve_port_key(port); // Verify caller holds writer lock let handle = { - let session = self.sessions.get(port).ok_or_else(|| { + let session = self.sessions.get(&session_key).ok_or_else(|| { fbuild_core::FbuildError::SerialError(format!("port {} not open", port)) })?; if session.writer_client_id.as_deref() != Some(client_id) { @@ -318,7 +324,7 @@ impl SharedSerialManager { drop(serial); // Update stats - if let Some(mut session) = self.sessions.get_mut(port) { + if let Some(mut session) = self.sessions.get_mut(&session_key) { session.total_bytes_written += bytes_written as u64; } @@ -344,8 +350,9 @@ impl SharedSerialManager { /// /// See FastLED/fbuild#532 (auto-recover-from-download-mode path). pub async fn esp_hard_reset(&self, port: &str, client_id: &str) -> fbuild_core::Result<()> { + let session_key = self.resolve_port_key(port); let handle = { - let session = self.sessions.get(port).ok_or_else(|| { + let session = self.sessions.get(&session_key).ok_or_else(|| { fbuild_core::FbuildError::SerialError(format!("port {} not open", port)) })?; if session.owner_client_id.as_deref() != Some(client_id) { @@ -393,8 +400,9 @@ impl SharedSerialManager { /// Close a serial port. pub async fn close_port(&self, port: &str, client_id: &str) -> fbuild_core::Result<()> { - self.bump_close_generation(port); - if let Some((_, mut session)) = self.sessions.remove(port) { + let session_key = self.resolve_port_key(port); + self.bump_close_generation(&session_key); + if let Some((_, mut session)) = self.sessions.remove(&session_key) { // Signal the background reader to stop session.stop_flag.store(true, Ordering::Relaxed); @@ -407,9 +415,10 @@ impl SharedSerialManager { session.serial_handle = None; session.is_open = false; } - self.broadcasters.remove(port); - self.output_buffers.remove(port); - self.close_generations.remove(port); + self.broadcasters.remove(&session_key); + self.output_buffers.remove(&session_key); + self.close_generations.remove(&session_key); + self.remove_aliases_for_session(&session_key); tracing::info!(port, client_id, "port closed"); Ok(()) } @@ -427,13 +436,14 @@ impl SharedSerialManager { client_id: &str, grace: Duration, ) -> bool { - if self.has_clients(port) || !self.sessions.contains_key(port) { + let session_key = self.resolve_port_key(port); + if self.has_clients(&session_key) || !self.sessions.contains_key(&session_key) { return false; } - let generation = self.bump_close_generation(port); + let generation = self.bump_close_generation(&session_key); let manager = Arc::clone(self); - let port = port.to_string(); + let port = session_key; let client_id = client_id.to_string(); tokio::spawn(async move { tracing::debug!( @@ -470,18 +480,23 @@ impl SharedSerialManager { port: &str, client_id: &str, ) -> Option> { - let rx = self.broadcasters.get(port).map(|tx| tx.subscribe())?; - if let Some(mut session) = self.sessions.get_mut(port) { + let session_key = self.resolve_port_key(port); + let rx = self + .broadcasters + .get(&session_key) + .map(|tx| tx.subscribe())?; + if let Some(mut session) = self.sessions.get_mut(&session_key) { session.reader_client_ids.insert(client_id.to_string()); drop(session); - self.bump_close_generation(port); + self.bump_close_generation(&session_key); } Some(rx) } /// Detach a reader. pub fn detach_reader(&self, port: &str, client_id: &str) { - if let Some(mut session) = self.sessions.get_mut(port) { + let session_key = self.resolve_port_key(port); + if let Some(mut session) = self.sessions.get_mut(&session_key) { session.reader_client_ids.remove(client_id); } } @@ -495,12 +510,13 @@ impl SharedSerialManager { reason: &str, serial: Option, ) -> bool { - let Some(tx) = self.broadcasters.get(old_port) else { + let session_key = self.resolve_port_key(old_port); + let Some(tx) = self.broadcasters.get(&session_key) else { return false; }; let sent_renumbered = tx .send(SerialStreamEvent::PortRenumbered { - port: old_port.to_string(), + port: session_key, new_port: new_port.to_string(), reason: reason.to_string(), serial, @@ -515,25 +531,147 @@ impl SharedSerialManager { sent_renumbered || sent_reattached } + pub fn notify_port_rebind_failed( + &self, + old_port: &str, + new_port: &str, + reason: &str, + message: String, + ) -> bool { + let session_key = self.resolve_port_key(old_port); + let Some(tx) = self.broadcasters.get(&session_key) else { + return false; + }; + tx.send(SerialStreamEvent::PortRebindFailed { + port: session_key, + new_port: new_port.to_string(), + reason: reason.to_string(), + message, + }) + .is_ok() + } + + /// Reopen the physical serial handle on `new_port` while preserving the + /// existing logical session keyed by `old_port`. + pub async fn rebind_port_session( + &self, + old_port: &str, + new_port: &str, + reason: &str, + serial: Option, + ) -> fbuild_core::Result { + let session_key = self.resolve_port_key(old_port); + if !self.sessions.contains_key(&session_key) + || !self.broadcasters.contains_key(&session_key) + { + return Ok(false); + } + + let baud_rate = self + .sessions + .get(&session_key) + .map(|session| session.baud_rate) + .ok_or_else(|| { + fbuild_core::FbuildError::SerialError(format!( + "serial session {} disappeared during rebind", + session_key + )) + })?; + let max_retries: usize = if cfg!(windows) { 8 } else { 6 }; + let serial_handle = Arc::new(Mutex::new( + Self::open_physical_serial(new_port, baud_rate, max_retries).await?, + )); + + self.rebind_port_session_to_handle(&session_key, new_port, serial_handle, reason, serial) + .await + } + + async fn rebind_port_session_to_handle( + &self, + session_key: &str, + new_port: &str, + serial_handle: Arc>>, + reason: &str, + serial: Option, + ) -> fbuild_core::Result { + let Some(tx) = self.broadcasters.get(session_key).map(|tx| tx.clone()) else { + return Ok(false); + }; + let port_buf = self + .output_buffers + .entry(session_key.to_string()) + .or_insert_with(|| { + Arc::new(PortOutputBuffer { + buffer: std::sync::Mutex::new(VecDeque::with_capacity(OUTPUT_BUFFER_CAP)), + total_bytes_read: std::sync::atomic::AtomicU64::new(0), + }) + }) + .clone(); + + let old_reader = if let Some(mut session) = self.sessions.get_mut(session_key) { + session.stop_flag.store(true, Ordering::Relaxed); + session.reader_handle.take() + } else { + return Ok(false); + }; + if let Some(handle) = old_reader { + let _ = handle.await; + } + + let stop_flag = Arc::new(AtomicBool::new(false)); + let reader_handle = Self::spawn_reader( + session_key.to_string(), + Arc::clone(&serial_handle), + Arc::clone(&stop_flag), + tx.clone(), + port_buf, + ); + + if let Some(mut session) = self.sessions.get_mut(session_key) { + session.port = new_port.to_string(); + session.is_open = true; + session.serial_handle = Some(serial_handle); + session.reader_handle = Some(reader_handle); + session.stop_flag = stop_flag; + } + self.port_aliases + .insert(new_port.to_string(), session_key.to_string()); + self.bump_close_generation(session_key); + let _ = tx.send(SerialStreamEvent::PortRenumbered { + port: session_key.to_string(), + new_port: new_port.to_string(), + reason: reason.to_string(), + serial, + }); + let _ = tx.send(SerialStreamEvent::PortReattached { + port: new_port.to_string(), + previous_port: session_key.to_string(), + }); + Ok(true) + } + /// Returns the number of attached readers for a port (0 if not open). pub fn reader_count(&self, port: &str) -> usize { + let session_key = self.resolve_port_key(port); self.sessions - .get(port) + .get(&session_key) .map(|s| s.reader_client_ids.len()) .unwrap_or(0) } /// Returns true if a port has any active reader or writer client. pub fn has_clients(&self, port: &str) -> bool { + let session_key = self.resolve_port_key(port); self.sessions - .get(port) + .get(&session_key) .map(|s| !s.reader_client_ids.is_empty() || s.writer_client_id.is_some()) .unwrap_or(false) } /// Acquire exclusive write access to a port. pub async fn acquire_writer(&self, port: &str, client_id: &str) -> fbuild_core::Result<()> { - if let Some(mut session) = self.sessions.get_mut(port) { + let session_key = self.resolve_port_key(port); + if let Some(mut session) = self.sessions.get_mut(&session_key) { if session.writer_client_id.is_some() { return Err(fbuild_core::FbuildError::SerialError(format!( "port {} already has an exclusive writer", @@ -542,7 +680,7 @@ impl SharedSerialManager { } session.writer_client_id = Some(client_id.to_string()); drop(session); - self.bump_close_generation(port); + self.bump_close_generation(&session_key); Ok(()) } else { Err(fbuild_core::FbuildError::SerialError(format!( @@ -554,7 +692,8 @@ impl SharedSerialManager { /// Release write access. pub fn release_writer(&self, port: &str, client_id: &str) { - if let Some(mut session) = self.sessions.get_mut(port) { + let session_key = self.resolve_port_key(port); + if let Some(mut session) = self.sessions.get_mut(&session_key) { if session.writer_client_id.as_deref() == Some(client_id) { session.writer_client_id = None; } @@ -568,19 +707,24 @@ impl SharedSerialManager { reason: String, preempted_by: String, ) -> fbuild_core::Result<()> { - self.preemption.preempt(port, reason, preempted_by).await; + let session_key = self.resolve_port_key(port); + self.preemption + .preempt(&session_key, reason, preempted_by) + .await; self.close_port(port, "deploy_preemption").await?; Ok(()) } /// Clear preemption after deploy completes. pub async fn clear_preemption(&self, port: &str) { - self.preemption.clear(port).await; + let session_key = self.resolve_port_key(port); + self.preemption.clear(&session_key).await; } /// Check if a port is preempted. pub async fn is_preempted(&self, port: &str) -> bool { - self.preemption.is_preempted(port).await + let session_key = self.resolve_port_key(port); + self.preemption.is_preempted(&session_key).await } /// Get the preemption tracker for external use. @@ -590,20 +734,23 @@ impl SharedSerialManager { /// Attach a crash decoder to a port for decoding crash stack traces. pub fn set_crash_decoder(&self, port: &str, decoder: CrashDecoder) { - self.crash_decoders.insert(port.to_string(), decoder); + let session_key = self.resolve_port_key(port); + self.crash_decoders.insert(session_key, decoder); } /// Remove crash decoder from a port. pub fn remove_crash_decoder(&self, port: &str) { - self.crash_decoders.remove(port); + let session_key = self.resolve_port_key(port); + self.crash_decoders.remove(&session_key); } /// Process a serial line through the crash decoder for a port. /// /// Returns decoded crash trace lines if a crash dump just completed. pub fn process_crash_line(&self, port: &str, line: &str) -> Option> { + let session_key = self.resolve_port_key(port); self.crash_decoders - .get_mut(port) + .get_mut(&session_key) .and_then(|mut decoder| decoder.process_line(line)) } @@ -625,6 +772,144 @@ impl SharedSerialManager { .collect() } + async fn open_physical_serial( + port: &str, + baud_rate: u32, + max_retries: usize, + ) -> fbuild_core::Result> { + let backoff_schedule = [250u64, 500, 1000, 2000, 3000]; + let mut last_err = String::new(); + + for attempt in 0..max_retries { + let port_for_open = port.to_string(); + let open_result: std::result::Result< + std::result::Result, serialport::Error>, + tokio::task::JoinError, + > = tokio::task::spawn_blocking(move || { + let mut serial = serialport::new(&port_for_open, baud_rate) + .timeout(Duration::from_millis(100)) + .open()?; + match serial.write_data_terminal_ready(true) { + Ok(()) => tracing::debug!("manager: open-time DTR=high asserted"), + Err(e) => tracing::warn!("failed to set DTR: {}", e), + } + match serial.write_request_to_send(true) { + Ok(()) => tracing::debug!("manager: open-time RTS=high asserted"), + Err(e) => tracing::warn!("failed to set RTS: {}", e), + } + Ok(serial) + }) + .await; + + match open_result { + Ok(Ok(serial)) => return Ok(serial), + Ok(Err(e)) => last_err = e.to_string(), + Err(join_err) => last_err = format!("open task panicked: {}", join_err), + } + + let backoff_idx = attempt.min(backoff_schedule.len() - 1); + let backoff_ms = backoff_schedule[backoff_idx]; + tracing::debug!( + port, + attempt, + backoff_ms, + "open failed: {}, retrying", + last_err + ); + tokio::time::sleep(Duration::from_millis(backoff_ms)).await; + } + + Err(fbuild_core::FbuildError::SerialError(format!( + "failed to open {} after {} attempts: {}", + port, max_retries, last_err + ))) + } + + fn spawn_reader( + event_port: String, + serial_handle: Arc>>, + stop_flag: Arc, + tx: broadcast::Sender, + port_buf: Arc, + ) -> tokio::task::JoinHandle<()> { + tokio::task::spawn_blocking(move || { + let mut buf = [0u8; READ_BUF_SIZE]; + let mut partial_line = String::new(); + + while !stop_flag.load(Ordering::Relaxed) { + let read_result = { + let mut serial = serial_handle.blocking_lock(); + serial.read(&mut buf) + }; + + match read_result { + Ok(n) if n > 0 => { + let text = String::from_utf8_lossy(&buf[..n]); + partial_line.push_str(&text); + port_buf + .total_bytes_read + .fetch_add(n as u64, Ordering::Relaxed); + + while let Some(newline_pos) = partial_line.find('\n') { + let line = partial_line[..newline_pos].trim_end().to_string(); + partial_line = partial_line[newline_pos + 1..].to_string(); + + if line.is_empty() { + continue; + } + + let _ = tx.send(SerialStreamEvent::Data(line.clone())); + if let Ok(mut ob) = port_buf.buffer.lock() { + if ob.len() >= OUTPUT_BUFFER_CAP { + ob.pop_front(); + } + ob.push_back(line); + } + } + } + Ok(_) => { + std::thread::sleep(Duration::from_millis(10)); + } + Err(ref e) + if e.kind() == std::io::ErrorKind::TimedOut + || e.kind() == std::io::ErrorKind::WouldBlock => + { + std::thread::sleep(Duration::from_millis(10)); + } + Err(e) => { + let message = e.to_string(); + tracing::error!(port = event_port, "serial read error: {}", message); + let _ = tx.send(SerialStreamEvent::PortDisconnected { + port: event_port.clone(), + reason: "read_error".to_string(), + message, + }); + break; + } + } + } + tracing::info!(port = event_port, "background reader stopped"); + }) + } + + fn resolve_port_key(&self, port: &str) -> String { + self.port_aliases + .get(port) + .map(|alias| alias.clone()) + .unwrap_or_else(|| port.to_string()) + } + + fn remove_aliases_for_session(&self, session_key: &str) { + let aliases: Vec = self + .port_aliases + .iter() + .filter_map(|entry| (entry.value() == session_key).then(|| entry.key().clone())) + .collect(); + for alias in aliases { + self.port_aliases.remove(&alias); + } + } + fn bump_close_generation(&self, port: &str) -> u64 { let mut generation = self.close_generations.entry(port.to_string()).or_insert(0); *generation += 1; @@ -658,6 +943,122 @@ impl Default for SharedSerialManager { #[cfg(test)] mod tests { use super::*; + use serialport::{ClearBuffer, DataBits, FlowControl, Parity, StopBits}; + use std::io::{Read, Write}; + + #[derive(Clone)] + struct FakeSerialPort { + name: String, + writes: Arc>>, + } + + impl FakeSerialPort { + fn new(name: &str) -> (Self, Arc>>) { + let writes = Arc::new(std::sync::Mutex::new(Vec::new())); + ( + Self { + name: name.to_string(), + writes: Arc::clone(&writes), + }, + writes, + ) + } + } + + impl Read for FakeSerialPort { + fn read(&mut self, _buf: &mut [u8]) -> std::io::Result { + Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "no data")) + } + } + + impl Write for FakeSerialPort { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.writes.lock().unwrap().extend_from_slice(buf); + Ok(buf.len()) + } + + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } + } + + impl serialport::SerialPort for FakeSerialPort { + fn name(&self) -> Option { + Some(self.name.clone()) + } + fn baud_rate(&self) -> serialport::Result { + Ok(115200) + } + fn data_bits(&self) -> serialport::Result { + Ok(DataBits::Eight) + } + fn flow_control(&self) -> serialport::Result { + Ok(FlowControl::None) + } + fn parity(&self) -> serialport::Result { + Ok(Parity::None) + } + fn stop_bits(&self) -> serialport::Result { + Ok(StopBits::One) + } + fn timeout(&self) -> Duration { + Duration::from_millis(100) + } + fn set_baud_rate(&mut self, _baud_rate: u32) -> serialport::Result<()> { + Ok(()) + } + fn set_data_bits(&mut self, _data_bits: DataBits) -> serialport::Result<()> { + Ok(()) + } + fn set_flow_control(&mut self, _flow_control: FlowControl) -> serialport::Result<()> { + Ok(()) + } + fn set_parity(&mut self, _parity: Parity) -> serialport::Result<()> { + Ok(()) + } + fn set_stop_bits(&mut self, _stop_bits: StopBits) -> serialport::Result<()> { + Ok(()) + } + fn set_timeout(&mut self, _timeout: Duration) -> serialport::Result<()> { + Ok(()) + } + fn write_request_to_send(&mut self, _level: bool) -> serialport::Result<()> { + Ok(()) + } + fn write_data_terminal_ready(&mut self, _level: bool) -> serialport::Result<()> { + Ok(()) + } + fn read_clear_to_send(&mut self) -> serialport::Result { + Ok(true) + } + fn read_data_set_ready(&mut self) -> serialport::Result { + Ok(true) + } + fn read_ring_indicator(&mut self) -> serialport::Result { + Ok(false) + } + fn read_carrier_detect(&mut self) -> serialport::Result { + Ok(true) + } + fn bytes_to_read(&self) -> serialport::Result { + Ok(0) + } + fn bytes_to_write(&self) -> serialport::Result { + Ok(0) + } + fn clear(&self, _buffer_to_clear: ClearBuffer) -> serialport::Result<()> { + Ok(()) + } + fn try_clone(&self) -> serialport::Result> { + Ok(Box::new(self.clone())) + } + fn set_break(&self) -> serialport::Result<()> { + Ok(()) + } + fn clear_break(&self) -> serialport::Result<()> { + Ok(()) + } + } /// TDD red→green for ISSUES.md "Issue C": calling `open_port` against a /// definitely-nonexistent port must NOT block other tokio tasks on the @@ -957,4 +1358,93 @@ mod tests { } ); } + + #[tokio::test] + async fn rebind_preserves_session_and_routes_writes_to_new_handle() { + let mgr = SharedSerialManager::new(); + let old_port = "COM21"; + let new_port = "COM20"; + let writer = "writer-client"; + let reader = "reader-client"; + let (tx, mut rx) = broadcast::channel(BROADCAST_CHANNEL_SIZE); + mgr.broadcasters.insert(old_port.to_string(), tx); + mgr.output_buffers.insert( + old_port.to_string(), + Arc::new(PortOutputBuffer { + buffer: std::sync::Mutex::new(VecDeque::with_capacity(OUTPUT_BUFFER_CAP)), + total_bytes_read: std::sync::atomic::AtomicU64::new(0), + }), + ); + let (old_fake, _old_writes) = FakeSerialPort::new(old_port); + let mut readers = std::collections::HashSet::new(); + readers.insert(reader.to_string()); + mgr.sessions.insert( + old_port.to_string(), + SerialSession { + port: old_port.to_string(), + baud_rate: 115200, + is_open: true, + writer_client_id: Some(writer.to_string()), + reader_client_ids: readers, + output_buffer: Default::default(), + total_bytes_read: 0, + total_bytes_written: 0, + started_at: 0.0, + owner_client_id: Some(writer.to_string()), + elf_path: None, + serial_handle: Some(Arc::new(Mutex::new(Box::new(old_fake)))), + reader_handle: None, + stop_flag: Arc::new(AtomicBool::new(false)), + }, + ); + let (new_fake, new_writes) = FakeSerialPort::new(new_port); + + assert!(mgr + .rebind_port_session_to_handle( + old_port, + new_port, + Arc::new(Mutex::new(Box::new(new_fake))), + "tracked_serial_move", + Some("15821020".to_string()), + ) + .await + .unwrap()); + + let session = mgr.sessions.get(old_port).expect("logical session remains"); + assert_eq!(session.port, new_port); + assert_eq!(session.writer_client_id.as_deref(), Some(writer)); + assert!(session.reader_client_ids.contains(reader)); + drop(session); + assert_eq!(mgr.reader_count(new_port), 1); + assert!(mgr.has_clients(new_port)); + + mgr.write_to_port(old_port, b"old-logical", writer) + .await + .unwrap(); + mgr.write_to_port(new_port, b"new-alias", writer) + .await + .unwrap(); + assert_eq!(&*new_writes.lock().unwrap(), b"old-logicalnew-alias"); + + assert_eq!( + rx.try_recv().unwrap(), + SerialStreamEvent::PortRenumbered { + port: old_port.to_string(), + new_port: new_port.to_string(), + reason: "tracked_serial_move".to_string(), + serial: Some("15821020".to_string()), + } + ); + assert_eq!( + rx.try_recv().unwrap(), + SerialStreamEvent::PortReattached { + port: new_port.to_string(), + previous_port: old_port.to_string(), + } + ); + + mgr.close_port(new_port, "test").await.unwrap(); + assert!(mgr.sessions.get(old_port).is_none()); + assert!(mgr.port_aliases.get(new_port).is_none()); + } } diff --git a/crates/fbuild-serial/src/messages.rs b/crates/fbuild-serial/src/messages.rs index 279493fe..1ee48438 100644 --- a/crates/fbuild-serial/src/messages.rs +++ b/crates/fbuild-serial/src/messages.rs @@ -70,6 +70,12 @@ pub enum SerialServerMessage { port: String, previous_port: String, }, + PortRebindFailed { + port: String, + new_port: String, + reason: String, + message: String, + }, WriteAck { success: bool, bytes_written: usize, @@ -106,6 +112,12 @@ pub enum SerialStreamEvent { port: String, previous_port: String, }, + PortRebindFailed { + port: String, + new_port: String, + reason: String, + message: String, + }, } #[cfg(test)] @@ -315,6 +327,33 @@ mod tests { } } + #[test] + fn server_port_rebind_failed_roundtrip() { + let msg = SerialServerMessage::PortRebindFailed { + port: "COM21".into(), + new_port: "COM20".into(), + reason: "open_failed".into(), + message: "access denied".into(), + }; + let json = serde_json::to_string(&msg).unwrap(); + assert!(json.contains("\"type\":\"port_rebind_failed\"")); + let parsed: SerialServerMessage = serde_json::from_str(&json).unwrap(); + match parsed { + SerialServerMessage::PortRebindFailed { + port, + new_port, + reason, + message, + } => { + assert_eq!(port, "COM21"); + assert_eq!(new_port, "COM20"); + assert_eq!(reason, "open_failed"); + assert_eq!(message, "access denied"); + } + _ => panic!("expected PortRebindFailed"), + } + } + #[test] fn server_write_ack_without_message() { let msg = SerialServerMessage::WriteAck {