Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 47 additions & 23 deletions crates/client/src/listeners.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,19 +644,24 @@ pub(crate) async fn process_received_message<T: TopicHandle>(
// per channel) dimensions of `VoiceState.participants`.
// Without these gates, any signed peer can flood arbitrary
// `channel_id`s and exhaust receiving clients' memory.
let known = willow_actor::state::select(&ctx.event_state, {
//
// The wire carries the canonical `channel_id` (UUID); voice state
// and UI events are keyed by the channel *name* (the whole UI is
// name-keyed). Resolve id -> name here: `None` both fails the
// existence gate and means "unknown channel".
let name = willow_actor::state::select(&ctx.event_state, {
let ch = channel_id.clone();
move |es| es.channels.contains_key(&ch)
move |es| es.channels.get(&ch).map(|c| c.name.clone())
})
.await;
if !known {
let Some(name) = name else {
tracing::debug!(
%signer, channel_id = %channel_id,
"dropping VoiceJoin: channel_id not in ServerState.channels"
);
return;
}
let ch = channel_id.clone();
};
let ch = name.clone();
let inserted = willow_actor::state::mutate(&ctx.voice, move |v| {
let new_channel = !v.participants.contains_key(&ch);
if new_channel && v.participants.len() >= MAX_VOICE_CHANNELS {
Expand All @@ -680,7 +685,7 @@ pub(crate) async fn process_received_message<T: TopicHandle>(
warn_if_err(
ctx.event_broker
.do_send(willow_actor::Publish(ClientEvent::VoiceJoined {
channel_id,
channel_id: name,
peer_id,
})),
"event_broker.do_send Publish(VoiceJoined)",
Expand All @@ -695,19 +700,21 @@ pub(crate) async fn process_received_message<T: TopicHandle>(
// UI. The `participants` map is unaffected even without the
// gate (`get_mut` returns None), but the event-broker fanout
// would still leak attacker-controlled `channel_id`s.
let known = willow_actor::state::select(&ctx.event_state, {
// Resolve the wire `channel_id` (UUID) to the name used by voice
// state + UI events; `None` fails the existence gate.
let name = willow_actor::state::select(&ctx.event_state, {
let ch = channel_id.clone();
move |es| es.channels.contains_key(&ch)
move |es| es.channels.get(&ch).map(|c| c.name.clone())
})
.await;
if !known {
let Some(name) = name else {
tracing::debug!(
%signer, channel_id = %channel_id,
"dropping VoiceLeave: channel_id not in ServerState.channels"
);
return;
}
let ch = channel_id.clone();
};
let ch = name.clone();
willow_actor::state::mutate(&ctx.voice, move |v| {
if let Some(p) = v.participants.get_mut(&ch) {
p.remove(&peer_id);
Expand All @@ -717,7 +724,7 @@ pub(crate) async fn process_received_message<T: TopicHandle>(
warn_if_err(
ctx.event_broker
.do_send(willow_actor::Publish(ClientEvent::VoiceLeft {
channel_id,
channel_id: name,
peer_id,
})),
"event_broker.do_send Publish(VoiceLeft)",
Expand All @@ -734,22 +741,22 @@ pub(crate) async fn process_received_message<T: TopicHandle>(
// event handlers. Signals do not mutate `participants`
// directly, but the existence check shuts the same fanout
// gap as Join/Leave.
let known = willow_actor::state::select(&ctx.event_state, {
let name = willow_actor::state::select(&ctx.event_state, {
let ch = channel_id.clone();
move |es| es.channels.contains_key(&ch)
move |es| es.channels.get(&ch).map(|c| c.name.clone())
})
.await;
if !known {
let Some(name) = name else {
tracing::debug!(
%signer, channel_id = %channel_id,
"dropping VoiceSignal: channel_id not in ServerState.channels"
);
return;
}
};
warn_if_err(
ctx.event_broker
.do_send(willow_actor::Publish(ClientEvent::VoiceSignal {
channel_id,
channel_id: name,
from_peer: signer,
signal,
})),
Expand Down Expand Up @@ -1604,6 +1611,18 @@ mod tests {
.expect("test_client must seed at least one channel")
}

/// The display name of the first seeded channel. Voice state + events are
/// keyed by name (the wire carries the UUID `channel_id`, which the
/// listener resolves to this name), so state assertions look up by name.
async fn known_channel_name<N: willow_network::Network>(client: &ClientHandle<N>) -> String {
let snap = client.state_snapshot().await;
snap.channels
.values()
.next()
.map(|c| c.name.clone())
.expect("test_client must seed at least one channel")
}

/// VoiceJoin against an unknown `channel_id` must not mutate
/// `VoiceState.participants` (defends against attackers flooding
/// random channel ids until memory is exhausted).
Expand Down Expand Up @@ -1652,7 +1671,10 @@ mod tests {
.await
.expect("subscribe must succeed");
let ctx = make_ctx(&client);
// Wire is addressed by canonical channel_id (UUID); voice state is
// keyed by the resolved channel name.
let channel_id = known_channel_id(&client).await;
let channel_name = known_channel_name(&client).await;

let attacker = willow_identity::Identity::generate();
let attacker_id = attacker.endpoint_id();
Expand All @@ -1665,7 +1687,7 @@ mod tests {

let stored = poll_until_some(
|| {
let ch = channel_id.clone();
let ch = channel_name.clone();
willow_actor::state::select(&client.voice_state_addr, move |v| {
v.participants
.get(&ch)
Expand All @@ -1679,7 +1701,7 @@ mod tests {
assert_eq!(
stored,
Some(1),
"VoiceJoin on a known channel must record the participant"
"VoiceJoin on a known channel must record the participant (keyed by name)"
);
}

Expand Down Expand Up @@ -1748,10 +1770,12 @@ mod tests {
.await
.expect("subscribe must succeed");
let ctx = make_ctx(&client);
// Wire is addressed by channel_id (UUID); state is keyed by name.
let channel_id = known_channel_id(&client).await;
let channel_name = known_channel_name(&client).await;

// Fill the cap with synthetic participants on the real channel.
let cap_channel = channel_id.clone();
// Fill the cap with synthetic participants on the real channel (by name).
let cap_channel = channel_name.clone();
willow_actor::state::mutate(&client.voice_state_addr, move |v| {
let set = v.participants.entry(cap_channel).or_default();
for _ in 0..MAX_PARTICIPANTS_PER_CHANNEL {
Expand All @@ -1763,7 +1787,7 @@ mod tests {
// Confirm preconditions.
let pre = voice_snapshot(&client).await;
assert_eq!(
pre.participants.get(&channel_id).map(|s| s.len()),
pre.participants.get(&channel_name).map(|s| s.len()),
Some(MAX_PARTICIPANTS_PER_CHANNEL),
"test setup must fill participants to cap"
);
Expand All @@ -1781,7 +1805,7 @@ mod tests {
let snap = voice_snapshot(&client).await;
let set = snap
.participants
.get(&channel_id)
.get(&channel_name)
.expect("channel set must still exist");
assert_eq!(
set.len(),
Expand Down
50 changes: 46 additions & 4 deletions crates/client/src/mutations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,30 @@ impl<N: willow_network::Network> ClientMutations<N> {
channel_id.ok_or_else(|| anyhow::anyhow!("channel not found: {channel}"))
}

/// Resolve a channel reference (already a `channel_id`, or a display name)
/// to the canonical `channel_id` (UUID) used on the wire.
///
/// Voice signaling (`VoiceJoin`/`VoiceLeave`/`VoiceSignal`) must address
/// channels by their canonical id so the receiver's existence gate — which
/// validates against `ServerState.channels` (keyed by id) — accepts them.
/// The UI keys voice state by name, so this accepts either form: an exact
/// id match wins, otherwise it falls back to a name lookup. Returns `None`
/// if the channel is unknown.
pub(crate) async fn channel_id_for_voice(&self, channel: &str) -> Option<String> {
let ch = channel.to_string();
willow_actor::state::select(&self.event_state, move |es| {
if es.channels.contains_key(&ch) {
Some(ch.clone())
} else {
es.channels
.iter()
.find(|(_, c)| c.name == ch)
.map(|(id, _)| id.clone())
}
})
.await
}

/// Sync event_state mirror from ManagedDag, persist, and emit
/// ClientEvents. Called after build_event succeeds — ManagedDag has
/// already applied the event to its internal state atomically.
Expand Down Expand Up @@ -699,21 +723,33 @@ impl<N: willow_network::Network> ClientMutations<N> {

impl<N: willow_network::Network> ClientMutations<N> {
/// Join a voice channel.
pub async fn join_voice(&self, channel_id: &str) {
///
/// `channel` is a channel name (the UI's identifier) or a `channel_id`.
/// Local voice state stays keyed by the channel *name* (consistent with the
/// name-keyed UI); the broadcast `VoiceJoin` carries the canonical
/// `channel_id` (UUID) so remote peers' existence gate accepts it.
pub async fn join_voice(&self, channel: &str) {
let in_voice =
willow_actor::state::select(&self.voice, |v| v.active_channel.is_some()).await;
if in_voice {
self.leave_voice().await;
}
let ch = channel_id.to_string();
// Local voice state is keyed by the caller's reference (the UI's
// channel name), so the UI — which reads the same key — matches.
let my_peer_id = self.identity.endpoint_id();
let ch = channel.to_string();
willow_actor::state::mutate(&self.voice, move |v| {
v.active_channel = Some(ch.clone());
v.participants.entry(ch).or_default().insert(my_peer_id);
})
.await;
// Broadcast addressed by canonical channel_id so peers accept it.
let Some(channel_id) = self.channel_id_for_voice(channel).await else {
tracing::warn!(%channel, "join_voice: unknown channel, not broadcasting");
return;
};
let msg = ops::WireMessage::VoiceJoin {
channel_id: channel_id.to_string(),
channel_id,
peer_id: my_peer_id,
};
if let Some(data) = ops::pack_wire(&msg, &self.identity) {
Expand All @@ -736,8 +772,14 @@ impl<N: willow_network::Network> ClientMutations<N> {
})
.await;
if let Some(ch) = maybe_ch {
// `ch` is the local key (channel name); address the wire message
// by canonical channel_id so peers' existence gate accepts it.
let Some(channel_id) = self.channel_id_for_voice(&ch).await else {
tracing::warn!(channel = %ch, "leave_voice: unknown channel, not broadcasting");
return;
};
let msg = ops::WireMessage::VoiceLeave {
channel_id: ch,
channel_id,
peer_id: my_peer_id,
};
if let Some(data) = ops::pack_wire(&msg, &self.identity) {
Expand Down
15 changes: 12 additions & 3 deletions crates/client/src/voice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,23 @@ impl<N: willow_network::Network> ClientHandle<N> {
willow_actor::state::select(&self.voice_state_addr, |v| v.deafened).await
}

pub fn send_voice_signal(
/// Send a WebRTC signaling message to a peer.
///
/// `channel` is the UI's channel reference (name) or a `channel_id`; it is
/// resolved to the canonical `channel_id` (UUID) so the receiver's
/// existence gate accepts it. Async because resolution reads event state.
pub async fn send_voice_signal(
&self,
channel_id: &str,
channel: &str,
target: willow_identity::EndpointId,
signal: ops::VoiceSignalPayload,
) {
let Some(channel_id) = self.mutation_handle.channel_id_for_voice(channel).await else {
tracing::warn!(%channel, "send_voice_signal: unknown channel");
return;
};
let msg = ops::WireMessage::VoiceSignal {
channel_id: channel_id.to_string(),
channel_id,
target_peer: target,
signal,
};
Expand Down
Loading