From 8b6c39ed3ca99470700fa21b00d460ff87973f60 Mon Sep 17 00:00:00 2001 From: Paul Adenot Date: Mon, 1 Jun 2026 15:27:18 +0000 Subject: [PATCH] Drain the callback pipe on stream stop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit audioipc2 services each stream over two independent IPC pipes, each on its own thread per side: the control pipe (StreamStop/StreamStopped) and the callback pipe (CallbackReq::Data/CallbackResp::Data). There is no ordering guarantee between them, so a data callback already queued on the callback pipe can be delivered to the client after StreamStopped has come back on the control pipe — i.e. after cubeb_stream_stop() has returned. This violates cubeb's contract that no data callback runs after stop. The fix is to add CallbackReq::Drain / CallbackResp::Drain. After stopping the native stream, the server makes a *synchronous* Drain call on the callback pipe before replying StreamStopped. The client services that pipe on a single thread in receive order, so by the time it answers Drain every earlier data callback has already run to completion and returned. The call must stay blocking: a fire-and-forget Drain would not order anything and would reintroduce the race. This guarantee is specific to data callbacks, which users rely on to synchronize shutdown sequences.Sstate and device change callbacks aren't affected by this fix and aren't a problem. --- audioipc/src/messages.rs | 2 ++ client/src/stream.rs | 1 + server/src/server.rs | 22 ++++++++++++++++++---- 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/audioipc/src/messages.rs b/audioipc/src/messages.rs index fb97e0f..3cbb493 100644 --- a/audioipc/src/messages.rs +++ b/audioipc/src/messages.rs @@ -294,6 +294,7 @@ pub enum CallbackReq { Data { nframes: isize }, State(ffi::cubeb_state), DeviceChange, + Drain, } #[derive(Debug, Deserialize, Serialize)] @@ -302,6 +303,7 @@ pub enum CallbackResp { State, DeviceChange, Error(c_int), + Drain, } #[derive(Debug, Deserialize, Serialize)] diff --git a/client/src/stream.rs b/client/src/stream.rs index bfa9c96..d43ab30 100644 --- a/client/src/stream.rs +++ b/client/src/stream.rs @@ -145,6 +145,7 @@ impl rpccore::Server for CallbackServer { CallbackResp::State } + CallbackReq::Drain => CallbackResp::Drain, CallbackReq::DeviceChange => { run_in_callback(|| { let cb = *self.device_change_cb.lock().unwrap(); diff --git a/server/src/server.rs b/server/src/server.rs index 9033741..b9fad72 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -572,10 +572,24 @@ impl CubebServer { .map(|_| ClientMessage::StreamStarted) .unwrap_or_else(error), - ServerMessage::StreamStop(stm_tok) => try_stream!(self, stm_tok) - .stop() - .map(|_| ClientMessage::StreamStopped) - .unwrap_or_else(error), + ServerMessage::StreamStop(stm_tok) => { + let result = try_stream!(self, stm_tok).stop(); + if result.is_ok() { + // Drain the callback pipe so any data callback already queued on + // it completes before we reply StreamStopped. An error here means + // the callback pipe is gone, so there is nothing left to drain. + if let Err(e) = self.streams[stm_tok] + .cbs + .data_callback_rpc + .call(CallbackReq::Drain) + { + debug!("StreamStop({stm_tok}): callback pipe drain failed: {e:?}"); + } + } + result + .map(|_| ClientMessage::StreamStopped) + .unwrap_or_else(error) + } ServerMessage::StreamGetPosition(stm_tok) => try_stream!(self, stm_tok) .position()