Skip to content

Commit f2a9e20

Browse files
committed
queue: add drain for SegQueue
1 parent 03919fe commit f2a9e20

2 files changed

Lines changed: 576 additions & 1 deletion

File tree

crossbeam-queue/src/seg_queue.rs

Lines changed: 133 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ use core::{
44
cell::UnsafeCell,
55
fmt,
66
marker::PhantomData,
7-
mem::MaybeUninit,
7+
mem::{self, MaybeUninit},
8+
ops::{Bound, RangeBounds},
89
panic::{RefUnwindSafe, UnwindSafe},
910
ptr,
1011
sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering},
@@ -525,6 +526,75 @@ impl<T> SegQueue<T> {
525526
}
526527
}
527528

529+
/// Returns an iterator that drains elements from the queue within the given range.
530+
///
531+
/// Elements before the range and after the range remain in the queue.
532+
/// If the `Drain` iterator is dropped before being fully consumed,
533+
/// the remaining elements within the range are dropped, and elements
534+
/// outside the range are preserved in their original order.
535+
///
536+
/// # Panics
537+
///
538+
/// Panics if the start of the range is greater than the end.
539+
///
540+
/// # Examples
541+
///
542+
/// ```
543+
/// use crossbeam_queue::SegQueue;
544+
///
545+
/// // Full drain
546+
/// let mut q = SegQueue::new();
547+
/// for i in 0..5 { q.push(i); }
548+
/// let v: Vec<_> = q.drain(..).collect();
549+
/// assert_eq!(v, [0, 1, 2, 3, 4]);
550+
/// assert!(q.is_empty());
551+
///
552+
/// // Prefix drain
553+
/// let mut q = SegQueue::new();
554+
/// for i in 0..5 { q.push(i); }
555+
/// let v: Vec<_> = q.drain(..3).collect();
556+
/// assert_eq!(v, [0, 1, 2]);
557+
/// assert_eq!(q.len(), 2);
558+
///
559+
/// // Range drain
560+
/// let mut q = SegQueue::new();
561+
/// for i in 0..5 { q.push(i); }
562+
/// let v: Vec<_> = q.drain(1..4).collect();
563+
/// assert_eq!(v, [1, 2, 3]);
564+
/// assert_eq!(q.len(), 2);
565+
/// ```
566+
pub fn drain<R: RangeBounds<usize>>(&mut self, range: R) -> Drain<'_, T> {
567+
let start = match range.start_bound() {
568+
Bound::Included(&n) => n,
569+
Bound::Excluded(&n) => n.saturating_add(1),
570+
Bound::Unbounded => 0,
571+
};
572+
let end = match range.end_bound() {
573+
Bound::Included(&n) => n.saturating_add(1),
574+
Bound::Excluded(&n) => n,
575+
Bound::Unbounded => usize::MAX,
576+
};
577+
assert!(start <= end, "drain range start is greater than end");
578+
579+
// Move prefix elements into a saved SegQueue.
580+
// We use push_mut/pop_mut throughout since we have exclusive access.
581+
let mut prefix = SegQueue::new();
582+
for _ in 0..start {
583+
match self.pop_mut() {
584+
Some(v) => prefix.push_mut(v),
585+
None => break,
586+
}
587+
}
588+
589+
let remaining = end.saturating_sub(start);
590+
591+
Drain {
592+
queue: self,
593+
prefix,
594+
remaining,
595+
}
596+
}
597+
528598
/// Returns `true` if the queue is empty.
529599
///
530600
/// # Examples
@@ -596,6 +666,68 @@ impl<T> SegQueue<T> {
596666
}
597667
}
598668

669+
/// A draining iterator for `SegQueue<T>`.
670+
///
671+
/// This struct is created by the [`drain`] method on [`SegQueue`].
672+
///
673+
/// [`drain`]: SegQueue::drain
674+
pub struct Drain<'a, T> {
675+
queue: &'a mut SegQueue<T>,
676+
/// Saved prefix elements (before the drain range).
677+
prefix: SegQueue<T>,
678+
/// How many elements are left to yield/drop within the drain range.
679+
remaining: usize,
680+
}
681+
682+
impl<T> Iterator for Drain<'_, T> {
683+
type Item = T;
684+
685+
fn next(&mut self) -> Option<T> {
686+
if self.remaining == 0 {
687+
return None;
688+
}
689+
let val = self.queue.pop_mut();
690+
if val.is_some() {
691+
self.remaining -= 1;
692+
}
693+
val
694+
}
695+
696+
fn size_hint(&self) -> (usize, Option<usize>) {
697+
// Credit: size_hint and ExactSizeIterator suggested by @laycookie
698+
// in https://github.com/crossbeam-rs/crossbeam/issues/1228
699+
let remaining = self.remaining.min(self.queue.len());
700+
(remaining, Some(remaining))
701+
}
702+
}
703+
704+
impl<T> ExactSizeIterator for Drain<'_, T> {}
705+
706+
impl<T> Drop for Drain<'_, T> {
707+
fn drop(&mut self) {
708+
// Drop all remaining elements in the drain range.
709+
while self.remaining > 0 {
710+
if self.next().is_none() {
711+
break;
712+
}
713+
}
714+
// Move suffix (whatever remains in queue) into prefix,
715+
// preserving order: prefix elements come first, then suffix.
716+
while let Some(v) = self.queue.pop_mut() {
717+
self.prefix.push_mut(v);
718+
}
719+
// Swap so the original queue now holds prefix + suffix.
720+
mem::swap(self.queue, &mut self.prefix);
721+
// self.prefix (now the old queue, empty) drops here cleanly.
722+
}
723+
}
724+
725+
impl<T> fmt::Debug for Drain<'_, T> {
726+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
727+
f.pad("Drain { .. }")
728+
}
729+
}
730+
599731
impl<T> Drop for SegQueue<T> {
600732
fn drop(&mut self) {
601733
let mut head = *self.head.index.get_mut();

0 commit comments

Comments
 (0)