From bb78aae9ce890c08ef35638dfd6394de4d72ee5f Mon Sep 17 00:00:00 2001 From: zackees Date: Mon, 22 Jun 2026 03:38:10 -0700 Subject: [PATCH] perf(serial): split daemon WS handler into reader/writer/inbound (closes #749) Pre-fix, the daemon's serial-WebSocket bridge was a single tokio::select! loop: broadcast.recv() -> socket.send().await -> next iteration. While the loop was suspended in socket.send().await it could NOT drain the serial broadcast channel. On a sustained TX burst from the device (FastLED autoresearch's 4-pattern OBJECT_FLED test emits ~80 FL_WARN lines per pattern), the broadcast channel filled to its 1024-entry cap, started returning RecvError::Lagged, and the daemon silently dropped lines. Downstream consumers saw only the first pattern, then nothing. Replace the single loop with three concurrent tasks: READER (broadcast -> mpsc): Pulls SerialStreamEvents from the broadcast channel as fast as the runtime allows. Never blocks on socket I/O. Pushes outbound messages into an unbounded mpsc queue. Bounded only by broadcast throughput. Maps each Data event into its own Data message (the raw line plus any crash-decoder output for it); forwards non-Data events directly. WRITER (mpsc -> WS sink): Blocks on the FIRST recv().await, then non-blockingly try_recv()'s every additional message that arrived during the previous flush (soft-capped at 256 messages per batch). Coalesces ADJACENT Data messages into one Data { lines, ... } so the WS frame count stays low under bursty traffic. Non-Data messages preserve ordering by flushing the current Data batch first. As soon as the OS socket signals it can take more data the writer ships whatever has accumulated, with no artificial delay. INBOUND (WS stream -> serial manager): Handles client commands (Write, Detach, ClearBuffer, GetInWaiting). Routes outbound replies (WriteAck, Error, InWaiting) through the mpsc queue so the WRITER task remains the sole owner of the WS sink. 
ClearBuffer and GetInWaiting are degraded post-split: ClearBuffer is now a logged no-op and GetInWaiting always replies with count 0, because the broadcast receiver that used to back them is now owned by the reader task. Validated on a Teensy 4.0 running FastLED's AutoResearch.ino (`bash autoresearch teensy40 --object-fled --strip-sizes 5 --timeout 60`): Before: Pattern A appears at the wrapper; Pattern B/C/D vanish into a Lagged() warning on the daemon side. Wrapper times out after 720 s of waiting for the final REMOTE response. After: All 4 patterns flow through. Final REMOTE response reaches the wrapper within seconds. `Tests: 4/4 passed, Duration: 13ms`. Total wall: 42 s (compile+flash+boot+test). Larger bursts (e.g. 100-LED matrix) hit a separate device-side limit: the Teensy's UART TX at 115200 baud can't shift thousands of FL_WARN lines per pattern fast enough, the device-side `FastSerial.write()` blocks, and the test pauses. That is independent of this daemon-side fix; tracked separately. Cross-link: https://github.com/FastLED/FastLED/issues/3219 --- .../fbuild-daemon/src/handlers/websockets.rs | 379 +++++++++++++----- 1 file changed, 271 insertions(+), 108 deletions(-) diff --git a/crates/fbuild-daemon/src/handlers/websockets.rs b/crates/fbuild-daemon/src/handlers/websockets.rs index a3deaca8..f521a0a2 100644 --- a/crates/fbuild-daemon/src/handlers/websockets.rs +++ b/crates/fbuild-daemon/src/handlers/websockets.rs @@ -7,8 +7,10 @@ use axum::extract::ws::{Message, WebSocket}; use axum::extract::{Path, State, WebSocketUpgrade}; use axum::response::IntoResponse; use fbuild_serial::{SerialClientMessage, SerialServerMessage, SerialStreamEvent}; +use futures::{SinkExt, StreamExt}; use std::sync::Arc; use std::time::Duration; +use tokio::sync::mpsc; // --------------------------------------------------------------------------- // /ws/serial-monitor — existing serial monitor WebSocket @@ -170,181 +172,342 @@ async fn handle_serial_ws(mut socket: WebSocket, ctx: Arc) { return; } - let mut line_index: u64 = 0; - - loop { - 
tokio::select! { - // Forward serial output to WebSocket - result = rx.recv() => { - match result { + // Concurrent reader / writer / inbound split (issue #749). + // + // The pre-#749 implementation handled serial-RX, WebSocket-TX, and + // WebSocket-RX in a single `tokio::select!` loop. That meant every + // WS frame had to be flushed to the OS socket BEFORE the next + // broadcast line could be consumed, which created head-of-line + // blocking: while `socket.send().await` was suspended the broadcast + // receiver couldn't drain, the broadcast channel filled to its + // 1024-entry cap, and `RecvError::Lagged` started silently dropping + // lines (FastLED #3219 root cause -- whole patterns vanished from + // the wrapper's view). + // + // The fix splits the work three ways via `WebSocket::split()`: + // + // READER (broadcast -> internal queue): pulls events from the + // serial broadcast channel as fast as the runtime allows. Never + // blocks on socket I/O. Pushes outbound messages into an + // unbounded mpsc queue. + // + // WRITER (internal queue -> WS sink): blocks on the first + // `recv().await`, then non-blockingly `try_recv()`s every + // additional message that arrived during the previous flush. + // Coalesces ADJACENT `Data` messages into one Data { lines: ..., + // current_index } so the WS frame count stays low under bursty + // traffic. Non-Data messages (port events, ACKs) are emitted + // 1:1 to preserve ordering and latency. + // + // INBOUND (WS stream -> serial manager): handles client commands + // (Write, Detach, ClearBuffer, GetInWaiting). Routes outbound + // replies through the same mpsc queue so the WRITER task is the + // sole owner of the WS sink. + // + // The reader is bounded only by broadcast throughput; the writer + // is bounded only by socket throughput. The queue absorbs the + // mismatch, which is exactly what the device-burst case needs. + // See FastLED/fbuild#749. 
+ + let (out_tx, mut out_rx) = mpsc::unbounded_channel::(); + let (mut ws_sink, mut ws_stream) = socket.split(); + + // READER task -- broadcast -> mpsc queue. + let reader_handle = { + let ctx = ctx.clone(); + let port_owned = port.clone(); + let client_id_owned = client_id.clone(); + let out_tx_reader = out_tx.clone(); + tokio::spawn(async move { + let mut line_index: u64 = 0; + loop { + match rx.recv().await { Ok(SerialStreamEvent::Data(line)) => { - line_index += 1; - // A streaming serial monitor is "active" — bump - // last_activity on every line so the 12-hour - // IDLE_TIMEOUT doesn't kill a long unattended - // autoresearch run. Self-eviction is already - // handled separately by `pending_serial_attaches` - // + open serial session count, but `idle_duration()` - // is independent and would otherwise tick toward - // the 12h fallback. See ISSUES.md outstanding - // "self-eviction grace period during attach" - // (Issue A follow-up). ctx.touch_activity(); - - // Process through crash decoder - let mut lines = vec![line.clone()]; - if let Some(decoded) = ctx.serial_manager.process_crash_line(&port, &line) { + line_index += 1; + let mut lines: Vec = Vec::with_capacity(2); + lines.push(line.clone()); + if let Some(decoded) = + ctx.serial_manager.process_crash_line(&port_owned, &line) + { lines.extend(decoded); } - - let data_msg = SerialServerMessage::Data { + let msg = SerialServerMessage::Data { lines, current_index: line_index, }; - if socket.send(Message::Text(serde_json::to_string(&data_msg).unwrap())).await.is_err() { - break; + if out_tx_reader.send(msg).is_err() { + break; // writer dropped its receiver -> session over } } - Ok(SerialStreamEvent::PortDisconnected { port, reason, message }) => { - let msg = SerialServerMessage::PortDisconnected { + Ok(SerialStreamEvent::PortDisconnected { + port, + reason, + message, + }) => { + let _ = out_tx_reader.send(SerialServerMessage::PortDisconnected { port, reason, message, - }; - if 
socket.send(Message::Text(serde_json::to_string(&msg).unwrap())).await.is_err() { - break; - } + }); } - Ok(SerialStreamEvent::PortRenumbered { port, new_port, reason, serial }) => { - let msg = SerialServerMessage::PortRenumbered { + Ok(SerialStreamEvent::PortRenumbered { + port, + new_port, + reason, + serial, + }) => { + let _ = out_tx_reader.send(SerialServerMessage::PortRenumbered { port, new_port, reason, serial, - }; - if socket.send(Message::Text(serde_json::to_string(&msg).unwrap())).await.is_err() { - break; - } + }); } - Ok(SerialStreamEvent::PortReattached { port, previous_port }) => { - let msg = SerialServerMessage::PortReattached { + Ok(SerialStreamEvent::PortReattached { + port, + previous_port, + }) => { + let _ = out_tx_reader.send(SerialServerMessage::PortReattached { port, previous_port, - }; - if socket.send(Message::Text(serde_json::to_string(&msg).unwrap())).await.is_err() { - break; - } + }); } - Ok(SerialStreamEvent::PortRebindFailed { port, new_port, reason, message }) => { - let msg = SerialServerMessage::PortRebindFailed { + Ok(SerialStreamEvent::PortRebindFailed { + port, + new_port, + reason, + message, + }) => { + let _ = out_tx_reader.send(SerialServerMessage::PortRebindFailed { port, new_port, reason, message, - }; - if socket.send(Message::Text(serde_json::to_string(&msg).unwrap())).await.is_err() { - break; - } + }); } Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { - tracing::warn!(client_id, port, n, "reader lagged, skipping lines"); + // Still a warning -- Lagged means this READER task did + // not drain the broadcast channel fast enough. The + // socket can no longer cause it: sends into the + // unbounded mpsc never wait, so the queue cannot + // back-pressure the reader. If this fires, suspect + // runtime starvation or slow per-line work in the + // reader (e.g. crash decoding), and surface it. 
+ tracing::warn!( + client_id = %client_id_owned, + port = %port_owned, + n, + "reader lagged at broadcast layer, skipping lines" + ); } - Err(tokio::sync::broadcast::error::RecvError::Closed) => { - break; + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + } + } + // Dropping `out_tx_reader` here signals the writer that no more + // data events will arrive (but inbound's clone keeps the + // channel alive until inbound also exits). + }) + }; + + // WRITER task -- mpsc queue -> WS sink, coalescing adjacent Data. + let writer_handle = tokio::spawn(async move { + loop { + // Block until at least one message is available. As soon as + // `socket.send().await` returns from the PREVIOUS flush we + // re-enter here and will pick up whatever the reader pushed + // during that flush in the inner try_recv drain below. + let Some(first) = out_rx.recv().await else { + break; // all senders dropped + }; + + // Drain everything else that's already queued so we can pack + // a single big WS frame on burst. `try_recv` returns + // immediately when the queue is empty. + let mut pending: Vec = Vec::with_capacity(8); + pending.push(first); + while let Ok(more) = out_rx.try_recv() { + pending.push(more); + // Soft cap so a single send doesn't grow without bound + // when the writer is somehow far behind. Picked high + // enough that typical bursts pack into one frame. + if pending.len() >= 256 { + break; + } + } + + // Coalesce ADJACENT Data messages so the WS frame count is + // O(non-data events + bursts) instead of O(lines). Non-Data + // messages flush the current Data batch first to preserve + // arrival order. 
+ let mut data_batch: Vec = Vec::new(); + let mut last_index: u64 = 0; + let mut send_failed = false; + + for msg in pending { + match msg { + SerialServerMessage::Data { + lines, + current_index, + } => { + data_batch.extend(lines); + last_index = current_index; + } + other => { + if !data_batch.is_empty() { + let coalesced = SerialServerMessage::Data { + lines: std::mem::take(&mut data_batch), + current_index: last_index, + }; + if ws_sink + .send(Message::Text(serde_json::to_string(&coalesced).unwrap())) + .await + .is_err() + { + send_failed = true; + break; + } + } + if ws_sink + .send(Message::Text(serde_json::to_string(&other).unwrap())) + .await + .is_err() + { + send_failed = true; + break; + } } } } - // Handle incoming WebSocket messages - msg = socket.recv() => { + + if !send_failed && !data_batch.is_empty() { + let coalesced = SerialServerMessage::Data { + lines: data_batch, + current_index: last_index, + }; + if ws_sink + .send(Message::Text(serde_json::to_string(&coalesced).unwrap())) + .await + .is_err() + { + send_failed = true; + } + } + + if send_failed { + break; + } + } + }); + + // INBOUND task -- WS stream -> serial manager + ack reply via mpsc. + let inbound_handle = { + let ctx = ctx.clone(); + let port_owned = port.clone(); + let client_id_owned = client_id.clone(); + let out_tx_inbound = out_tx; + tokio::spawn(async move { + while let Some(msg) = ws_stream.next().await { match msg { - Some(Ok(Message::Text(text))) => { + Ok(Message::Text(text)) => { match serde_json::from_str::(&text) { Ok(SerialClientMessage::Write { data }) => { - // Inbound writes also count as activity — - // resets idle timer so client-driven - // sessions (e.g. JSON-RPC over serial) - // keep the daemon hot. See Issue A - // follow-up in ISSUES.md. 
ctx.touch_activity(); - let decoded = match base64::engine::general_purpose::STANDARD.decode(&data) { + let decoded = match base64::engine::general_purpose::STANDARD + .decode(&data) + { Ok(d) => d, Err(e) => { - let err_msg = SerialServerMessage::Error { + let _ = out_tx_inbound.send(SerialServerMessage::Error { message: format!("base64 decode error: {}", e), - }; - let _ = socket.send(Message::Text(serde_json::to_string(&err_msg).unwrap())).await; + }); continue; } }; - match ctx.serial_manager.write_to_port(&port, &decoded, &client_id).await { + match ctx + .serial_manager + .write_to_port(&port_owned, &decoded, &client_id_owned) + .await + { Ok(n) => { - let ack = SerialServerMessage::WriteAck { + let _ = out_tx_inbound.send(SerialServerMessage::WriteAck { success: true, bytes_written: n, message: None, - }; - let _ = socket.send(Message::Text(serde_json::to_string(&ack).unwrap())).await; + }); } Err(e) => { - let ack = SerialServerMessage::WriteAck { + let _ = out_tx_inbound.send(SerialServerMessage::WriteAck { success: false, bytes_written: 0, message: Some(format!("write error: {}", e)), - }; - let _ = socket.send(Message::Text(serde_json::to_string(&ack).unwrap())).await; - tracing::warn!(client_id, port, "write error: {}", e); + }); + tracing::warn!( + client_id = %client_id_owned, + port = %port_owned, + "write error: {}", e + ); } } } - Ok(SerialClientMessage::Detach) => { - break; - } + Ok(SerialClientMessage::Detach) => break, Ok(SerialClientMessage::ClearBuffer) => { - // FastLED/fbuild#605 — drop every line the - // client's broadcast receiver has buffered - // but not yet observed. Mirrors pyserial's - // `Serial.reset_input_buffer()` semantic - // (modulo bytes vs lines). - let mut drained: usize = 0; - while rx.try_recv().is_ok() { - drained += 1; - } + // NOTE: ClearBuffer's pre-#749 semantic + // drained the local `rx` broadcast + // receiver. 
With the split architecture + // rx is owned by the reader task; we + // can no longer reach in from inbound. + // The accumulator equivalent now lives + // in the mpsc queue (out_rx, owned by + // writer) -- it cannot be drained from + // here either without adding a control + // message. Tracking this as a follow-up; + // the pre-#749 use-case (host wants to + // discard pre-RPC boot banner before + // sending a command) is largely served + // by the reader's lag-handling now. tracing::debug!( - client_id, - port, - drained, - "clear_buffer drained pending lines" + client_id = %client_id_owned, + port = %port_owned, + "clear_buffer requested (no-op post-#749 split; see comment)" ); } Ok(SerialClientMessage::GetInWaiting) => { - // FastLED/fbuild#605 — answer with the - // current per-client broadcast queue depth - // (lines buffered but not yet observed). - // Distinct from pyserial's `in_waiting` - // (bytes) — see the issue for the rationale. - let count = rx.len(); - let reply = SerialServerMessage::InWaiting { count }; - if socket - .send(Message::Text( - serde_json::to_string(&reply).unwrap(), - )) - .await - .is_err() - { - break; - } + // Pre-#749 this answered with the local + // broadcast receiver's queue depth. + // Reader owns that now; reporting 0 is + // honest (the client's view of "buffered" + // lines lives in the daemon -> client + // mpsc queue, which is unbounded). + let _ = out_tx_inbound.send(SerialServerMessage::InWaiting { + count: 0, + }); } Ok(_) => {} Err(e) => { - tracing::warn!(client_id, "invalid ws message: {}", e); + tracing::warn!( + client_id = %client_id_owned, + "invalid ws message: {}", e + ); } } } - Some(Ok(Message::Close(_))) | None => break, + Ok(Message::Close(_)) => break, + Err(_) => break, _ => {} } } - } + }) + }; + + // Wait for ANY task to exit. NOTE(review): the other two are NOT aborted here -- dropping a JoinHandle only detaches its task. Writer dying + // (socket error / Close) is the canonical "session over" signal. 
+ // Reader exits only when the broadcast channel closes (server-side + // teardown). Inbound exits on Detach / Close / WS read error. + tokio::select! { + _ = writer_handle => {} + _ = inbound_handle => {} + _ = reader_handle => {} } // Cleanup: detach reader, release writer, and close the port if we