Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 116 additions & 29 deletions crates/fbuild-daemon/src/handlers/websockets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,27 @@ use fbuild_serial::{SerialClientMessage, SerialServerMessage, SerialStreamEvent}
use futures::{SinkExt, StreamExt};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::sync::{mpsc, oneshot};

// ReaderControl -- inbound -> reader cross-task RPC for the small set
// of `SerialClientMessage`s that need read-only access to the reader-
// owned broadcast receiver (`ClearBuffer` and `GetInWaiting`).
//
// Pre-#756 these two RPCs were logged no-ops because the post-#749/#750
// reader/writer/inbound split moved `rx` exclusively into the reader
// task. Adding a control channel + oneshot reply lets inbound borrow
// the operation through the reader without exposing `rx` itself, so
// the original FastLED/fbuild#605 semantics are restored without
// regressing the throughput fix.
//
// Variants intentionally minimal -- one per RPC. New protocol RPCs
// that need read-only `rx` access get a new variant.
enum ReaderControl {
/// Drain `rx` of any buffered events. Reply: number of events dropped.
Drain { reply: oneshot::Sender<usize> },
/// Report `rx.len()` (broadcast queue depth). Reply: count.
GetDepth { reply: oneshot::Sender<usize> },
}

// ---------------------------------------------------------------------------
// /ws/serial-monitor — existing serial monitor WebSocket
Expand Down Expand Up @@ -211,6 +231,13 @@ async fn handle_serial_ws(mut socket: WebSocket, ctx: Arc<DaemonContext>) {

let (out_tx, mut out_rx) = mpsc::unbounded_channel::<SerialServerMessage>();
let (mut ws_sink, mut ws_stream) = socket.split();
// Inbound -> reader control channel (#756). Inbound issues Drain /
// GetDepth requests on this; reader handles them inline alongside
// its broadcast.recv(). Unbounded because the only producers are
// the inbound task's ClearBuffer / GetInWaiting handlers, which
// emit at most one message per client RPC -- bounded capacity
// would only add deadlock corner cases for no real win.
let (control_tx, mut control_rx) = mpsc::unbounded_channel::<ReaderControl>();

// READER task -- broadcast -> mpsc queue.
let reader_handle = {
Expand All @@ -221,7 +248,13 @@ async fn handle_serial_ws(mut socket: WebSocket, ctx: Arc<DaemonContext>) {
tokio::spawn(async move {
let mut line_index: u64 = 0;
loop {
match rx.recv().await {
tokio::select! {
biased; // prefer broadcast events over control messages,
// so a burst of inbound ClearBuffer requests
// can't starve forwarding (control fires once
// per client RPC; broadcast fires per line).

broadcast_result = rx.recv() => match broadcast_result {
Ok(SerialStreamEvent::Data(line)) => {
ctx.touch_activity();
line_index += 1;
Expand Down Expand Up @@ -302,7 +335,36 @@ async fn handle_serial_ws(mut socket: WebSocket, ctx: Arc<DaemonContext>) {
);
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
}
}, // end broadcast_result match

// ReaderControl branch -- inbound's ClearBuffer /
// GetInWaiting requests land here. Both are O(1)
// on the broadcast receiver, so they don't block
// the main forwarding loop meaningfully. The
// oneshot reply is best-effort: if inbound has
// dropped the receiver between sending the request
// and now (race on session teardown), the reply
// just goes nowhere -- inbound has already exited.
control_opt = control_rx.recv() => {
let Some(cmd) = control_opt else {
// All inbound senders dropped -> session
// teardown. Reader exits its loop too.
break;
};
match cmd {
ReaderControl::Drain { reply } => {
let mut drained: usize = 0;
while rx.try_recv().is_ok() {
drained += 1;
}
let _ = reply.send(drained);
}
ReaderControl::GetDepth { reply } => {
let _ = reply.send(rx.len());
}
}
}
} // end tokio::select!
}
// Dropping `out_tx_reader` here signals the writer that no more
// data events will arrive (but inbound's clone keeps the
Expand Down Expand Up @@ -401,7 +463,10 @@ async fn handle_serial_ws(mut socket: WebSocket, ctx: Arc<DaemonContext>) {
});

// INBOUND task -- WS stream -> serial manager + ack reply via mpsc.
// Also owns the producer side of the #756 ReaderControl channel for
// ClearBuffer / GetInWaiting requests.
let inbound_handle = {
let control_tx_inbound = control_tx;
let ctx = ctx.clone();
let port_owned = port.clone();
let client_id_owned = client_id.clone();
Expand Down Expand Up @@ -454,35 +519,57 @@ async fn handle_serial_ws(mut socket: WebSocket, ctx: Arc<DaemonContext>) {
}
Ok(SerialClientMessage::Detach) => break,
Ok(SerialClientMessage::ClearBuffer) => {
// NOTE: ClearBuffer's pre-#749 semantic
// drained the local `rx` broadcast
// receiver. With the split architecture
// rx is owned by the reader task; we
// can no longer reach in from inbound.
// The accumulator equivalent now lives
// in the mpsc queue (out_rx, owned by
// writer) -- it cannot be drained from
// here either without adding a control
// message. Tracking this as a follow-up;
// the pre-#749 use-case (host wants to
// discard pre-RPC boot banner before
// sending a command) is largely served
// by the reader's lag-handling now.
tracing::debug!(
client_id = %client_id_owned,
port = %port_owned,
"clear_buffer requested (no-op post-#749 split; see comment)"
);
// Restored ClearBuffer semantic via the
// #756 ReaderControl channel: ask the
// reader (which owns `rx`) to drain it,
// await the drop-count reply, and log.
// Best-effort -- if reader has already
// exited (session teardown race), the
// oneshot resolves with Err and we just
// log debug instead of a hard error.
let (reply_tx, reply_rx) = oneshot::channel();
if control_tx_inbound
.send(ReaderControl::Drain { reply: reply_tx })
.is_err()
{
tracing::debug!(
client_id = %client_id_owned,
port = %port_owned,
"clear_buffer: reader gone, dropping request"
);
} else {
match reply_rx.await {
Ok(drained) => tracing::debug!(
client_id = %client_id_owned,
port = %port_owned,
drained,
"clear_buffer drained pending lines"
),
Err(_) => tracing::debug!(
client_id = %client_id_owned,
port = %port_owned,
"clear_buffer: reader dropped reply channel"
),
}
}
}
Ok(SerialClientMessage::GetInWaiting) => {
// Pre-#749 this answered with the local
// broadcast receiver's queue depth.
// Reader owns that now; reporting 0 is
// honest (the client's view of "buffered"
// lines lives in the daemon -> client
// mpsc queue, which is unbounded).
// Restored GetInWaiting semantic via the
// #756 ReaderControl channel: ask the
// reader for `rx.len()` and reply via
// the writer mpsc. Race-safe (reader-
// gone -> reply 0 honestly).
let (reply_tx, reply_rx) = oneshot::channel();
let count = if control_tx_inbound
.send(ReaderControl::GetDepth { reply: reply_tx })
.is_err()
{
0
} else {
reply_rx.await.unwrap_or(0)
};
let _ = out_tx_inbound
.send(SerialServerMessage::InWaiting { count: 0 });
.send(SerialServerMessage::InWaiting { count });
}
Ok(_) => {}
Err(e) => {
Expand Down
Loading