diff --git a/crates/flux-communication/src/queue/mod.rs b/crates/flux-communication/src/queue/mod.rs index 21f3ef9..de0ecb0 100644 --- a/crates/flux-communication/src/queue/mod.rs +++ b/crates/flux-communication/src/queue/mod.rs @@ -575,6 +575,27 @@ impl Queue { fn group_cursor(&self, key: &str) -> *const AtomicUsize { unsafe { &mut *self.inner.cast_mut() }.header.find_or_insert_group(key) } + + /// Advance the collaborative-group cursor for `label` to the producer's + /// current write count, allocating the group slot if it does not exist. + /// Never rewinds (uses `fetch_max`). + /// + /// Use when joining a group whose backlog is meaningless — for instance, + /// when queued payloads index into an external arena that has since been + /// recycled, so replaying the ring would only surface stale references. + /// + /// MUST be called before any consumer in the group has begun consuming. + /// Calling it after some members have already claimed slots can race + /// with concurrent `acquire_next_slot` calls and silently lose any + /// messages the producer wrote in the fast-forward window. Typical + /// pattern: invoke once from the spine owner, immediately before + /// attaching the group's tiles. + pub fn fast_forward_collaborative_group(&self, label: &str) { + let key = format!("{}[{}].{}.collab", binary_name(), current_pid(), label); + let cursor = self.group_cursor(&key); + let head = self.count(); + unsafe { (*cursor).fetch_max(head, Ordering::Relaxed) }; + } } unsafe impl Send for Queue {} diff --git a/crates/flux-communication/src/queue/tests_collaborative.rs b/crates/flux-communication/src/queue/tests_collaborative.rs index 4b98d29..f969137 100644 --- a/crates/flux-communication/src/queue/tests_collaborative.rs +++ b/crates/flux-communication/src/queue/tests_collaborative.rs @@ -273,3 +273,71 @@ fn perf_test_collaborative_consumers() { assert_eq!(total_count, N, "received all {N} messages"); println!("latency ({total_count} msgs) — mean: {}ns", total_sum / total_count as u64); } + +#[test] +fn fast_forward_skips_backlog() { + const LABEL: &str = "ff_single"; + let q: Queue = Queue::new(64, QueueType::MPMC); + let mut p = Producer::from(q); + + for i in 0..30 { + p.produce(&i); + } + + q.fast_forward_collaborative_group(LABEL); + + let mut c = Consumer::new_collaborative_test(q, LABEL).without_log(); + + let mut received = Vec::new(); + assert!( + !c.consume_collaborative(|x| received.push(*x)), + "post-fast-forward consumer must not see the backlog", + ); + assert!(received.is_empty()); + + for i in 100..105 { + p.produce(&i); + } + while c.consume_collaborative(|x| received.push(*x)) {} + assert_eq!(received, vec![100, 101, 102, 103, 104]); +} + +#[test] +fn fast_forward_no_gaps_with_multiple_consumers() { + const LABEL: &str = "ff_multi"; + const CONSUMERS: usize = 4; + const POST: usize = 16; + + let q: Queue = Queue::new(64, QueueType::MPMC); + let mut p = Producer::from(q); + + for i in 0..30 { + p.produce(&i); + } + + q.fast_forward_collaborative_group(LABEL); + + let mut consumers: Vec<_> = + (0..CONSUMERS).map(|_| Consumer::new_collaborative_test(q, LABEL).without_log()).collect(); + + for i in 100..(100 + POST) { + p.produce(&i); + } + + let mut received = Vec::new(); + for _ in 0..(POST * 4) { + for c in &mut consumers { + c.consume_collaborative(|x| received.push(*x)); + } + if received.len() >= POST { + break; + } + } + + received.sort_unstable(); + assert_eq!( + received, + (100..(100 + POST)).collect::>(), + "every post-fast-forward message must be delivered exactly once across the group", + ); +} diff --git a/crates/flux/src/spine/mod.rs b/crates/flux/src/spine/mod.rs index 9d3620a..a47b1fe 100644 --- a/crates/flux/src/spine/mod.rs +++ b/crates/flux/src/spine/mod.rs @@ -191,4 +191,19 @@ pub trait FluxSpine: Sized + Send { let _ = std::fs::remove_dir_all(shmem_dir(Self::app_name())) .inspect_err(|e| tracing::error!("couldn't remove spine queues {e}")); } + + /// Fast-forward the collaborative group `label` for queue `T` to the + /// producer's current write head, dropping any backlog already in the + /// ring. + /// + /// Call once before attaching any tile in the group — see the + /// synchronisation note on + /// [`crate::communication::queue::Queue::fast_forward_collaborative_group`]. + fn fast_forward_collaborative_group(&self, label: &str) + where + Self: AsRef>, + { + let q: &SpineQueue = self.as_ref(); + q.fast_forward_collaborative_group(label); + } }