From 184af282aa44cb2a59270910608ac66dd537e78d Mon Sep 17 00:00:00 2001 From: zackees Date: Mon, 22 Jun 2026 06:55:33 -0700 Subject: [PATCH] feat(serial): ReaderControl restores ClearBuffer + GetInWaiting (closes #756) PR #750 split `handle_serial_ws` into reader / writer / inbound tasks for throughput. As a side effect the `ClearBuffer` and `GetInWaiting` RPCs became logged no-ops -- both reach into the broadcast receiver `rx`, which is now owned exclusively by the reader task. Inbound couldn't touch it. This commit restores both via a new internal control channel: ```rust enum ReaderControl { Drain { reply: oneshot::Sender }, GetDepth { reply: oneshot::Sender }, } ``` Wiring: - Unbounded mpsc `(control_tx, control_rx)` created alongside the existing out-mpsc. - Reader's `tokio::select!` adds a branch on `control_rx.recv()`. `biased;` keeps broadcast forwarding the priority so a burst of inbound control requests cannot starve forwarding. - Inbound's `ClearBuffer` handler sends `Drain` over the control channel and awaits the oneshot reply with the drop count. - Inbound's `GetInWaiting` handler sends `GetDepth`, awaits the oneshot reply, and ships the `InWaiting { count }` response through the writer mpsc (preserves the writer-is-sole-WS-sink invariant from #750). Race safety: if the reader has exited between send and recv (session teardown race), the oneshot resolves with `Err` and the inbound handler logs debug / falls back to `count=0` rather than crashing. No API change visible to clients -- the SerialClientMessage protocol is unchanged, only the internals. Build: `soldr cargo build -p fbuild-daemon` clean. Refs FastLED/fbuild#755 (meta). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../fbuild-daemon/src/handlers/websockets.rs | 145 ++++++++++++++---- 1 file changed, 116 insertions(+), 29 deletions(-) diff --git a/crates/fbuild-daemon/src/handlers/websockets.rs b/crates/fbuild-daemon/src/handlers/websockets.rs index 4dd4fa02..b4336b0c 100644 --- a/crates/fbuild-daemon/src/handlers/websockets.rs +++ b/crates/fbuild-daemon/src/handlers/websockets.rs @@ -10,7 +10,27 @@ use fbuild_serial::{SerialClientMessage, SerialServerMessage, SerialStreamEvent} use futures::{SinkExt, StreamExt}; use std::sync::Arc; use std::time::Duration; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, oneshot}; + +// ReaderControl -- inbound -> reader cross-task RPC for the small set +// of `SerialClientMessage`s that need read-only access to the reader- +// owned broadcast receiver (`ClearBuffer` and `GetInWaiting`). +// +// Pre-#756 these two RPCs were logged no-ops because the post-#749/#750 +// reader/writer/inbound split moved `rx` exclusively into the reader +// task. Adding a control channel + oneshot reply lets inbound borrow +// the operation through the reader without exposing `rx` itself, so +// the original FastLED/fbuild#605 semantics are restored without +// regressing the throughput fix. +// +// Variants intentionally minimal -- one per RPC. New protocol RPCs +// that need read-only `rx` access get a new variant. +enum ReaderControl { + /// Drain `rx` of any buffered events. Reply: number of events dropped. + Drain { reply: oneshot::Sender }, + /// Report `rx.len()` (broadcast queue depth). Reply: count. + GetDepth { reply: oneshot::Sender }, +} // --------------------------------------------------------------------------- // /ws/serial-monitor — existing serial monitor WebSocket @@ -211,6 +231,13 @@ async fn handle_serial_ws(mut socket: WebSocket, ctx: Arc) { let (out_tx, mut out_rx) = mpsc::unbounded_channel::(); let (mut ws_sink, mut ws_stream) = socket.split(); + // Inbound -> reader control channel (#756). Inbound issues Drain / + // GetDepth requests on this; reader handles them inline alongside + // its broadcast.recv(). Unbounded because the only producers are + // the inbound task's ClearBuffer / GetInWaiting handlers, which + // emit at most one message per client RPC -- bounded capacity + // would only add deadlock corner cases for no real win. + let (control_tx, mut control_rx) = mpsc::unbounded_channel::(); // READER task -- broadcast -> mpsc queue. let reader_handle = { @@ -221,7 +248,13 @@ async fn handle_serial_ws(mut socket: WebSocket, ctx: Arc) { tokio::spawn(async move { let mut line_index: u64 = 0; loop { - match rx.recv().await { + tokio::select! { + biased; // prefer broadcast events over control messages, + // so a burst of inbound ClearBuffer requests + // can't starve forwarding (control fires once + // per client RPC; broadcast fires per line). + + broadcast_result = rx.recv() => match broadcast_result { Ok(SerialStreamEvent::Data(line)) => { ctx.touch_activity(); line_index += 1; @@ -302,7 +335,36 @@ async fn handle_serial_ws(mut socket: WebSocket, ctx: Arc) { ); } Err(tokio::sync::broadcast::error::RecvError::Closed) => break, - } + }, // end broadcast_result match + + // ReaderControl branch -- inbound's ClearBuffer / + // GetInWaiting requests land here. Both are O(1) + // on the broadcast receiver, so they don't block + // the main forwarding loop meaningfully. The + // oneshot reply is best-effort: if inbound has + // dropped the receiver between sending the request + // and now (race on session teardown), the reply + // just goes nowhere -- inbound has already exited. + control_opt = control_rx.recv() => { + let Some(cmd) = control_opt else { + // All inbound senders dropped -> session + // teardown. Reader exits its loop too. + break; + }; + match cmd { + ReaderControl::Drain { reply } => { + let mut drained: usize = 0; + while rx.try_recv().is_ok() { + drained += 1; + } + let _ = reply.send(drained); + } + ReaderControl::GetDepth { reply } => { + let _ = reply.send(rx.len()); + } + } + } + } // end tokio::select! } // Dropping `out_tx_reader` here signals the writer that no more // data events will arrive (but inbound's clone keeps the @@ -401,7 +463,10 @@ async fn handle_serial_ws(mut socket: WebSocket, ctx: Arc) { }); // INBOUND task -- WS stream -> serial manager + ack reply via mpsc. + // Also owns the producer side of the #756 ReaderControl channel for + // ClearBuffer / GetInWaiting requests. let inbound_handle = { + let control_tx_inbound = control_tx; let ctx = ctx.clone(); let port_owned = port.clone(); let client_id_owned = client_id.clone(); @@ -454,35 +519,57 @@ async fn handle_serial_ws(mut socket: WebSocket, ctx: Arc) { } Ok(SerialClientMessage::Detach) => break, Ok(SerialClientMessage::ClearBuffer) => { - // NOTE: ClearBuffer's pre-#749 semantic - // drained the local `rx` broadcast - // receiver. With the split architecture - // rx is owned by the reader task; we - // can no longer reach in from inbound. - // The accumulator equivalent now lives - // in the mpsc queue (out_rx, owned by - // writer) -- it cannot be drained from - // here either without adding a control - // message. Tracking this as a follow-up; - // the pre-#749 use-case (host wants to - // discard pre-RPC boot banner before - // sending a command) is largely served - // by the reader's lag-handling now. - tracing::debug!( - client_id = %client_id_owned, - port = %port_owned, - "clear_buffer requested (no-op post-#749 split; see comment)" - ); + // Restored ClearBuffer semantic via the + // #756 ReaderControl channel: ask the + // reader (which owns `rx`) to drain it, + // await the drop-count reply, and log. + // Best-effort -- if reader has already + // exited (session teardown race), the + // oneshot resolves with Err and we just + // log debug instead of a hard error. + let (reply_tx, reply_rx) = oneshot::channel(); + if control_tx_inbound + .send(ReaderControl::Drain { reply: reply_tx }) + .is_err() + { + tracing::debug!( + client_id = %client_id_owned, + port = %port_owned, + "clear_buffer: reader gone, dropping request" + ); + } else { + match reply_rx.await { + Ok(drained) => tracing::debug!( + client_id = %client_id_owned, + port = %port_owned, + drained, + "clear_buffer drained pending lines" + ), + Err(_) => tracing::debug!( + client_id = %client_id_owned, + port = %port_owned, + "clear_buffer: reader dropped reply channel" + ), + } + } } Ok(SerialClientMessage::GetInWaiting) => { - // Pre-#749 this answered with the local - // broadcast receiver's queue depth. - // Reader owns that now; reporting 0 is - // honest (the client's view of "buffered" - // lines lives in the daemon -> client - // mpsc queue, which is unbounded). + // Restored GetInWaiting semantic via the + // #756 ReaderControl channel: ask the + // reader for `rx.len()` and reply via + // the writer mpsc. Race-safe (reader- + // gone -> reply 0 honestly). + let (reply_tx, reply_rx) = oneshot::channel(); + let count = if control_tx_inbound + .send(ReaderControl::GetDepth { reply: reply_tx }) + .is_err() + { + 0 + } else { + reply_rx.await.unwrap_or(0) + }; let _ = out_tx_inbound - .send(SerialServerMessage::InWaiting { count: 0 }); + .send(SerialServerMessage::InWaiting { count }); } Ok(_) => {} Err(e) => {