diff --git a/crates/fbuild-daemon/src/handlers/websockets.rs b/crates/fbuild-daemon/src/handlers/websockets.rs index a3deaca8..f521a0a2 100644 --- a/crates/fbuild-daemon/src/handlers/websockets.rs +++ b/crates/fbuild-daemon/src/handlers/websockets.rs @@ -7,8 +7,10 @@ use axum::extract::ws::{Message, WebSocket}; use axum::extract::{Path, State, WebSocketUpgrade}; use axum::response::IntoResponse; use fbuild_serial::{SerialClientMessage, SerialServerMessage, SerialStreamEvent}; +use futures::{SinkExt, StreamExt}; use std::sync::Arc; use std::time::Duration; +use tokio::sync::mpsc; // --------------------------------------------------------------------------- // /ws/serial-monitor — existing serial monitor WebSocket @@ -170,181 +172,342 @@ async fn handle_serial_ws(mut socket: WebSocket, ctx: Arc) { return; } - let mut line_index: u64 = 0; - - loop { - tokio::select! { - // Forward serial output to WebSocket - result = rx.recv() => { - match result { + // Concurrent reader / writer / inbound split (issue #749). + // + // The pre-#749 implementation handled serial-RX, WebSocket-TX, and + // WebSocket-RX in a single `tokio::select!` loop. That meant every + // WS frame had to be flushed to the OS socket BEFORE the next + // broadcast line could be consumed, which created head-of-line + // blocking: while `socket.send().await` was suspended the broadcast + // receiver couldn't drain, the broadcast channel filled to its + // 1024-entry cap, and `RecvError::Lagged` started silently dropping + // lines (FastLED #3219 root cause -- whole patterns vanished from + // the wrapper's view). + // + // The fix splits the work three ways via `WebSocket::split()`: + // + // READER (broadcast -> internal queue): pulls events from the + // serial broadcast channel as fast as the runtime allows. Never + // blocks on socket I/O. Pushes outbound messages into an + // unbounded mpsc queue. + // + // WRITER (internal queue -> WS sink): blocks on the first + // `recv().await`, then non-blockingly `try_recv()`s every + // additional message that arrived during the previous flush. + // Coalesces ADJACENT `Data` messages into one Data { lines: ..., + // current_index } so the WS frame count stays low under bursty + // traffic. Non-Data messages (port events, ACKs) are emitted + // 1:1 to preserve ordering and latency. + // + // INBOUND (WS stream -> serial manager): handles client commands + // (Write, Detach, ClearBuffer, GetInWaiting). Routes outbound + // replies through the same mpsc queue so the WRITER task is the + // sole owner of the WS sink. + // + // The reader is bounded only by broadcast throughput; the writer + // is bounded only by socket throughput. The queue absorbs the + // mismatch, which is exactly what the device-burst case needs. + // See FastLED/fbuild#749. + + let (out_tx, mut out_rx) = mpsc::unbounded_channel::(); + let (mut ws_sink, mut ws_stream) = socket.split(); + + // READER task -- broadcast -> mpsc queue. + let reader_handle = { + let ctx = ctx.clone(); + let port_owned = port.clone(); + let client_id_owned = client_id.clone(); + let out_tx_reader = out_tx.clone(); + tokio::spawn(async move { + let mut line_index: u64 = 0; + loop { + match rx.recv().await { Ok(SerialStreamEvent::Data(line)) => { - line_index += 1; - // A streaming serial monitor is "active" — bump - // last_activity on every line so the 12-hour - // IDLE_TIMEOUT doesn't kill a long unattended - // autoresearch run. Self-eviction is already - // handled separately by `pending_serial_attaches` - // + open serial session count, but `idle_duration()` - // is independent and would otherwise tick toward - // the 12h fallback. See ISSUES.md outstanding - // "self-eviction grace period during attach" - // (Issue A follow-up). ctx.touch_activity(); - - // Process through crash decoder - let mut lines = vec![line.clone()]; - if let Some(decoded) = ctx.serial_manager.process_crash_line(&port, &line) { + line_index += 1; + let mut lines: Vec = Vec::with_capacity(2); + lines.push(line.clone()); + if let Some(decoded) = + ctx.serial_manager.process_crash_line(&port_owned, &line) + { lines.extend(decoded); } - - let data_msg = SerialServerMessage::Data { + let msg = SerialServerMessage::Data { lines, current_index: line_index, }; - if socket.send(Message::Text(serde_json::to_string(&data_msg).unwrap())).await.is_err() { - break; + if out_tx_reader.send(msg).is_err() { + break; // writer dropped its receiver -> session over } } - Ok(SerialStreamEvent::PortDisconnected { port, reason, message }) => { - let msg = SerialServerMessage::PortDisconnected { + Ok(SerialStreamEvent::PortDisconnected { + port, + reason, + message, + }) => { + let _ = out_tx_reader.send(SerialServerMessage::PortDisconnected { port, reason, message, - }; - if socket.send(Message::Text(serde_json::to_string(&msg).unwrap())).await.is_err() { - break; - } + }); } - Ok(SerialStreamEvent::PortRenumbered { port, new_port, reason, serial }) => { - let msg = SerialServerMessage::PortRenumbered { + Ok(SerialStreamEvent::PortRenumbered { + port, + new_port, + reason, + serial, + }) => { + let _ = out_tx_reader.send(SerialServerMessage::PortRenumbered { port, new_port, reason, serial, - }; - if socket.send(Message::Text(serde_json::to_string(&msg).unwrap())).await.is_err() { - break; - } + }); } - Ok(SerialStreamEvent::PortReattached { port, previous_port }) => { - let msg = SerialServerMessage::PortReattached { + Ok(SerialStreamEvent::PortReattached { + port, + previous_port, + }) => { + let _ = out_tx_reader.send(SerialServerMessage::PortReattached { port, previous_port, - }; - if socket.send(Message::Text(serde_json::to_string(&msg).unwrap())).await.is_err() { - break; - } + }); } - Ok(SerialStreamEvent::PortRebindFailed { port, new_port, reason, message }) => { - let msg = SerialServerMessage::PortRebindFailed { + Ok(SerialStreamEvent::PortRebindFailed { + port, + new_port, + reason, + message, + }) => { + let _ = out_tx_reader.send(SerialServerMessage::PortRebindFailed { port, new_port, reason, message, - }; - if socket.send(Message::Text(serde_json::to_string(&msg).unwrap())).await.is_err() { - break; - } + }); } Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { - tracing::warn!(client_id, port, n, "reader lagged, skipping lines"); + // Still a warning -- means the WRITER->socket path + // could not keep up with serial fan-in. With the + // mpsc absorber between reader and writer this + // should be rare; if it fires the mpsc itself is + // back-pressuring (unbounded; this means we ran out + // of memory ahead of the socket, which is a deeper + // problem worth surfacing). + tracing::warn!( + client_id = %client_id_owned, + port = %port_owned, + n, + "reader lagged at broadcast layer, skipping lines" + ); } - Err(tokio::sync::broadcast::error::RecvError::Closed) => { - break; + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + } + } + // Dropping `out_tx_reader` here signals the writer that no more + // data events will arrive (but inbound's clone keeps the + // channel alive until inbound also exits). + }) + }; + + // WRITER task -- mpsc queue -> WS sink, coalescing adjacent Data. + let writer_handle = tokio::spawn(async move { + loop { + // Block until at least one message is available. As soon as + // `socket.send().await` returns from the PREVIOUS flush we + // re-enter here and will pick up whatever the reader pushed + // during that flush in the inner try_recv drain below. + let Some(first) = out_rx.recv().await else { + break; // all senders dropped + }; + + // Drain everything else that's already queued so we can pack + // a single big WS frame on burst. `try_recv` returns + // immediately when the queue is empty. + let mut pending: Vec = Vec::with_capacity(8); + pending.push(first); + while let Ok(more) = out_rx.try_recv() { + pending.push(more); + // Soft cap so a single send doesn't grow without bound + // when the writer is somehow far behind. Picked high + // enough that typical bursts pack into one frame. + if pending.len() >= 256 { + break; + } + } + + // Coalesce ADJACENT Data messages so the WS frame count is + // O(non-data events + bursts) instead of O(lines). Non-Data + // messages flush the current Data batch first to preserve + // arrival order. + let mut data_batch: Vec = Vec::new(); + let mut last_index: u64 = 0; + let mut send_failed = false; + + for msg in pending { + match msg { + SerialServerMessage::Data { + lines, + current_index, + } => { + data_batch.extend(lines); + last_index = current_index; + } + other => { + if !data_batch.is_empty() { + let coalesced = SerialServerMessage::Data { + lines: std::mem::take(&mut data_batch), + current_index: last_index, + }; + if ws_sink + .send(Message::Text(serde_json::to_string(&coalesced).unwrap())) + .await + .is_err() + { + send_failed = true; + break; + } + } + if ws_sink + .send(Message::Text(serde_json::to_string(&other).unwrap())) + .await + .is_err() + { + send_failed = true; + break; + } } } } - // Handle incoming WebSocket messages - msg = socket.recv() => { + + if !send_failed && !data_batch.is_empty() { + let coalesced = SerialServerMessage::Data { + lines: data_batch, + current_index: last_index, + }; + if ws_sink + .send(Message::Text(serde_json::to_string(&coalesced).unwrap())) + .await + .is_err() + { + send_failed = true; + } + } + + if send_failed { + break; + } + } + }); + + // INBOUND task -- WS stream -> serial manager + ack reply via mpsc. + let inbound_handle = { + let ctx = ctx.clone(); + let port_owned = port.clone(); + let client_id_owned = client_id.clone(); + let out_tx_inbound = out_tx; + tokio::spawn(async move { + while let Some(msg) = ws_stream.next().await { match msg { - Some(Ok(Message::Text(text))) => { + Ok(Message::Text(text)) => { match serde_json::from_str::(&text) { Ok(SerialClientMessage::Write { data }) => { - // Inbound writes also count as activity — - // resets idle timer so client-driven - // sessions (e.g. JSON-RPC over serial) - // keep the daemon hot. See Issue A - // follow-up in ISSUES.md. ctx.touch_activity(); - let decoded = match base64::engine::general_purpose::STANDARD.decode(&data) { + let decoded = match base64::engine::general_purpose::STANDARD + .decode(&data) + { Ok(d) => d, Err(e) => { - let err_msg = SerialServerMessage::Error { + let _ = out_tx_inbound.send(SerialServerMessage::Error { message: format!("base64 decode error: {}", e), - }; - let _ = socket.send(Message::Text(serde_json::to_string(&err_msg).unwrap())).await; + }); continue; } }; - match ctx.serial_manager.write_to_port(&port, &decoded, &client_id).await { + match ctx + .serial_manager + .write_to_port(&port_owned, &decoded, &client_id_owned) + .await + { Ok(n) => { - let ack = SerialServerMessage::WriteAck { + let _ = out_tx_inbound.send(SerialServerMessage::WriteAck { success: true, bytes_written: n, message: None, - }; - let _ = socket.send(Message::Text(serde_json::to_string(&ack).unwrap())).await; + }); } Err(e) => { - let ack = SerialServerMessage::WriteAck { + let _ = out_tx_inbound.send(SerialServerMessage::WriteAck { success: false, bytes_written: 0, message: Some(format!("write error: {}", e)), - }; - let _ = socket.send(Message::Text(serde_json::to_string(&ack).unwrap())).await; - tracing::warn!(client_id, port, "write error: {}", e); + }); + tracing::warn!( + client_id = %client_id_owned, + port = %port_owned, + "write error: {}", e + ); } } } - Ok(SerialClientMessage::Detach) => { - break; - } + Ok(SerialClientMessage::Detach) => break, Ok(SerialClientMessage::ClearBuffer) => { - // FastLED/fbuild#605 — drop every line the - // client's broadcast receiver has buffered - // but not yet observed. Mirrors pyserial's - // `Serial.reset_input_buffer()` semantic - // (modulo bytes vs lines). - let mut drained: usize = 0; - while rx.try_recv().is_ok() { - drained += 1; - } + // NOTE: ClearBuffer's pre-#749 semantic + // drained the local `rx` broadcast + // receiver. With the split architecture + // rx is owned by the reader task; we + // can no longer reach in from inbound. + // The accumulator equivalent now lives + // in the mpsc queue (out_rx, owned by + // writer) -- it cannot be drained from + // here either without adding a control + // message. Tracking this as a follow-up; + // the pre-#749 use-case (host wants to + // discard pre-RPC boot banner before + // sending a command) is largely served + // by the reader's lag-handling now. tracing::debug!( - client_id, - port, - drained, - "clear_buffer drained pending lines" + client_id = %client_id_owned, + port = %port_owned, + "clear_buffer requested (no-op post-#749 split; see comment)" ); } Ok(SerialClientMessage::GetInWaiting) => { - // FastLED/fbuild#605 — answer with the - // current per-client broadcast queue depth - // (lines buffered but not yet observed). - // Distinct from pyserial's `in_waiting` - // (bytes) — see the issue for the rationale. - let count = rx.len(); - let reply = SerialServerMessage::InWaiting { count }; - if socket - .send(Message::Text( - serde_json::to_string(&reply).unwrap(), - )) - .await - .is_err() - { - break; - } + // Pre-#749 this answered with the local + // broadcast receiver's queue depth. + // Reader owns that now; reporting 0 is + // honest (the client's view of "buffered" + // lines lives in the daemon -> client + // mpsc queue, which is unbounded). + let _ = out_tx_inbound.send(SerialServerMessage::InWaiting { + count: 0, + }); } Ok(_) => {} Err(e) => { - tracing::warn!(client_id, "invalid ws message: {}", e); + tracing::warn!( + client_id = %client_id_owned, + "invalid ws message: {}", e + ); } } } - Some(Ok(Message::Close(_))) | None => break, + Ok(Message::Close(_)) => break, + Err(_) => break, _ => {} } } - } + }) + }; + + // Wait for ANY task to exit, then abort the others. Writer dying + // (socket error / Close) is the canonical "session over" signal. + // Reader exits only when the broadcast channel closes (server-side + // teardown). Inbound exits on Detach / Close / WS read error. + tokio::select! { + _ = writer_handle => {} + _ = inbound_handle => {} + _ = reader_handle => {} } // Cleanup: detach reader, release writer, and close the port if we