Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions crates/flux-communication/src/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,27 @@ impl<T: Copy> Queue<T> {
fn group_cursor(&self, key: &str) -> *const AtomicUsize {
unsafe { &mut *self.inner.cast_mut() }.header.find_or_insert_group(key)
}

/// Advance the collaborative-group cursor for `label` to the producer's
/// current write count, allocating the group slot if it does not exist.
/// Never rewinds (uses `fetch_max`).
///
/// Use when joining a group whose backlog is meaningless — for instance,
/// when queued payloads index into an external arena that has since been
/// recycled, so replaying the ring would only surface stale references.
///
/// MUST be called before any consumer in the group has begun consuming.
/// Calling it after some members have already claimed slots can race
/// with concurrent `acquire_next_slot` calls and silently lose any
/// messages the producer wrote in the fast-forward window. Typical
/// pattern: invoke once from the spine owner, immediately before
/// attaching the group's tiles.
pub fn fast_forward_collaborative_group(&self, label: &str) {
let key = format!("{}[{}].{}.collab", binary_name(), current_pid(), label);
let cursor = self.group_cursor(&key);
let head = self.count();
unsafe { (*cursor).fetch_max(head, Ordering::Relaxed) };
}
}

unsafe impl<T> Send for Queue<T> {}
Expand Down
68 changes: 68 additions & 0 deletions crates/flux-communication/src/queue/tests_collaborative.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,3 +273,71 @@ fn perf_test_collaborative_consumers() {
assert_eq!(total_count, N, "received all {N} messages");
println!("latency ({total_count} msgs) — mean: {}ns", total_sum / total_count as u64);
}

#[test]
fn fast_forward_skips_backlog() {
const LABEL: &str = "ff_single";
let q: Queue<usize> = Queue::new(64, QueueType::MPMC);
let mut p = Producer::from(q);

for i in 0..30 {
p.produce(&i);
}

q.fast_forward_collaborative_group(LABEL);

let mut c = Consumer::new_collaborative_test(q, LABEL).without_log();

let mut received = Vec::new();
assert!(
!c.consume_collaborative(|x| received.push(*x)),
"post-fast-forward consumer must not see the backlog",
);
assert!(received.is_empty());

for i in 100..105 {
p.produce(&i);
}
while c.consume_collaborative(|x| received.push(*x)) {}
assert_eq!(received, vec![100, 101, 102, 103, 104]);
}

#[test]
fn fast_forward_no_gaps_with_multiple_consumers() {
const LABEL: &str = "ff_multi";
const CONSUMERS: usize = 4;
const POST: usize = 16;

let q: Queue<usize> = Queue::new(64, QueueType::MPMC);
let mut p = Producer::from(q);

for i in 0..30 {
p.produce(&i);
}

q.fast_forward_collaborative_group(LABEL);

let mut consumers: Vec<_> =
(0..CONSUMERS).map(|_| Consumer::new_collaborative_test(q, LABEL).without_log()).collect();

for i in 100..(100 + POST) {
p.produce(&i);
}

let mut received = Vec::new();
for _ in 0..(POST * 4) {
for c in &mut consumers {
c.consume_collaborative(|x| received.push(*x));
}
if received.len() >= POST {
break;
}
}

received.sort_unstable();
assert_eq!(
received,
(100..(100 + POST)).collect::<Vec<_>>(),
"every post-fast-forward message must be delivered exactly once across the group",
);
}
15 changes: 15 additions & 0 deletions crates/flux/src/spine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,4 +191,19 @@ pub trait FluxSpine: Sized + Send {
let _ = std::fs::remove_dir_all(shmem_dir(Self::app_name()))
.inspect_err(|e| tracing::error!("couldn't remove spine queues {e}"));
}

/// Fast-forward the collaborative group `label` for queue `T` to the
/// producer's current write head, dropping any backlog already in the
/// ring.
///
/// Call once before attaching any tile in the group — see the
/// synchronisation note on
/// [`crate::communication::queue::Queue::fast_forward_collaborative_group`].
fn fast_forward_collaborative_group<T: 'static + Copy>(&self, label: &str)
where
Self: AsRef<SpineQueue<T>>,
{
let q: &SpineQueue<T> = self.as_ref();
q.fast_forward_collaborative_group(label);
}
}
Loading