Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions crates/fbuild-daemon/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,30 @@ impl DaemonContext {
.or_insert_with(|| Arc::new(Mutex::new(())))
.clone()
}

pub fn refresh_devices_and_broadcast_serial_moves(&self) {
self.device_manager.refresh_devices();
self.broadcast_recent_device_port_moves();
}

pub fn refresh_devices_if_stale_and_broadcast_serial_moves(&self, max_age: Duration) -> bool {
let refreshed = self.device_manager.refresh_devices_if_stale(max_age);
if refreshed {
self.broadcast_recent_device_port_moves();
}
refreshed
}

fn broadcast_recent_device_port_moves(&self) {
for move_event in self.device_manager.take_recent_port_moves() {
self.serial_manager.notify_port_renumbered(
&move_event.previous_port,
&move_event.port,
"tracked_serial_move",
move_event.serial_number,
);
}
}
}

fn now_unix() -> f64 {
Expand Down
34 changes: 34 additions & 0 deletions crates/fbuild-daemon/src/device_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@ pub struct DeviceState {
pub last_disconnect_at: Option<Instant>,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DevicePortMove {
pub previous_port: String,
pub port: String,
pub serial_number: Option<String>,
}

impl DeviceState {
pub fn is_available_for_exclusive(&self) -> bool {
self.exclusive_lease.is_none()
Expand Down Expand Up @@ -155,6 +162,7 @@ pub struct DeviceManager {
/// enumeration cache is still fresh — the dominant cost on
/// back-to-back warm deploys.
last_refresh_at: Mutex<Option<Instant>>,
recent_port_moves: Mutex<Vec<DevicePortMove>>,
}

impl Default for DeviceManager {
Expand All @@ -168,6 +176,7 @@ impl DeviceManager {
Self {
devices: Mutex::new(HashMap::new()),
last_refresh_at: Mutex::new(None),
recent_port_moves: Mutex::new(Vec::new()),
}
}

Expand Down Expand Up @@ -279,6 +288,7 @@ impl DeviceManager {
});
if let Some(old_port) = moved_from {
if let Some(mut state) = devices.remove(&old_port) {
let serial_number = device.serial_number.clone();
state.previous_port = Some(old_port);
state.port = key.clone();
state.is_connected = true;
Expand All @@ -288,6 +298,13 @@ impl DeviceManager {
state.vid = device.vid;
state.pid = device.pid;
state.serial_number = device.serial_number;
if let Some(previous_port) = state.previous_port.clone() {
self.recent_port_moves.lock().unwrap().push(DevicePortMove {
previous_port,
port: key.clone(),
serial_number,
});
}
devices.insert(key, state);
continue;
}
Expand Down Expand Up @@ -341,6 +358,11 @@ impl DeviceManager {
self.devices.lock().unwrap().clone()
}

pub fn take_recent_port_moves(&self) -> Vec<DevicePortMove> {
let mut moves = self.recent_port_moves.lock().unwrap();
std::mem::take(&mut *moves)
}

/// Get status for a specific device (by port name).
pub fn get_device_status(&self, port: &str) -> Option<DeviceState> {
self.devices.lock().unwrap().get(port).cloned()
Expand Down Expand Up @@ -829,6 +851,18 @@ mod tests {
moved.exclusive_lease.as_ref().unwrap().track_serial,
"moved lease must retain track_serial"
);
assert_eq!(
mgr.take_recent_port_moves(),
vec![DevicePortMove {
previous_port: "COM3".to_string(),
port: "COM4".to_string(),
serial_number: Some("TEST-SERIAL".to_string()),
}]
);
assert!(
mgr.take_recent_port_moves().is_empty(),
"taking recent moves should drain the queue"
);
}

#[test]
Expand Down
8 changes: 4 additions & 4 deletions crates/fbuild-daemon/src/handlers/devices.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use uuid::Uuid;
/// POST /api/devices/list
pub async fn list_devices(state: State<Arc<DaemonContext>>) -> Json<DeviceListResponse> {
// Refresh device inventory
state.device_manager.refresh_devices();
state.refresh_devices_and_broadcast_serial_moves();

let all = state.device_manager.get_all_devices();
let devices = all.values().map(device_info).collect();
Expand All @@ -32,7 +32,7 @@ pub async fn device_status(
Path(port): Path<String>,
) -> Json<DeviceStatusResponse> {
// Refresh to get latest state
state.device_manager.refresh_devices();
state.refresh_devices_and_broadcast_serial_moves();

match state.device_manager.get_device_status(&port) {
Some(ds) => Json(device_status_response(ds)),
Expand Down Expand Up @@ -99,7 +99,7 @@ pub async fn device_lease(
let client_id = req.client_id.unwrap_or_else(|| Uuid::new_v4().to_string());

// Ensure devices are refreshed
state.device_manager.refresh_devices();
state.refresh_devices_and_broadcast_serial_moves();

let result = match req.lease_type.as_str() {
"exclusive" => state.device_manager.acquire_exclusive(
Expand Down Expand Up @@ -178,7 +178,7 @@ pub async fn device_preempt(
let client_id = req.client_id.unwrap_or_else(|| Uuid::new_v4().to_string());

// Refresh first
state.device_manager.refresh_devices();
state.refresh_devices_and_broadcast_serial_moves();

match state
.device_manager
Expand Down
5 changes: 2 additions & 3 deletions crates/fbuild-daemon/src/handlers/operations/deploy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ pub async fn deploy(
}

let deploy_port_choice = if req.port.is_none() {
ctx.device_manager.refresh_devices();
ctx.refresh_devices_and_broadcast_serial_moves();
choose_deploy_port(
None,
platform,
Expand Down Expand Up @@ -412,8 +412,7 @@ pub async fn deploy(
// the trust-check still requires `is_connected == true` on the
// cached DeviceState, which the most-recent refresh supplied.
if trusted_hash_enabled {
ctx.device_manager
.refresh_devices_if_stale(std::time::Duration::from_secs(2));
ctx.refresh_devices_if_stale_and_broadcast_serial_moves(std::time::Duration::from_secs(2));
}
let deploy_result = tokio::task::spawn_blocking(move || -> fbuild_core::Result<(Option<Box<dyn fbuild_deploy::Deployer>>, fbuild_deploy::DeploymentResult)> {
// Populated by the Espressif32 arm with (image_hash, port).
Expand Down
2 changes: 2 additions & 0 deletions crates/fbuild-daemon/src/handlers/operations/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@ pub(crate) async fn run_monitor_loop(
"serial port {port} disconnected ({reason}): {message}"
));
}
Ok(Ok(SerialStreamEvent::PortRenumbered { .. }))
| Ok(Ok(SerialStreamEvent::PortReattached { .. })) => {}
Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(n))) => {
tracing::warn!("monitor lagged, skipped {} messages", n);
}
Expand Down
25 changes: 23 additions & 2 deletions crates/fbuild-daemon/src/handlers/websockets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,29 @@ async fn handle_serial_ws(mut socket: WebSocket, ctx: Arc<DaemonContext>) {
reason,
message,
};
let _ = socket.send(Message::Text(serde_json::to_string(&msg).unwrap())).await;
break;
if socket.send(Message::Text(serde_json::to_string(&msg).unwrap())).await.is_err() {
break;
}
}
Ok(SerialStreamEvent::PortRenumbered { port, new_port, reason, serial }) => {
let msg = SerialServerMessage::PortRenumbered {
port,
new_port,
reason,
serial,
};
if socket.send(Message::Text(serde_json::to_string(&msg).unwrap())).await.is_err() {
break;
}
}
Ok(SerialStreamEvent::PortReattached { port, previous_port }) => {
let msg = SerialServerMessage::PortReattached {
port,
previous_port,
};
if socket.send(Message::Text(serde_json::to_string(&msg).unwrap())).await.is_err() {
break;
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
tracing::warn!(client_id, port, n, "reader lagged, skipping lines");
Expand Down
2 changes: 2 additions & 0 deletions crates/fbuild-python/src/json_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ pub(crate) async fn read_lines_async(
break;
}
Ok(ServerMessage::Reconnected { .. }) => continue,
Ok(ServerMessage::PortRenumbered { .. })
| Ok(ServerMessage::PortReattached { .. }) => continue,
Ok(ServerMessage::PortDisconnected { .. }) => break,
_ => continue,
}
Expand Down
16 changes: 16 additions & 0 deletions crates/fbuild-python/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,22 @@ pub(crate) enum ServerMessage {
#[allow(dead_code)]
message: String,
},
PortRenumbered {
#[allow(dead_code)]
port: String,
#[allow(dead_code)]
new_port: String,
#[allow(dead_code)]
reason: String,
#[allow(dead_code)]
serial: Option<String>,
},
PortReattached {
#[allow(dead_code)]
port: String,
#[allow(dead_code)]
previous_port: String,
},
Error {
message: String,
},
Expand Down
4 changes: 4 additions & 0 deletions crates/fbuild-python/src/serial_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,8 @@ impl SerialMonitor {
// Resume after deploy
continue;
}
Ok(ServerMessage::PortRenumbered { .. })
| Ok(ServerMessage::PortReattached { .. }) => continue,
Ok(ServerMessage::PortDisconnected { .. }) => break,
_ => continue,
}
Expand Down Expand Up @@ -555,6 +557,8 @@ impl SerialMonitor {
Ok(ServerMessage::Reconnected { .. }) => {
continue;
}
Ok(ServerMessage::PortRenumbered { .. })
| Ok(ServerMessage::PortReattached { .. }) => continue,
Ok(ServerMessage::PortDisconnected { .. }) => break,
_ => continue,
}
Expand Down
62 changes: 62 additions & 0 deletions crates/fbuild-serial/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,35 @@ impl SharedSerialManager {
}
}

/// Notify subscribers on the old port that a tracked USB serial moved to a
/// new OS port and is available again there.
pub fn notify_port_renumbered(
&self,
old_port: &str,
new_port: &str,
reason: &str,
serial: Option<String>,
) -> bool {
let Some(tx) = self.broadcasters.get(old_port) else {
return false;
};
let sent_renumbered = tx
.send(SerialStreamEvent::PortRenumbered {
port: old_port.to_string(),
new_port: new_port.to_string(),
reason: reason.to_string(),
serial,
})
.is_ok();
let sent_reattached = tx
.send(SerialStreamEvent::PortReattached {
port: new_port.to_string(),
previous_port: old_port.to_string(),
})
.is_ok();
sent_renumbered || sent_reattached
}

/// Returns the number of attached readers for a port (0 if not open).
pub fn reader_count(&self, port: &str) -> usize {
self.sessions
Expand Down Expand Up @@ -895,4 +924,37 @@ mod tests {
);
assert!(mgr.has_clients(port));
}

#[test]
fn notify_port_renumbered_broadcasts_events_to_old_port() {
let mgr = SharedSerialManager::new();
let old_port = "COM21";
let new_port = "COM20";
let (tx, mut rx) = broadcast::channel(BROADCAST_CHANNEL_SIZE);
mgr.broadcasters.insert(old_port.to_string(), tx);

assert!(mgr.notify_port_renumbered(
old_port,
new_port,
"tracked_serial_move",
Some("15821020".to_string())
));

assert_eq!(
rx.try_recv().unwrap(),
SerialStreamEvent::PortRenumbered {
port: old_port.to_string(),
new_port: new_port.to_string(),
reason: "tracked_serial_move".to_string(),
serial: Some("15821020".to_string()),
}
);
assert_eq!(
rx.try_recv().unwrap(),
SerialStreamEvent::PortReattached {
port: new_port.to_string(),
previous_port: old_port.to_string(),
}
);
}
}
Loading
Loading