diff --git a/audioipc/src/messages.rs b/audioipc/src/messages.rs index fb97e0f..3cbb493 100644 --- a/audioipc/src/messages.rs +++ b/audioipc/src/messages.rs @@ -294,6 +294,7 @@ pub enum CallbackReq { Data { nframes: isize }, State(ffi::cubeb_state), DeviceChange, + Drain, } #[derive(Debug, Deserialize, Serialize)] @@ -302,6 +303,7 @@ pub enum CallbackResp { State, DeviceChange, Error(c_int), + Drain, } #[derive(Debug, Deserialize, Serialize)] diff --git a/client/src/stream.rs b/client/src/stream.rs index bfa9c96..d43ab30 100644 --- a/client/src/stream.rs +++ b/client/src/stream.rs @@ -145,6 +145,7 @@ impl rpccore::Server for CallbackServer { CallbackResp::State } + CallbackReq::Drain => CallbackResp::Drain, CallbackReq::DeviceChange => { run_in_callback(|| { let cb = *self.device_change_cb.lock().unwrap(); diff --git a/server/src/server.rs b/server/src/server.rs index 9033741..b9fad72 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -572,10 +572,24 @@ impl CubebServer { .map(|_| ClientMessage::StreamStarted) .unwrap_or_else(error), - ServerMessage::StreamStop(stm_tok) => try_stream!(self, stm_tok) - .stop() - .map(|_| ClientMessage::StreamStopped) - .unwrap_or_else(error), + ServerMessage::StreamStop(stm_tok) => { + let result = try_stream!(self, stm_tok).stop(); + if result.is_ok() { + // Drain the callback pipe so any data callback already queued on + // it completes before we reply StreamStopped. An error here means + // the callback pipe is gone, so there is nothing left to drain. + if let Err(e) = self.streams[stm_tok] + .cbs + .data_callback_rpc + .call(CallbackReq::Drain) + { + debug!("StreamStop({stm_tok}): callback pipe drain failed: {e:?}"); + } + } + result + .map(|_| ClientMessage::StreamStopped) + .unwrap_or_else(error) + } ServerMessage::StreamGetPosition(stm_tok) => try_stream!(self, stm_tok) .position()