diff --git a/crates/client/src/listeners.rs b/crates/client/src/listeners.rs index c305f329..705efc73 100644 --- a/crates/client/src/listeners.rs +++ b/crates/client/src/listeners.rs @@ -644,19 +644,24 @@ pub(crate) async fn process_received_message( // per channel) dimensions of `VoiceState.participants`. // Without these gates, any signed peer can flood arbitrary // `channel_id`s and exhaust receiving clients' memory. - let known = willow_actor::state::select(&ctx.event_state, { + // + // The wire carries the canonical `channel_id` (UUID); voice state + // and UI events are keyed by the channel *name* (the whole UI is + // name-keyed). Resolve id -> name here: `None` both fails the + // existence gate and means "unknown channel". + let name = willow_actor::state::select(&ctx.event_state, { let ch = channel_id.clone(); - move |es| es.channels.contains_key(&ch) + move |es| es.channels.get(&ch).map(|c| c.name.clone()) }) .await; - if !known { + let Some(name) = name else { tracing::debug!( %signer, channel_id = %channel_id, "dropping VoiceJoin: channel_id not in ServerState.channels" ); return; - } - let ch = channel_id.clone(); + }; + let ch = name.clone(); let inserted = willow_actor::state::mutate(&ctx.voice, move |v| { let new_channel = !v.participants.contains_key(&ch); if new_channel && v.participants.len() >= MAX_VOICE_CHANNELS { @@ -680,7 +685,7 @@ pub(crate) async fn process_received_message( warn_if_err( ctx.event_broker .do_send(willow_actor::Publish(ClientEvent::VoiceJoined { - channel_id, + channel_id: name, peer_id, })), "event_broker.do_send Publish(VoiceJoined)", @@ -695,19 +700,21 @@ pub(crate) async fn process_received_message( // UI. The `participants` map is unaffected even without the // gate (`get_mut` returns None), but the event-broker fanout // would still leak attacker-controlled `channel_id`s. - let known = willow_actor::state::select(&ctx.event_state, { + // Resolve the wire `channel_id` (UUID) to the name used by voice + // state + UI events; `None` fails the existence gate. + let name = willow_actor::state::select(&ctx.event_state, { let ch = channel_id.clone(); - move |es| es.channels.contains_key(&ch) + move |es| es.channels.get(&ch).map(|c| c.name.clone()) }) .await; - if !known { + let Some(name) = name else { tracing::debug!( %signer, channel_id = %channel_id, "dropping VoiceLeave: channel_id not in ServerState.channels" ); return; - } - let ch = channel_id.clone(); + }; + let ch = name.clone(); willow_actor::state::mutate(&ctx.voice, move |v| { if let Some(p) = v.participants.get_mut(&ch) { p.remove(&peer_id); @@ -717,7 +724,7 @@ pub(crate) async fn process_received_message( warn_if_err( ctx.event_broker .do_send(willow_actor::Publish(ClientEvent::VoiceLeft { - channel_id, + channel_id: name, peer_id, })), "event_broker.do_send Publish(VoiceLeft)", @@ -734,22 +741,22 @@ pub(crate) async fn process_received_message( // event handlers. Signals do not mutate `participants` // directly, but the existence check shuts the same fanout // gap as Join/Leave. - let known = willow_actor::state::select(&ctx.event_state, { + let name = willow_actor::state::select(&ctx.event_state, { let ch = channel_id.clone(); - move |es| es.channels.contains_key(&ch) + move |es| es.channels.get(&ch).map(|c| c.name.clone()) }) .await; - if !known { + let Some(name) = name else { tracing::debug!( %signer, channel_id = %channel_id, "dropping VoiceSignal: channel_id not in ServerState.channels" ); return; - } + }; warn_if_err( ctx.event_broker .do_send(willow_actor::Publish(ClientEvent::VoiceSignal { - channel_id, + channel_id: name, from_peer: signer, signal, })), @@ -1604,6 +1611,18 @@ mod tests { .expect("test_client must seed at least one channel") } + /// The display name of the first seeded channel. Voice state + events are + /// keyed by name (the wire carries the UUID `channel_id`, which the + /// listener resolves to this name), so state assertions look up by name. + async fn known_channel_name(client: &ClientHandle) -> String { + let snap = client.state_snapshot().await; + snap.channels + .values() + .next() + .map(|c| c.name.clone()) + .expect("test_client must seed at least one channel") + } + /// VoiceJoin against an unknown `channel_id` must not mutate /// `VoiceState.participants` (defends against attackers flooding /// random channel ids until memory is exhausted). @@ -1652,7 +1671,10 @@ mod tests { .await .expect("subscribe must succeed"); let ctx = make_ctx(&client); + // Wire is addressed by canonical channel_id (UUID); voice state is + // keyed by the resolved channel name. let channel_id = known_channel_id(&client).await; + let channel_name = known_channel_name(&client).await; let attacker = willow_identity::Identity::generate(); let attacker_id = attacker.endpoint_id(); @@ -1665,7 +1687,7 @@ mod tests { let stored = poll_until_some( || { - let ch = channel_id.clone(); + let ch = channel_name.clone(); willow_actor::state::select(&client.voice_state_addr, move |v| { v.participants .get(&ch) @@ -1679,7 +1701,7 @@ mod tests { assert_eq!( stored, Some(1), - "VoiceJoin on a known channel must record the participant" + "VoiceJoin on a known channel must record the participant (keyed by name)" ); } @@ -1748,10 +1770,12 @@ mod tests { .await .expect("subscribe must succeed"); let ctx = make_ctx(&client); + // Wire is addressed by channel_id (UUID); state is keyed by name. let channel_id = known_channel_id(&client).await; + let channel_name = known_channel_name(&client).await; - // Fill the cap with synthetic participants on the real channel. - let cap_channel = channel_id.clone(); + // Fill the cap with synthetic participants on the real channel (by name). + let cap_channel = channel_name.clone(); willow_actor::state::mutate(&client.voice_state_addr, move |v| { let set = v.participants.entry(cap_channel).or_default(); for _ in 0..MAX_PARTICIPANTS_PER_CHANNEL { @@ -1763,7 +1787,7 @@ mod tests { // Confirm preconditions. let pre = voice_snapshot(&client).await; assert_eq!( - pre.participants.get(&channel_id).map(|s| s.len()), + pre.participants.get(&channel_name).map(|s| s.len()), Some(MAX_PARTICIPANTS_PER_CHANNEL), "test setup must fill participants to cap" ); @@ -1781,7 +1805,7 @@ mod tests { let snap = voice_snapshot(&client).await; let set = snap .participants - .get(&channel_id) + .get(&channel_name) .expect("channel set must still exist"); assert_eq!( set.len(), diff --git a/crates/client/src/mutations.rs b/crates/client/src/mutations.rs index aa777d0d..88358992 100644 --- a/crates/client/src/mutations.rs +++ b/crates/client/src/mutations.rs @@ -156,6 +156,30 @@ impl ClientMutations { channel_id.ok_or_else(|| anyhow::anyhow!("channel not found: {channel}")) } + /// Resolve a channel reference (already a `channel_id`, or a display name) + /// to the canonical `channel_id` (UUID) used on the wire. + /// + /// Voice signaling (`VoiceJoin`/`VoiceLeave`/`VoiceSignal`) must address + /// channels by their canonical id so the receiver's existence gate — which + /// validates against `ServerState.channels` (keyed by id) — accepts them. + /// The UI keys voice state by name, so this accepts either form: an exact + /// id match wins, otherwise it falls back to a name lookup. Returns `None` + /// if the channel is unknown. + pub(crate) async fn channel_id_for_voice(&self, channel: &str) -> Option { + let ch = channel.to_string(); + willow_actor::state::select(&self.event_state, move |es| { + if es.channels.contains_key(&ch) { + Some(ch.clone()) + } else { + es.channels + .iter() + .find(|(_, c)| c.name == ch) + .map(|(id, _)| id.clone()) + } + }) + .await + } + /// Sync event_state mirror from ManagedDag, persist, and emit /// ClientEvents. Called after build_event succeeds — ManagedDag has /// already applied the event to its internal state atomically. @@ -699,21 +723,33 @@ impl ClientMutations { impl ClientMutations { /// Join a voice channel. - pub async fn join_voice(&self, channel_id: &str) { + /// + /// `channel` is a channel name (the UI's identifier) or a `channel_id`. + /// Local voice state stays keyed by the channel *name* (consistent with the + /// name-keyed UI); the broadcast `VoiceJoin` carries the canonical + /// `channel_id` (UUID) so remote peers' existence gate accepts it. + pub async fn join_voice(&self, channel: &str) { let in_voice = willow_actor::state::select(&self.voice, |v| v.active_channel.is_some()).await; if in_voice { self.leave_voice().await; } - let ch = channel_id.to_string(); + // Local voice state is keyed by the caller's reference (the UI's + // channel name), so the UI — which reads the same key — matches. let my_peer_id = self.identity.endpoint_id(); + let ch = channel.to_string(); willow_actor::state::mutate(&self.voice, move |v| { v.active_channel = Some(ch.clone()); v.participants.entry(ch).or_default().insert(my_peer_id); }) .await; + // Broadcast addressed by canonical channel_id so peers accept it. + let Some(channel_id) = self.channel_id_for_voice(channel).await else { + tracing::warn!(%channel, "join_voice: unknown channel, not broadcasting"); + return; + }; let msg = ops::WireMessage::VoiceJoin { - channel_id: channel_id.to_string(), + channel_id, peer_id: my_peer_id, }; if let Some(data) = ops::pack_wire(&msg, &self.identity) { @@ -736,8 +772,14 @@ impl ClientMutations { }) .await; if let Some(ch) = maybe_ch { + // `ch` is the local key (channel name); address the wire message + // by canonical channel_id so peers' existence gate accepts it. + let Some(channel_id) = self.channel_id_for_voice(&ch).await else { + tracing::warn!(channel = %ch, "leave_voice: unknown channel, not broadcasting"); + return; + }; let msg = ops::WireMessage::VoiceLeave { - channel_id: ch, + channel_id, peer_id: my_peer_id, }; if let Some(data) = ops::pack_wire(&msg, &self.identity) { diff --git a/crates/client/src/voice.rs b/crates/client/src/voice.rs index bdb46b83..1d98eb02 100644 --- a/crates/client/src/voice.rs +++ b/crates/client/src/voice.rs @@ -40,14 +40,23 @@ impl ClientHandle { willow_actor::state::select(&self.voice_state_addr, |v| v.deafened).await } - pub fn send_voice_signal( + /// Send a WebRTC signaling message to a peer. + /// + /// `channel` is the UI's channel reference (name) or a `channel_id`; it is + /// resolved to the canonical `channel_id` (UUID) so the receiver's + /// existence gate accepts it. Async because resolution reads event state. + pub async fn send_voice_signal( &self, - channel_id: &str, + channel: &str, target: willow_identity::EndpointId, signal: ops::VoiceSignalPayload, ) { + let Some(channel_id) = self.mutation_handle.channel_id_for_voice(channel).await else { + tracing::warn!(%channel, "send_voice_signal: unknown channel"); + return; + }; let msg = ops::WireMessage::VoiceSignal { - channel_id: channel_id.to_string(), + channel_id, target_peer: target, signal, }; diff --git a/crates/common/src/wire.rs b/crates/common/src/wire.rs index 9382c3a0..a91210dc 100644 --- a/crates/common/src/wire.rs +++ b/crates/common/src/wire.rs @@ -240,6 +240,17 @@ const SIGNALING_CAP: usize = 4 * 1024; /// fall-through for variants that don't have a dedicated cap. const DEFAULT_CAP: usize = 64 * 1024; +/// Per-variant size cap for WebRTC SDP signaling: 64 KB. +/// +/// [`WireMessage::VoiceSignal`] carries SDP offers/answers, **not** just tiny +/// ICE-candidate blobs. A real video offer (multiple codecs, RTP header +/// extensions, trickle-ICE history) routinely runs 5-15 KB and can grow larger +/// after renegotiation, so the 4 KB [`SIGNALING_CAP`] silently dropped them +/// (see `docs/reports/2026-06-07-voice-media-connectivity-investigation.md`). +/// 64 KB leaves ample headroom while staying inside the transport-level +/// [`willow_transport::MAX_DESER_SIZE`] (256 KB) ceiling. +const SDP_CAP: usize = 64 * 1024; + impl WireMessage { /// Returns the maximum permitted serialized size, in bytes, for this /// variant when it appears on the wire. @@ -267,9 +278,12 @@ impl WireMessage { /// formal length limit yet, but 64 KB is wildly more than any /// reasonable display name. /// - **Signaling variants** (`TypingIndicator`, `VoiceJoin`, - /// `VoiceLeave`, `VoiceSignal`, `JoinRequest`, `JoinResponse`, - /// `JoinDenied`, `SyncRequest`): `SIGNALING_CAP` (4 KB). These - /// carry only ids, short strings, and SDP/ICE blobs — all small. + /// `VoiceLeave`, `JoinRequest`, `JoinResponse`, `JoinDenied`, + /// `SyncRequest`): `SIGNALING_CAP` (4 KB). These carry only ids, short + /// strings, and tiny control payloads. + /// - **`VoiceSignal`**: `SDP_CAP` (64 KB). Carries WebRTC SDP + /// offers/answers, which are far larger than the other signaling + /// variants — see [`SDP_CAP`]. pub fn max_size(&self) -> usize { match self { // User-generated bodies, batched payloads, and topic announces: @@ -301,10 +315,11 @@ impl WireMessage { | WireMessage::TypingIndicator { .. } | WireMessage::VoiceJoin { .. } | WireMessage::VoiceLeave { .. } - | WireMessage::VoiceSignal { .. } | WireMessage::JoinRequest { .. } | WireMessage::JoinResponse { .. } | WireMessage::JoinDenied { .. } => SIGNALING_CAP, + // WebRTC SDP offers/answers are far larger than other signaling. + WireMessage::VoiceSignal { .. } => SDP_CAP, } } @@ -1097,6 +1112,71 @@ mod tests { } } + /// Build an SDP blob of approximately `kb` kilobytes that looks like a + /// real video offer (the size, not the exact grammar, is what matters + /// for the per-variant cap test). + fn fake_video_sdp(kb: usize) -> String { + // A real video SDP is dominated by repeated codec/fmtp/rtcp-fb and + // ICE candidate lines. Emit enough to reach ~kb KB. + let line = "a=rtpmap:96 VP9/90000\r\na=fmtp:96 profile-id=0;max-fs=12288;max-fr=60\r\na=rtcp-fb:96 nack pli\r\n"; + let mut sdp = String::from("v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\nt=0 0\r\n"); + while sdp.len() < kb * 1024 { + sdp.push_str(line); + } + sdp + } + + #[test] + fn voice_signal_offer_survives_realistic_video_sdp() { + // A real video SDP offer (multiple codecs, header extensions, trickle + // ICE history) routinely runs 5-15 KB. It must survive the wire + // round-trip — the per-variant cap for VoiceSignal has to leave room + // for SDP, not just tiny ICE-candidate blobs. + let id = Identity::generate(); + let target = Identity::generate().endpoint_id(); + let sdp = fake_video_sdp(10); // ~10 KB + assert!( + sdp.len() > 4 * 1024, + "test SDP must exceed the old 4 KB cap" + ); + let msg = WireMessage::VoiceSignal { + channel_id: "voice-1".to_string(), + target_peer: target, + signal: VoiceSignalPayload::Offer(sdp.clone()), + }; + let data = pack_wire(&msg, &id).unwrap(); + let decoded = unpack_wire(&data); + assert!( + decoded.is_some(), + "a ~10 KB video SDP offer must not be dropped by the per-variant cap" + ); + match decoded.unwrap().0 { + WireMessage::VoiceSignal { + signal: VoiceSignalPayload::Offer(got), + .. + } => assert_eq!(got, sdp), + other => panic!("expected VoiceSignal Offer, got {other:?}"), + } + } + + #[test] + fn oversize_voice_signal_is_still_rejected() { + // The SDP cap still bounds abuse: a payload past the cap is dropped. + let id = Identity::generate(); + let target = Identity::generate().endpoint_id(); + let huge = fake_video_sdp(100); // 100 KB, past the SDP cap + let msg = WireMessage::VoiceSignal { + channel_id: "voice-1".to_string(), + target_peer: target, + signal: VoiceSignalPayload::Offer(huge), + }; + let data = pack_wire(&msg, &id).unwrap(); + assert!( + unpack_wire(&data).is_none(), + "a VoiceSignal past the SDP cap must still be rejected" + ); + } + #[test] fn per_variant_caps_are_sized_appropriately() { // Sanity: caps should be ordered signaling < profile <= body, and diff --git a/crates/web/src/app.rs b/crates/web/src/app.rs index 2f299862..abd382aa 100644 --- a/crates/web/src/app.rs +++ b/crates/web/src/app.rs @@ -1,4 +1,3 @@ -use std::cell::RefCell; use std::rc::Rc; use leptos::prelude::*; @@ -51,8 +50,13 @@ const LOADING_TIMEOUT_MS: u32 = 5_000; /// Wrapper around `willow_client::ClientHandle` that is `Send` for single-threaded WASM. pub type WebClientHandle = SendWrapper>; -/// Wrapper around `Rc>` that is `Send` for single-threaded WASM. -pub type VoiceManagerHandle = SendWrapper>>; +/// Wrapper around `Rc` that is `Send` for single-threaded WASM. +/// +/// No outer `RefCell`: `VoiceManager` owns its mutable state behind interior +/// mutability and exposes only `&self` methods, so callers can `.await` the +/// async signaling methods without holding a borrow across the await point +/// (which previously risked a `RefCell` double-borrow panic mid-negotiation). +pub type VoiceManagerHandle = SendWrapper>; /// Default relay URL for the deployed Willow relay server. pub const DEFAULT_RELAY_URL: &str = "https://willow.intendednull.com:9443"; @@ -313,35 +317,39 @@ pub fn App() -> impl IntoView { let voice_channel_for_signal = app_state.voice.voice_channel; let set_remote_streams = write.voice.set_remote_video_streams; let set_speaking = write.voice.set_speaking_peers; - let voice_manager: VoiceManagerHandle = - SendWrapper::new(Rc::new(RefCell::new(VoiceManager::new( - local_peer_id, - move |target_peer: &str, signal_type: &str, payload: &str| { - let ch_id = voice_channel_for_signal.get_untracked().unwrap_or_default(); - let signal = match signal_type { - "offer" => VoiceSignalPayload::Offer(payload.to_string()), - "answer" => VoiceSignalPayload::Answer(payload.to_string()), - "ice" => VoiceSignalPayload::IceCandidate(payload.to_string()), - _ => return, - }; - if let Ok(target) = target_peer.parse::() { - voice_signal_handle.send_voice_signal(&ch_id, target, signal); - } - }, - move |peer_id: &str, stream: Option| { - let pid = peer_id.to_string(); - set_remote_streams.update(move |map| { - if let Some(s) = stream { - map.insert(pid, send_wrapper::SendWrapper::new(s)); - } else { - map.remove(&pid); - } + let voice_manager: VoiceManagerHandle = SendWrapper::new(Rc::new(VoiceManager::new( + local_peer_id, + move |target_peer: &str, signal_type: &str, payload: &str| { + let ch_id = voice_channel_for_signal.get_untracked().unwrap_or_default(); + let signal = match signal_type { + "offer" => VoiceSignalPayload::Offer(payload.to_string()), + "answer" => VoiceSignalPayload::Answer(payload.to_string()), + "ice" => VoiceSignalPayload::IceCandidate(payload.to_string()), + _ => return, + }; + if let Ok(target) = target_peer.parse::() { + // send_voice_signal resolves the channel name -> canonical id + // (async), so dispatch it on the local task queue. + let h = voice_signal_handle.clone(); + wasm_bindgen_futures::spawn_local(async move { + h.send_voice_signal(&ch_id, target, signal).await; }); - }, - move |peers: std::collections::HashSet| { - set_speaking.set(peers); - }, - )))); + } + }, + move |peer_id: &str, stream: Option| { + let pid = peer_id.to_string(); + set_remote_streams.update(move |map| { + if let Some(s) = stream { + map.insert(pid, send_wrapper::SendWrapper::new(s)); + } else { + map.remove(&pid); + } + }); + }, + move |peers: std::collections::HashSet| { + set_speaking.set(peers); + }, + ))); provide_context(voice_manager.clone()); @@ -773,7 +781,7 @@ pub fn App() -> impl IntoView { let on_voice_mute = move |_: ()| { let new_muted = !app_state.voice.voice_muted.get_untracked(); write.voice.set_voice_muted.set(new_muted); - vm_mute.borrow().set_muted(new_muted); + vm_mute.set_muted(new_muted); }; // Voice deafen handler. @@ -783,10 +791,10 @@ pub fn App() -> impl IntoView { write.voice.set_voice_deafened.set(new_deafened); if new_deafened { write.voice.set_voice_muted.set(true); - vm_deafen.borrow().set_muted(true); + vm_deafen.set_muted(true); } else { write.voice.set_voice_muted.set(false); - vm_deafen.borrow().set_muted(false); + vm_deafen.set_muted(false); } }; @@ -798,7 +806,7 @@ pub fn App() -> impl IntoView { wasm_bindgen_futures::spawn_local(async move { handle_voice_leave.leave_voice().await; }); - vm_disconnect.borrow_mut().close_all(); + vm_disconnect.close_all(); write.voice.reset(); write.ui.set_show_call_page.set(false); write @@ -1053,7 +1061,7 @@ pub fn App() -> impl IntoView { wasm_bindgen_futures::spawn_local(async move { vc_leave.leave_voice().await; }); - vm.borrow_mut().close_all(); + vm.close_all(); write.voice.reset(); } @@ -1098,7 +1106,7 @@ pub fn App() -> impl IntoView { let on_success = wasm_bindgen::closure::Closure::once(move |stream: wasm_bindgen::JsValue| { use wasm_bindgen::JsCast; let stream: web_sys::MediaStream = stream.unchecked_into(); - vm2.borrow_mut().set_local_stream(stream); + vm2.set_local_stream(stream); wasm_bindgen_futures::spawn_local(async move { vc.join_voice(&ch_name).await; @@ -1540,32 +1548,21 @@ pub fn App() -> impl IntoView { /// Helper to create a WebRTC offer in a spawned future. /// -/// The `RefCell` borrow is held across await but this is safe on -/// single-threaded WASM where there is no preemption. -#[allow(clippy::await_holding_refcell_ref)] +/// `VoiceManager` exposes `&self` async methods over interior mutability, so no +/// `RefCell` borrow is held across the await — concurrent signaling tasks +/// (offers, answers, ICE candidates) can interleave safely. pub async fn handle_voice_create_offer(vm: VoiceManagerHandle, peer_id: String) { - let mut mgr = vm.borrow_mut(); - mgr.create_offer(&peer_id).await.ok(); + vm.create_offer(&peer_id).await.ok(); } /// Helper to handle an incoming WebRTC offer. -/// -/// The `RefCell` borrow is held across await but this is safe on -/// single-threaded WASM where there is no preemption. -#[allow(clippy::await_holding_refcell_ref)] pub async fn handle_voice_offer(vm: VoiceManagerHandle, from: String, sdp: String) { - let mut mgr = vm.borrow_mut(); - mgr.handle_offer(&from, &sdp).await.ok(); + vm.handle_offer(&from, &sdp).await.ok(); } /// Helper to handle an incoming WebRTC answer. -/// -/// The `RefCell` borrow is held across await but this is safe on -/// single-threaded WASM where there is no preemption. -#[allow(clippy::await_holding_refcell_ref)] pub async fn handle_voice_answer(vm: VoiceManagerHandle, from: String, sdp: String) { - let mgr = vm.borrow(); - mgr.handle_answer(&from, &sdp).await.ok(); + vm.handle_answer(&from, &sdp).await.ok(); } #[cfg(test)] diff --git a/crates/web/src/components/call_page.rs b/crates/web/src/components/call_page.rs index cef0c658..2f31ae9a 100644 --- a/crates/web/src/components/call_page.rs +++ b/crates/web/src/components/call_page.rs @@ -199,7 +199,7 @@ pub fn CallPage( if current_source == Some(VideoSource::Camera) { // Toggle off — stop camera. - vm_camera.borrow_mut().stop_video_share(); + vm_camera.stop_video_share(); write.voice.set_video_source.set(None); write.voice.set_local_video_stream.set(None); return; @@ -207,7 +207,7 @@ pub fn CallPage( // Stop any existing share first. if current_source.is_some() { - vm_camera.borrow_mut().stop_video_share(); + vm_camera.stop_video_share(); write.voice.set_video_source.set(None); write.voice.set_local_video_stream.set(None); } @@ -236,7 +236,7 @@ pub fn CallPage( use wasm_bindgen::JsCast; let stream: web_sys::MediaStream = stream.unchecked_into(); let stream_for_signal = SendWrapper::new(stream.clone()); - vm2.borrow_mut().start_camera(stream); + vm2.start_camera(stream); write2.voice.set_video_source.set(Some(VideoSource::Camera)); write2 .voice @@ -260,7 +260,7 @@ pub fn CallPage( if current_source == Some(VideoSource::Screen) { // Toggle off — stop screen share. - vm_screen.borrow_mut().stop_video_share(); + vm_screen.stop_video_share(); write.voice.set_video_source.set(None); write.voice.set_local_video_stream.set(None); return; @@ -268,7 +268,7 @@ pub fn CallPage( // Stop any existing share first. if current_source.is_some() { - vm_screen.borrow_mut().stop_video_share(); + vm_screen.stop_video_share(); write.voice.set_video_source.set(None); write.voice.set_local_video_stream.set(None); } @@ -295,7 +295,7 @@ pub fn CallPage( use wasm_bindgen::JsCast; let stream: web_sys::MediaStream = stream.unchecked_into(); let stream_for_signal = SendWrapper::new(stream.clone()); - vm2.borrow_mut().start_screen_share(stream.clone()); + vm2.start_screen_share(stream.clone()); write2.voice.set_video_source.set(Some(VideoSource::Screen)); write2 .voice @@ -309,7 +309,7 @@ pub fn CallPage( let track: web_sys::MediaStreamTrack = track_val.unchecked_into(); let vm_ended = vm2.clone(); let on_ended = Closure::once(move || { - vm_ended.borrow_mut().stop_video_share(); + vm_ended.stop_video_share(); write2.voice.set_local_video_stream.set(None); write2.voice.set_video_source.set(None); }); diff --git a/crates/web/src/event_processing.rs b/crates/web/src/event_processing.rs index 86f02348..56c9f40a 100644 --- a/crates/web/src/event_processing.rs +++ b/crates/web/src/event_processing.rs @@ -85,7 +85,10 @@ pub fn process_event_batch( participants.push(pid.clone()); } }); - if state.voice.voice_channel.get_untracked() == Some(ch) { + // Existing participants offer to the new joiner (one offerer per + // pair). Skip our own echoed join so we never offer to ourselves. + if state.voice.voice_channel.get_untracked() == Some(ch) && pid != handle.peer_id() + { let vm = voice_manager.clone(); let p = pid; wasm_bindgen_futures::spawn_local(handle_voice_create_offer(vm, p)); @@ -106,9 +109,7 @@ pub fn process_event_batch( write.voice.set_remote_video_streams.update(|m| { m.remove(&pid_for_stream); }); - voice_manager - .borrow_mut() - .close_connection(&peer_id.to_string()); + voice_manager.close_connection(&peer_id.to_string()); } ClientEvent::VoiceSignal { from_peer, signal, .. @@ -125,7 +126,7 @@ pub fn process_event_batch( wasm_bindgen_futures::spawn_local(handle_voice_answer(vm, from, s)); } VoiceSignalPayload::IceCandidate(json) => { - if let Err(e) = vm.borrow().handle_ice_candidate(&from, json) { + if let Err(e) = vm.handle_ice_candidate(&from, json) { tracing::warn!(?e, "handle_ice_candidate failed"); } } diff --git a/crates/web/src/voice.rs b/crates/web/src/voice.rs index 18157689..d5d19223 100644 --- a/crates/web/src/voice.rs +++ b/crates/web/src/voice.rs @@ -20,11 +20,11 @@ use std::collections::{HashMap, HashSet}; use std::rc::Rc; use wasm_bindgen::closure::Closure; -use wasm_bindgen::JsCast; +use wasm_bindgen::{JsCast, JsValue}; use web_sys::{ AnalyserNode, AudioContext, MediaStream, MediaStreamAudioSourceNode, RtcConfiguration, RtcIceServer, RtcPeerConnection, RtcPeerConnectionIceEvent, RtcRtpSender, RtcSdpType, - RtcSessionDescriptionInit, RtcTrackEvent, + RtcSessionDescriptionInit, RtcSignalingState, RtcTrackEvent, }; use crate::state::VideoSource; @@ -200,6 +200,36 @@ struct PeerConnectionState { /// Shared flag: `true` while we are in the process of creating and sending /// an offer. Shared with the `onnegotiationneeded` closure via `Rc`. making_offer: Rc>, + /// `true` once `setRemoteDescription` has succeeded on this connection. + /// + /// Remote ICE candidates that arrive before this is set cannot be added — + /// the browser rejects `addIceCandidate` while `remoteDescription` is null + /// (it buffers only the *local* end's candidates, not remote ones). We + /// queue such candidates in [`VoiceManager::pending_ice`] and flush them + /// once this flips true. Reading `pc.remote_description()` directly would + /// require the `RtcSessionDescription` web-sys feature; a flag is both + /// cheaper and deterministically testable. + remote_set: Rc>, +} + +/// Cheaply-cloned handles to a peer connection, returned from +/// [`VoiceManager::get_or_create_connection`] so the caller can drive the +/// async SDP exchange without holding a `RefCell` borrow across `.await`. +struct ConnHandles { + pc: RtcPeerConnection, + making_offer: Rc>, + remote_set: Rc>, +} + +impl PeerConnectionState { + /// Clone the cheap handles (JS reference + `Rc` flags) out of this state. + fn handles(&self) -> ConnHandles { + ConnHandles { + pc: self.pc.clone(), + making_offer: self.making_offer.clone(), + remote_set: self.remote_set.clone(), + } + } } /// Manages WebRTC connections for voice chat. @@ -213,22 +243,32 @@ pub struct VoiceManager { /// Our own peer ID, used for polite/impolite determination. local_peer_id: String, /// One `PeerConnectionState` per remote peer. - connections: HashMap, + /// + /// All mutable state lives behind interior mutability (`RefCell`/`Cell`) so + /// every public method can take `&self`. This lets callers hold the manager + /// behind a plain `Rc` (no outer `RefCell`) and `.await` the + /// async signaling methods without holding any borrow across the await + /// point. Internal `RefCell` guards are always dropped before any `.await`. + connections: RefCell>, /// Local microphone stream (acquired once). - local_stream: Option, + local_stream: RefCell>, /// Callback to send signaling data: `(target_peer, signal_type, payload)`. on_signal: SignalCallback, /// Callback invoked when a remote video track arrives or ends. on_video_track: VideoTrackCallback, /// Active video stream (camera or screen share). - video_stream: Option, + video_stream: RefCell>, /// Which video source is currently active. - video_source: Option, + video_source: Cell>, /// RTP senders for the video track, keyed by remote peer ID. /// Stored so we can call `remove_track` later. - video_senders: HashMap, + video_senders: RefCell>, + /// Remote ICE candidates that arrived before their connection had a remote + /// description set. Flushed by [`VoiceManager::flush_pending_ice`] once + /// `setRemoteDescription` succeeds. Keyed by remote peer ID. + pending_ice: RefCell, /// Audio volume analyser for speaking detection. - speaking_detector: Option, + speaking_detector: RefCell>, /// Stored so `close_all()` can recreate the `SpeakingDetector` for the next session. speaking_change_cb: Option)>>, } @@ -264,14 +304,15 @@ impl VoiceManager { } Self { local_peer_id, - connections: HashMap::new(), - local_stream: None, + connections: RefCell::new(HashMap::new()), + local_stream: RefCell::new(None), on_signal: Rc::new(on_signal), on_video_track: Rc::new(on_video_track), - video_stream: None, - video_source: None, - video_senders: HashMap::new(), - speaking_detector: detector, + video_stream: RefCell::new(None), + video_source: Cell::new(None), + video_senders: RefCell::new(HashMap::new()), + pending_ice: RefCell::new(PendingIceCandidates::default()), + speaking_detector: RefCell::new(detector), speaking_change_cb: Some(speaking_cb), } } @@ -280,19 +321,20 @@ impl VoiceManager { /// /// Also adds the stream to the speaking detector so the local user's /// volume is analysed alongside remote peers. - pub fn set_local_stream(&mut self, stream: MediaStream) { - if let Some(ref mut detector) = self.speaking_detector { + pub fn set_local_stream(&self, stream: MediaStream) { + if let Some(ref mut detector) = *self.speaking_detector.borrow_mut() { detector.add_stream(&self.local_peer_id, &stream); } - self.local_stream = Some(stream); + *self.local_stream.borrow_mut() = Some(stream); } - /// Build an `RTCConfiguration` honouring the configured STUN URL list. + /// Build an `RTCConfiguration` honouring the configured ICE server list. /// - /// See [`resolve_stun_urls`] for the privacy-first default (empty list) - /// and the `window.__WILLOW_STUN_URLS` override knob. + /// See [`resolve_ice_servers`] for the privacy-first default (empty list) + /// and the `window.__WILLOW_ICE_SERVERS` / `window.__WILLOW_STUN_URLS` + /// override knobs. fn rtc_config() -> RtcConfiguration { - build_rtc_config(&resolve_stun_urls()) + build_rtc_config(&resolve_ice_servers()) } /// Test-only accessor for the resolved `RTCConfiguration`. @@ -313,7 +355,7 @@ impl VoiceManager { /// can store it in `video_senders` for later removal. fn add_local_tracks(&self, pc: &RtcPeerConnection) -> Option { // Audio tracks. - if let Some(ref stream) = self.local_stream { + if let Some(ref stream) = *self.local_stream.borrow() { let tracks = stream.get_audio_tracks(); for i in 0..tracks.length() { let track: web_sys::MediaStreamTrack = tracks.get(i).unchecked_into(); @@ -321,7 +363,7 @@ impl VoiceManager { } } // Video track if currently sharing. - if let Some(ref video_stream) = self.video_stream { + if let Some(ref video_stream) = *self.video_stream.borrow() { let tracks = video_stream.get_video_tracks(); if tracks.length() > 0 { let track: web_sys::MediaStreamTrack = tracks.get(0).unchecked_into(); @@ -362,12 +404,11 @@ impl VoiceManager { // Share the detector's analysers map, sources map, and audio context // with the closure so it can register incoming remote audio streams // for speaking detection. - let detector_analysers = self.speaking_detector.as_ref().map(|d| d.analysers.clone()); - let detector_sources = self.speaking_detector.as_ref().map(|d| d.sources.clone()); - let detector_ctx = self - .speaking_detector - .as_ref() - .map(|d| d.audio_context.clone()); + let detector_ref = self.speaking_detector.borrow(); + let detector_analysers = detector_ref.as_ref().map(|d| d.analysers.clone()); + let detector_sources = detector_ref.as_ref().map(|d| d.sources.clone()); + let detector_ctx = detector_ref.as_ref().map(|d| d.audio_context.clone()); + drop(detector_ref); let on_track = Closure::wrap(Box::new(move |ev: RtcTrackEvent| { let track: web_sys::MediaStreamTrack = ev.track(); @@ -507,24 +548,63 @@ impl VoiceManager { on_negotiation.forget(); } + /// Log ICE / connection state transitions for a peer connection. + /// + /// Without this, a failed ICE negotiation (the common cause of "joined but + /// no media") is completely silent. We read the state via `Reflect` so no + /// extra web-sys enum features are required, and warn on `failed` / + /// `disconnected`. This is diagnostics only — it changes no behaviour. + fn setup_connection_logging(pc: &RtcPeerConnection, remote_peer: &str) { + fn state_str(pc: &RtcPeerConnection, key: &str) -> String { + js_sys::Reflect::get(pc, &JsValue::from_str(key)) + .ok() + .and_then(|v| v.as_string()) + .unwrap_or_else(|| "unknown".to_string()) + } + + let pc_ice = pc.clone(); + let peer_ice = remote_peer.to_string(); + let on_ice_state = Closure::wrap(Box::new(move || { + let state = state_str(&pc_ice, "iceConnectionState"); + if state == "failed" || state == "disconnected" { + tracing::warn!(peer = %peer_ice, %state, "ICE connection state"); + } else { + tracing::debug!(peer = %peer_ice, %state, "ICE connection state"); + } + }) as Box); + pc.set_oniceconnectionstatechange(Some(on_ice_state.as_ref().unchecked_ref())); + on_ice_state.forget(); + + let pc_conn = pc.clone(); + let peer_conn = remote_peer.to_string(); + let on_conn_state = Closure::wrap(Box::new(move || { + let state = state_str(&pc_conn, "connectionState"); + if state == "failed" { + tracing::warn!(peer = %peer_conn, %state, "peer connection state"); + } else { + tracing::debug!(peer = %peer_conn, %state, "peer connection state"); + } + }) as Box); + pc.set_onconnectionstatechange(Some(on_conn_state.as_ref().unchecked_ref())); + on_conn_state.forget(); + } + /// Get an existing connection or create a new one with all handlers wired up. /// - /// If a connection already exists for `remote_peer`, it is returned as-is. - /// Otherwise a new `RTCPeerConnection` is created, local tracks are added, - /// and ICE / track / negotiation handlers are installed. + /// If a connection already exists for `remote_peer`, its handles are cloned + /// and returned. Otherwise a new `RTCPeerConnection` is created, local + /// tracks are added, and ICE / track / negotiation handlers are installed. /// - /// Returns a reference to the `PeerConnectionState` and an optional - /// `RtcRtpSender` for the video track (if one was added to a new connection). + /// Returns **owned, cloned** handles (the `RtcPeerConnection` is a cheap JS + /// reference; `making_offer`/`remote_set` are `Rc`s) plus an optional + /// `RtcRtpSender` for a video track added to a *newly created* connection. + /// No `RefCell` borrow escapes, so callers may freely `.await` afterwards. fn get_or_create_connection( - &mut self, + &self, remote_peer: &str, - ) -> Result<(&PeerConnectionState, Option), String> { - if self.connections.contains_key(remote_peer) { - let state = self - .connections - .get(remote_peer) - .expect("key just inserted"); - return Ok((state, None)); + ) -> Result<(ConnHandles, Option), String> { + if let Some(state) = self.connections.borrow().get(remote_peer) { + return Ok((state.handles(), None)); } let pc = RtcPeerConnection::new_with_configuration(&Self::rtc_config()) @@ -533,20 +613,21 @@ impl VoiceManager { let video_sender = self.add_local_tracks(&pc); self.setup_ice_handler(&pc, remote_peer); self.setup_track_handler(&pc, remote_peer); + Self::setup_connection_logging(&pc, remote_peer); let making_offer = Rc::new(Cell::new(false)); self.setup_negotiation_handler(&pc, remote_peer, making_offer.clone()); - self.connections.insert( - remote_peer.to_string(), - PeerConnectionState { pc, making_offer }, - ); - - let state = self - .connections - .get(remote_peer) - .expect("key just inserted"); - Ok((state, video_sender)) + let state = PeerConnectionState { + pc, + making_offer, + remote_set: Rc::new(Cell::new(false)), + }; + let handles = state.handles(); + self.connections + .borrow_mut() + .insert(remote_peer.to_string(), state); + Ok((handles, video_sender)) } /// Create an SDP offer and send it to a remote peer. @@ -554,10 +635,10 @@ impl VoiceManager { /// If a connection already exists it is reused; otherwise a new one is /// created with local tracks and all handlers. The offer is created on /// the (possibly existing) connection and sent via the signal callback. - pub async fn create_offer(&mut self, remote_peer: &str) -> Result<(), String> { - let (state, video_sender) = self.get_or_create_connection(remote_peer)?; - let pc = state.pc.clone(); - let making_offer = state.making_offer.clone(); + pub async fn create_offer(&self, remote_peer: &str) -> Result<(), String> { + let (handles, video_sender) = self.get_or_create_connection(remote_peer)?; + let pc = handles.pc; + let making_offer = handles.making_offer; // Prevent the onnegotiationneeded handler from firing a duplicate offer // while we are creating one here. @@ -565,7 +646,9 @@ impl VoiceManager { // Store video sender if a new connection was created with video. if let Some(sender) = video_sender { - self.video_senders.insert(remote_peer.to_string(), sender); + self.video_senders + .borrow_mut() + .insert(remote_peer.to_string(), sender); } // Create offer. @@ -602,31 +685,38 @@ impl VoiceManager { /// Handle an incoming SDP offer from a remote peer. /// - /// Implements the "perfect negotiation" pattern: - /// - If we are the **impolite** peer (higher ID) and are currently making - /// an offer, we ignore the incoming offer (our offer wins). - /// - If we are the **polite** peer (lower ID) and are making an offer, - /// we rollback our local description and accept the remote offer. - /// - Otherwise we accept the offer normally. - pub async fn handle_offer(&mut self, remote_peer: &str, sdp: &str) -> Result<(), String> { - let (state, video_sender) = self.get_or_create_connection(remote_peer)?; - let pc = state.pc.clone(); - let currently_making_offer = state.making_offer.get(); + /// Implements the canonical "perfect negotiation" pattern (see MDN + /// "Establishing a connection: The WebRTC perfect negotiation pattern"): + /// - An **offer collision** is `making_offer || signalingState != "stable"`. + /// - The **impolite** peer (higher ID) ignores a colliding offer — its own + /// offer wins. + /// - The **polite** peer (lower ID) rolls back its pending local offer and + /// accepts the remote one. + /// + /// Once `setRemoteDescription` succeeds, any ICE candidates that arrived + /// early (and were queued) are flushed. + pub async fn handle_offer(&self, remote_peer: &str, sdp: &str) -> Result<(), String> { + let (handles, video_sender) = self.get_or_create_connection(remote_peer)?; + let pc = handles.pc; // Store video sender if a new connection was created with video. if let Some(sender) = video_sender { - self.video_senders.insert(remote_peer.to_string(), sender); + self.video_senders + .borrow_mut() + .insert(remote_peer.to_string(), sender); } - // Perfect negotiation collision detection. + // Perfect-negotiation collision detection. `polite` = lower peer ID. let polite = self.local_peer_id.as_str() < remote_peer; + let stable = pc.signaling_state() == RtcSignalingState::Stable; + if should_ignore_offer(polite, handles.making_offer.get(), stable) { + // Impolite peer in a collision: keep our own offer, drop theirs. + return Ok(()); + } - if currently_making_offer { - if !polite { - // We are impolite and already making an offer — ignore incoming. - return Ok(()); - } - // We are polite — rollback our pending local description. + // Polite peer in a collision (or any non-stable state): roll back our + // pending local offer so we can accept the remote one. + if !stable { let rollback = RtcSessionDescriptionInit::new(RtcSdpType::Rollback); let _ = wasm_bindgen_futures::JsFuture::from(pc.set_local_description(&rollback)).await; } @@ -637,6 +727,9 @@ impl VoiceManager { wasm_bindgen_futures::JsFuture::from(pc.set_remote_description(&remote_desc)) .await .map_err(|_| "set_remote_description failed")?; + handles.remote_set.set(true); + // Remote description is in place — apply any candidates that arrived early. + self.flush_pending_ice(remote_peer, &pc); // Create answer. let answer = wasm_bindgen_futures::JsFuture::from(pc.create_answer()) @@ -661,44 +754,79 @@ impl VoiceManager { } /// Handle an incoming SDP answer from a remote peer. + /// + /// After `setRemoteDescription` succeeds, flushes any ICE candidates that + /// arrived before the answer. pub async fn handle_answer(&self, remote_peer: &str, sdp: &str) -> Result<(), String> { - let state = self - .connections - .get(remote_peer) - .ok_or("no connection for peer")?; + let handles = { + let conns = self.connections.borrow(); + conns + .get(remote_peer) + .map(|s| s.handles()) + .ok_or("no connection for peer")? + }; let desc = RtcSessionDescriptionInit::new(RtcSdpType::Answer); desc.set_sdp(sdp); - wasm_bindgen_futures::JsFuture::from(state.pc.set_remote_description(&desc)) + wasm_bindgen_futures::JsFuture::from(handles.pc.set_remote_description(&desc)) .await .map_err(|_| "set_remote_description failed")?; + handles.remote_set.set(true); + self.flush_pending_ice(remote_peer, &handles.pc); Ok(()) } /// Handle an incoming ICE candidate from a remote peer. + /// + /// `addIceCandidate` rejects when the connection has no remote description + /// yet (the browser buffers only *local* candidates, not remote ones), so a + /// candidate that arrives before its offer/answer must be queued rather than + /// dropped. Queued candidates are flushed by [`Self::flush_pending_ice`] + /// once `setRemoteDescription` succeeds. Candidates for a peer with no + /// connection yet are also queued (the connection is created when its offer + /// arrives). pub fn handle_ice_candidate( &self, remote_peer: &str, candidate_json: &str, ) -> Result<(), String> { - let state = self - .connections - .get(remote_peer) - .ok_or("no connection for peer")?; - - let candidate_obj = - js_sys::JSON::parse(candidate_json).map_err(|_| "invalid ICE candidate JSON")?; - // Use add_ice_candidate with the parsed JS object directly. - // The browser accepts RTCIceCandidateInit dictionaries natively. - let _ = state - .pc - .add_ice_candidate_with_opt_rtc_ice_candidate_init(Some(candidate_obj.unchecked_ref())); + // Validate the JSON up front so malformed candidates are rejected + // rather than silently queued forever. + if js_sys::JSON::parse(candidate_json).is_err() { + return Err("invalid ICE candidate JSON".to_string()); + } + + let ready = { + let conns = self.connections.borrow(); + conns + .get(remote_peer) + .map(|state| (state.pc.clone(), state.remote_set.get())) + }; + + match ready { + Some((pc, true)) => apply_ice_candidate(&pc, candidate_json), + Some((_, false)) | None => { + // Connection missing or remote description not set yet — queue. + self.pending_ice + .borrow_mut() + .push(remote_peer, candidate_json); + } + } Ok(()) } + /// Apply every queued ICE candidate for `remote_peer` to `pc`, in arrival + /// order. Called once the connection's remote description is set. + fn flush_pending_ice(&self, remote_peer: &str, pc: &RtcPeerConnection) { + let queued = self.pending_ice.borrow_mut().take(remote_peer); + for json in queued { + apply_ice_candidate(pc, &json); + } + } + /// Mute or unmute the local microphone. pub fn set_muted(&self, muted: bool) { - if let Some(ref stream) = self.local_stream { + if let Some(ref stream) = *self.local_stream.borrow() { let tracks = stream.get_audio_tracks(); for i in 0..tracks.length() { let track: web_sys::MediaStreamTrack = tracks.get(i).unchecked_into(); @@ -712,21 +840,25 @@ impl VoiceManager { /// Stops any existing video share first. The video track is added to every /// existing peer connection; `onnegotiationneeded` fires automatically and /// handles the renegotiation. - pub fn start_video(&mut self, stream: MediaStream, source: VideoSource) { + pub fn start_video(&self, stream: MediaStream, source: VideoSource) { self.stop_video_share(); - self.video_stream = Some(stream.clone()); - self.video_source = Some(source); + *self.video_stream.borrow_mut() = Some(stream.clone()); + self.video_source.set(Some(source)); let video_tracks = stream.get_video_tracks(); if video_tracks.length() > 0 { let track: web_sys::MediaStreamTrack = video_tracks.get(0).unchecked_into(); - // Collect peer IDs first to avoid borrowing `self` in the loop. - let peer_ids: Vec = self.connections.keys().cloned().collect(); - for peer_id in peer_ids { - if let Some(state) = self.connections.get(&peer_id) { - let sender = state.pc.add_track_0(&track, &stream); - self.video_senders.insert(peer_id, sender); - } + // Snapshot (peer_id, pc) pairs so we don't hold the connections + // borrow while mutating video_senders. + let targets: Vec<(String, RtcRtpSender)> = self + .connections + .borrow() + .iter() + .map(|(peer_id, state)| (peer_id.clone(), state.pc.add_track_0(&track, &stream))) + .collect(); + let mut senders = self.video_senders.borrow_mut(); + for (peer_id, sender) in targets { + senders.insert(peer_id, sender); } } // onnegotiationneeded fires automatically from addTrack. @@ -736,50 +868,55 @@ impl VoiceManager { /// /// Stops the underlying `MediaStreamTrack` (turns off camera LED) and /// removes the RTP sender from each connection, triggering renegotiation. - pub fn stop_video_share(&mut self) { - let senders: Vec<(String, RtcRtpSender)> = self.video_senders.drain().collect(); - for (peer_id, sender) in senders { - if let Some(state) = self.connections.get(&peer_id) { - state.pc.remove_track(&sender); + pub fn stop_video_share(&self) { + let senders: Vec<(String, RtcRtpSender)> = + self.video_senders.borrow_mut().drain().collect(); + { + let conns = self.connections.borrow(); + for (peer_id, sender) in &senders { + if let Some(state) = conns.get(peer_id) { + state.pc.remove_track(sender); + } } } - if let Some(ref stream) = self.video_stream { + if let Some(ref stream) = *self.video_stream.borrow() { let tracks = stream.get_video_tracks(); for i in 0..tracks.length() { let track: web_sys::MediaStreamTrack = tracks.get(i).unchecked_into(); track.stop(); } } - self.video_stream = None; - self.video_source = None; + *self.video_stream.borrow_mut() = None; + self.video_source.set(None); } /// Start sharing the screen. Convenience wrapper around `start_video`. - pub fn start_screen_share(&mut self, stream: MediaStream) { + pub fn start_screen_share(&self, stream: MediaStream) { self.start_video(stream, VideoSource::Screen); } /// Start sharing the camera. Convenience wrapper around `start_video`. - pub fn start_camera(&mut self, stream: MediaStream) { + pub fn start_camera(&self, stream: MediaStream) { self.start_video(stream, VideoSource::Camera); } /// Return the currently active video source, if any. pub fn video_source(&self) -> Option { - self.video_source + self.video_source.get() } /// Close the connection to a specific remote peer. /// /// Also removes the `