Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 36 additions & 2 deletions crates/paws_server/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,9 @@ impl EventBroadcaster {

/// Ensures a broadcast channel exists for a task.
///
/// This is useful to create the channel before any events are broadcast,
/// so that subsequent subscribers can receive events from the buffer.
/// Creates the channel before events are broadcast to prevent events from
/// being dropped when no channel exists. Note: subscribers only receive
/// events sent *after* they subscribe. For past events, use `TaskStore`.
pub async fn ensure_channel(&self, task_id: TaskId) {
let mut channels = self.channels.write().await;
channels
Expand Down Expand Up @@ -208,3 +209,36 @@ mod tests {
// anymore.
}
}

#[cfg(test)]
mod broadcast_behavior_tests {
use tokio::sync::broadcast;

#[tokio::test]
async fn test_late_subscriber_does_not_receive_past_messages() {
// Create a broadcast channel with capacity 10
let (tx, mut rx1) = broadcast::channel::<String>(10);

// Send some messages
tx.send("Message 1".to_string()).unwrap();
tx.send("Message 2".to_string()).unwrap();
tx.send("Message 3".to_string()).unwrap();

// NOW create a second subscriber AFTER messages were sent
let mut rx2 = tx.subscribe();

// rx1 should receive all messages (subscribed before send)
assert_eq!(rx1.recv().await.unwrap(), "Message 1");
assert_eq!(rx1.recv().await.unwrap(), "Message 2");
assert_eq!(rx1.recv().await.unwrap(), "Message 3");

// Send a new message after rx2 subscribed
tx.send("Message 4".to_string()).unwrap();

// rx2 should receive Message 4 (sent AFTER subscription)
// but NOT Messages 1, 2, 3 (sent BEFORE subscription)
assert_eq!(rx2.recv().await.unwrap(), "Message 4");

// This proves late subscribers do NOT receive past messages
}
}