diff --git a/crates/fbuild-daemon/src/context.rs b/crates/fbuild-daemon/src/context.rs index 81b74665..06b312fc 100644 --- a/crates/fbuild-daemon/src/context.rs +++ b/crates/fbuild-daemon/src/context.rs @@ -377,6 +377,30 @@ impl DaemonContext { .or_insert_with(|| Arc::new(Mutex::new(()))) .clone() } + + pub fn refresh_devices_and_broadcast_serial_moves(&self) { + self.device_manager.refresh_devices(); + self.broadcast_recent_device_port_moves(); + } + + pub fn refresh_devices_if_stale_and_broadcast_serial_moves(&self, max_age: Duration) -> bool { + let refreshed = self.device_manager.refresh_devices_if_stale(max_age); + if refreshed { + self.broadcast_recent_device_port_moves(); + } + refreshed + } + + fn broadcast_recent_device_port_moves(&self) { + for move_event in self.device_manager.take_recent_port_moves() { + self.serial_manager.notify_port_renumbered( + &move_event.previous_port, + &move_event.port, + "tracked_serial_move", + move_event.serial_number, + ); + } + } } fn now_unix() -> f64 { diff --git a/crates/fbuild-daemon/src/device_manager.rs b/crates/fbuild-daemon/src/device_manager.rs index 9e8dd8f3..3ecb9fbc 100644 --- a/crates/fbuild-daemon/src/device_manager.rs +++ b/crates/fbuild-daemon/src/device_manager.rs @@ -118,6 +118,13 @@ pub struct DeviceState { pub last_disconnect_at: Option, } +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DevicePortMove { + pub previous_port: String, + pub port: String, + pub serial_number: Option, +} + impl DeviceState { pub fn is_available_for_exclusive(&self) -> bool { self.exclusive_lease.is_none() @@ -155,6 +162,7 @@ pub struct DeviceManager { /// enumeration cache is still fresh — the dominant cost on /// back-to-back warm deploys. last_refresh_at: Mutex>, + recent_port_moves: Mutex>, } impl Default for DeviceManager { @@ -168,6 +176,7 @@ impl DeviceManager { Self { devices: Mutex::new(HashMap::new()), last_refresh_at: Mutex::new(None), + recent_port_moves: Mutex::new(Vec::new()), } } @@ -279,6 +288,7 @@ impl DeviceManager { }); if let Some(old_port) = moved_from { if let Some(mut state) = devices.remove(&old_port) { + let serial_number = device.serial_number.clone(); state.previous_port = Some(old_port); state.port = key.clone(); state.is_connected = true; @@ -288,6 +298,13 @@ impl DeviceManager { state.vid = device.vid; state.pid = device.pid; state.serial_number = device.serial_number; + if let Some(previous_port) = state.previous_port.clone() { + self.recent_port_moves.lock().unwrap().push(DevicePortMove { + previous_port, + port: key.clone(), + serial_number, + }); + } devices.insert(key, state); continue; } @@ -341,6 +358,11 @@ impl DeviceManager { self.devices.lock().unwrap().clone() } + pub fn take_recent_port_moves(&self) -> Vec { + let mut moves = self.recent_port_moves.lock().unwrap(); + std::mem::take(&mut *moves) + } + /// Get status for a specific device (by port name). pub fn get_device_status(&self, port: &str) -> Option { self.devices.lock().unwrap().get(port).cloned() @@ -829,6 +851,18 @@ mod tests { moved.exclusive_lease.as_ref().unwrap().track_serial, "moved lease must retain track_serial" ); + assert_eq!( + mgr.take_recent_port_moves(), + vec![DevicePortMove { + previous_port: "COM3".to_string(), + port: "COM4".to_string(), + serial_number: Some("TEST-SERIAL".to_string()), + }] + ); + assert!( + mgr.take_recent_port_moves().is_empty(), + "taking recent moves should drain the queue" + ); } #[test] diff --git a/crates/fbuild-daemon/src/handlers/devices.rs b/crates/fbuild-daemon/src/handlers/devices.rs index d5fe4de2..308c81a8 100644 --- a/crates/fbuild-daemon/src/handlers/devices.rs +++ b/crates/fbuild-daemon/src/handlers/devices.rs @@ -15,7 +15,7 @@ use uuid::Uuid; /// POST /api/devices/list pub async fn list_devices(state: State>) -> Json { // Refresh device inventory - state.device_manager.refresh_devices(); + state.refresh_devices_and_broadcast_serial_moves(); let all = state.device_manager.get_all_devices(); let devices = all.values().map(device_info).collect(); @@ -32,7 +32,7 @@ pub async fn device_status( Path(port): Path, ) -> Json { // Refresh to get latest state - state.device_manager.refresh_devices(); + state.refresh_devices_and_broadcast_serial_moves(); match state.device_manager.get_device_status(&port) { Some(ds) => Json(device_status_response(ds)), @@ -99,7 +99,7 @@ pub async fn device_lease( let client_id = req.client_id.unwrap_or_else(|| Uuid::new_v4().to_string()); // Ensure devices are refreshed - state.device_manager.refresh_devices(); + state.refresh_devices_and_broadcast_serial_moves(); let result = match req.lease_type.as_str() { "exclusive" => state.device_manager.acquire_exclusive( @@ -178,7 +178,7 @@ pub async fn device_preempt( let client_id = req.client_id.unwrap_or_else(|| Uuid::new_v4().to_string()); // Refresh first - state.device_manager.refresh_devices(); + state.refresh_devices_and_broadcast_serial_moves(); match state .device_manager diff --git a/crates/fbuild-daemon/src/handlers/operations/deploy.rs b/crates/fbuild-daemon/src/handlers/operations/deploy.rs index ed560fe9..6fbc5c90 100644 --- a/crates/fbuild-daemon/src/handlers/operations/deploy.rs +++ b/crates/fbuild-daemon/src/handlers/operations/deploy.rs @@ -364,7 +364,7 @@ pub async fn deploy( } let deploy_port_choice = if req.port.is_none() { - ctx.device_manager.refresh_devices(); + ctx.refresh_devices_and_broadcast_serial_moves(); choose_deploy_port( None, platform, @@ -412,8 +412,7 @@ pub async fn deploy( // the trust-check still requires `is_connected == true` on the // cached DeviceState, which the most-recent refresh supplied. if trusted_hash_enabled { - ctx.device_manager - .refresh_devices_if_stale(std::time::Duration::from_secs(2)); + ctx.refresh_devices_if_stale_and_broadcast_serial_moves(std::time::Duration::from_secs(2)); } let deploy_result = tokio::task::spawn_blocking(move || -> fbuild_core::Result<(Option>, fbuild_deploy::DeploymentResult)> { // Populated by the Espressif32 arm with (image_hash, port). diff --git a/crates/fbuild-daemon/src/handlers/operations/monitor.rs b/crates/fbuild-daemon/src/handlers/operations/monitor.rs index 02946d54..3fdef8a4 100644 --- a/crates/fbuild-daemon/src/handlers/operations/monitor.rs +++ b/crates/fbuild-daemon/src/handlers/operations/monitor.rs @@ -201,6 +201,8 @@ pub(crate) async fn run_monitor_loop( "serial port {port} disconnected ({reason}): {message}" )); } + Ok(Ok(SerialStreamEvent::PortRenumbered { .. })) + | Ok(Ok(SerialStreamEvent::PortReattached { .. })) => {} Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(n))) => { tracing::warn!("monitor lagged, skipped {} messages", n); } diff --git a/crates/fbuild-daemon/src/handlers/websockets.rs b/crates/fbuild-daemon/src/handlers/websockets.rs index fb71db3a..1289641e 100644 --- a/crates/fbuild-daemon/src/handlers/websockets.rs +++ b/crates/fbuild-daemon/src/handlers/websockets.rs @@ -211,8 +211,29 @@ async fn handle_serial_ws(mut socket: WebSocket, ctx: Arc) { reason, message, }; - let _ = socket.send(Message::Text(serde_json::to_string(&msg).unwrap())).await; - break; + if socket.send(Message::Text(serde_json::to_string(&msg).unwrap())).await.is_err() { + break; + } + } + Ok(SerialStreamEvent::PortRenumbered { port, new_port, reason, serial }) => { + let msg = SerialServerMessage::PortRenumbered { + port, + new_port, + reason, + serial, + }; + if socket.send(Message::Text(serde_json::to_string(&msg).unwrap())).await.is_err() { + break; + } + } + Ok(SerialStreamEvent::PortReattached { port, previous_port }) => { + let msg = SerialServerMessage::PortReattached { + port, + previous_port, + }; + if socket.send(Message::Text(serde_json::to_string(&msg).unwrap())).await.is_err() { + break; + } } Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { tracing::warn!(client_id, port, n, "reader lagged, skipping lines"); diff --git a/crates/fbuild-python/src/json_rpc.rs b/crates/fbuild-python/src/json_rpc.rs index 72fef863..28a65e18 100644 --- a/crates/fbuild-python/src/json_rpc.rs +++ b/crates/fbuild-python/src/json_rpc.rs @@ -80,6 +80,8 @@ pub(crate) async fn read_lines_async( break; } Ok(ServerMessage::Reconnected { .. }) => continue, + Ok(ServerMessage::PortRenumbered { .. }) + | Ok(ServerMessage::PortReattached { .. }) => continue, Ok(ServerMessage::PortDisconnected { .. }) => break, _ => continue, } diff --git a/crates/fbuild-python/src/messages.rs b/crates/fbuild-python/src/messages.rs index bf2b5290..fbceba4e 100644 --- a/crates/fbuild-python/src/messages.rs +++ b/crates/fbuild-python/src/messages.rs @@ -56,6 +56,22 @@ pub(crate) enum ServerMessage { #[allow(dead_code)] message: String, }, + PortRenumbered { + #[allow(dead_code)] + port: String, + #[allow(dead_code)] + new_port: String, + #[allow(dead_code)] + reason: String, + #[allow(dead_code)] + serial: Option, + }, + PortReattached { + #[allow(dead_code)] + port: String, + #[allow(dead_code)] + previous_port: String, + }, Error { message: String, }, diff --git a/crates/fbuild-python/src/serial_monitor.rs b/crates/fbuild-python/src/serial_monitor.rs index fe6f40e4..5391bf8a 100644 --- a/crates/fbuild-python/src/serial_monitor.rs +++ b/crates/fbuild-python/src/serial_monitor.rs @@ -213,6 +213,8 @@ impl SerialMonitor { // Resume after deploy continue; } + Ok(ServerMessage::PortRenumbered { .. }) + | Ok(ServerMessage::PortReattached { .. }) => continue, Ok(ServerMessage::PortDisconnected { .. }) => break, _ => continue, } @@ -555,6 +557,8 @@ impl SerialMonitor { Ok(ServerMessage::Reconnected { .. }) => { continue; } + Ok(ServerMessage::PortRenumbered { .. }) + | Ok(ServerMessage::PortReattached { .. }) => continue, Ok(ServerMessage::PortDisconnected { .. }) => break, _ => continue, } diff --git a/crates/fbuild-serial/src/manager.rs b/crates/fbuild-serial/src/manager.rs index b35c8fae..8afc29ea 100644 --- a/crates/fbuild-serial/src/manager.rs +++ b/crates/fbuild-serial/src/manager.rs @@ -486,6 +486,35 @@ impl SharedSerialManager { } } + /// Notify subscribers on the old port that a tracked USB serial moved to a + /// new OS port and is available again there. + pub fn notify_port_renumbered( + &self, + old_port: &str, + new_port: &str, + reason: &str, + serial: Option, + ) -> bool { + let Some(tx) = self.broadcasters.get(old_port) else { + return false; + }; + let sent_renumbered = tx + .send(SerialStreamEvent::PortRenumbered { + port: old_port.to_string(), + new_port: new_port.to_string(), + reason: reason.to_string(), + serial, + }) + .is_ok(); + let sent_reattached = tx + .send(SerialStreamEvent::PortReattached { + port: new_port.to_string(), + previous_port: old_port.to_string(), + }) + .is_ok(); + sent_renumbered || sent_reattached + } + /// Returns the number of attached readers for a port (0 if not open). pub fn reader_count(&self, port: &str) -> usize { self.sessions @@ -895,4 +924,37 @@ mod tests { ); assert!(mgr.has_clients(port)); } + + #[test] + fn notify_port_renumbered_broadcasts_events_to_old_port() { + let mgr = SharedSerialManager::new(); + let old_port = "COM21"; + let new_port = "COM20"; + let (tx, mut rx) = broadcast::channel(BROADCAST_CHANNEL_SIZE); + mgr.broadcasters.insert(old_port.to_string(), tx); + + assert!(mgr.notify_port_renumbered( + old_port, + new_port, + "tracked_serial_move", + Some("15821020".to_string()) + )); + + assert_eq!( + rx.try_recv().unwrap(), + SerialStreamEvent::PortRenumbered { + port: old_port.to_string(), + new_port: new_port.to_string(), + reason: "tracked_serial_move".to_string(), + serial: Some("15821020".to_string()), + } + ); + assert_eq!( + rx.try_recv().unwrap(), + SerialStreamEvent::PortReattached { + port: new_port.to_string(), + previous_port: old_port.to_string(), + } + ); + } } diff --git a/crates/fbuild-serial/src/messages.rs b/crates/fbuild-serial/src/messages.rs index 440b0d26..279493fe 100644 --- a/crates/fbuild-serial/src/messages.rs +++ b/crates/fbuild-serial/src/messages.rs @@ -59,6 +59,17 @@ pub enum SerialServerMessage { reason: String, message: String, }, + PortRenumbered { + port: String, + new_port: String, + reason: String, + #[serde(skip_serializing_if = "Option::is_none")] + serial: Option, + }, + PortReattached { + port: String, + previous_port: String, + }, WriteAck { success: bool, bytes_written: usize, @@ -85,6 +96,16 @@ pub enum SerialStreamEvent { reason: String, message: String, }, + PortRenumbered { + port: String, + new_port: String, + reason: String, + serial: Option, + }, + PortReattached { + port: String, + previous_port: String, + }, } #[cfg(test)] @@ -246,6 +267,54 @@ mod tests { } } + #[test] + fn server_port_renumbered_roundtrip() { + let msg = SerialServerMessage::PortRenumbered { + port: "COM21".into(), + new_port: "COM20".into(), + reason: "tracked_serial_move".into(), + serial: Some("15821020".into()), + }; + let json = serde_json::to_string(&msg).unwrap(); + assert!(json.contains("\"type\":\"port_renumbered\"")); + let parsed: SerialServerMessage = serde_json::from_str(&json).unwrap(); + match parsed { + SerialServerMessage::PortRenumbered { + port, + new_port, + reason, + serial, + } => { + assert_eq!(port, "COM21"); + assert_eq!(new_port, "COM20"); + assert_eq!(reason, "tracked_serial_move"); + assert_eq!(serial.as_deref(), Some("15821020")); + } + _ => panic!("expected PortRenumbered"), + } + } + + #[test] + fn server_port_reattached_roundtrip() { + let msg = SerialServerMessage::PortReattached { + port: "COM20".into(), + previous_port: "COM21".into(), + }; + let json = serde_json::to_string(&msg).unwrap(); + assert!(json.contains("\"type\":\"port_reattached\"")); + let parsed: SerialServerMessage = serde_json::from_str(&json).unwrap(); + match parsed { + SerialServerMessage::PortReattached { + port, + previous_port, + } => { + assert_eq!(port, "COM20"); + assert_eq!(previous_port, "COM21"); + } + _ => panic!("expected PortReattached"), + } + } + #[test] fn server_write_ack_without_message() { let msg = SerialServerMessage::WriteAck {