diff --git a/Cargo.toml b/Cargo.toml index db4dd2f..c04e4f5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,3 +43,7 @@ harness = false [[bench]] name = "flurry_hashbrown" harness = false + +[[bench]] +name = "flurry_counter" +harness = false diff --git a/benches/flurry_counter.rs b/benches/flurry_counter.rs new file mode 100644 index 0000000..fab4ec7 --- /dev/null +++ b/benches/flurry_counter.rs @@ -0,0 +1,124 @@ +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; +use rayon; +use rayon::prelude::*; +use std::sync::atomic::{AtomicIsize, Ordering}; + +const ITER: isize = 32 * 1024; + +#[derive(Debug)] +struct ConcurrentCounter { + base: AtomicIsize, + cells: Vec<AtomicIsize>, +} + +impl ConcurrentCounter { + fn new() -> Self { + Self { + base: AtomicIsize::new(0), + cells: (0..num_cpus::get()) + .into_iter() + .map(|_| AtomicIsize::new(0)) + .collect(), + } + } + + fn add(&self, value: isize) { + let mut base = self.base.load(Ordering::SeqCst); + let mut index = base + value; + + loop { + match self.base.compare_exchange( + base, + base + value, + Ordering::SeqCst, + Ordering::Relaxed, + ) { + Ok(_) => break, + Err(b) => base = b, + } + + let c = &self.cells[index as usize % self.cells.len()]; + let cv = c.load(Ordering::SeqCst); + index += cv; + + if c.compare_exchange(cv, cv + value, Ordering::SeqCst, Ordering::Relaxed) + .is_ok() + { + break; + } + } + } + + fn sum(&self, ordering: Ordering) -> isize { + let sum: isize = self.cells.iter().map(|c| c.load(ordering)).sum(); + + self.base.load(ordering) + sum + } +} + +fn atomic_counter(c: &mut Criterion) { + let mut group = c.benchmark_group("atomic_counter"); + group.throughput(Throughput::Elements(ITER as u64)); + let max = num_cpus::get(); + + for threads in 1..=max { + group.bench_with_input( + BenchmarkId::from_parameter(threads), + &threads, + |b, &threads| { + let pool = rayon::ThreadPoolBuilder::new() + .num_threads(threads) + .build() + .unwrap(); + pool.install(|| { + b.iter(|| { + 
let counter = AtomicIsize::new(0); + (0..ITER).into_par_iter().for_each(|_| { + counter.fetch_add(1, Ordering::Relaxed); + }); + assert_eq!(ITER, counter.load(Ordering::Relaxed)); + }) + }); + }, + ); + } + + group.finish(); +} + +fn concurrent_counter(c: &mut Criterion) { + let mut group = c.benchmark_group("concurrent_counter"); + group.throughput(Throughput::Elements(ITER as u64)); + let max = num_cpus::get(); + + for threads in 1..=max { + group.bench_with_input( + BenchmarkId::from_parameter(threads), + &threads, + |b, &threads| { + let pool = rayon::ThreadPoolBuilder::new() + .num_threads(threads) + .build() + .unwrap(); + pool.install(|| { + b.iter(|| { + let counter = ConcurrentCounter::new(); + (0..ITER).into_par_iter().for_each(|_| { + counter.add(1); + }); + assert_eq!(ITER, counter.sum(Ordering::Relaxed)); + }) + }); + }, + ); + } + + group.finish(); +} + +criterion_group!( + benches, + atomic_counter, + concurrent_counter +); +criterion_main!(benches); diff --git a/src/counter.rs b/src/counter.rs new file mode 100644 index 0000000..7950bec --- /dev/null +++ b/src/counter.rs @@ -0,0 +1,53 @@ +use std::sync::atomic::{AtomicIsize, Ordering}; + +// TODO: finish Java CounterCell port, this is only a bare minimum implementation. 
+#[derive(Debug)] +pub(crate) struct ConcurrentCounter { + base: AtomicIsize, + cells: Vec<AtomicIsize>, +} + +impl ConcurrentCounter { + pub(crate) fn new() -> Self { + Self { + base: AtomicIsize::new(0), + cells: (0..crate::map::num_cpus()) + .into_iter() + .map(|_| AtomicIsize::new(0)) + .collect(), + } + } + + pub(crate) fn add(&self, value: isize) { + let mut base = self.base.load(Ordering::SeqCst); + let mut index = base + value; + + loop { + match self.base.compare_exchange( + base, + base + value, + Ordering::SeqCst, + Ordering::Relaxed, + ) { + Ok(_) => break, + Err(b) => base = b, + } + + let c = &self.cells[index as usize % self.cells.len()]; + let cv = c.load(Ordering::SeqCst); + index += cv; + + if c.compare_exchange(cv, cv + value, Ordering::SeqCst, Ordering::Relaxed) + .is_ok() + { + break; + } + } + } + + pub(crate) fn sum(&self, ordering: Ordering) -> isize { + let sum: isize = self.cells.iter().map(|c| c.load(ordering)).sum(); + + self.base.load(ordering) + sum + } +} diff --git a/src/lib.rs b/src/lib.rs index 17b20bc..e9cd4fb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -241,6 +241,7 @@ #![warn(rust_2018_idioms)] #![allow(clippy::cognitive_complexity)] +mod counter; mod map; mod map_ref; mod node; diff --git a/src/map.rs b/src/map.rs index 0fb1ffc..6e7b810 100644 --- a/src/map.rs +++ b/src/map.rs @@ -1,9 +1,9 @@ -use seize::Linked; - +use crate::counter::ConcurrentCounter; use crate::iter::*; use crate::node::*; use crate::raw::*; use crate::reclaim::{Atomic, Collector, Guard, RetireShared, Shared}; +use seize::Linked; use std::borrow::Borrow; use std::error::Error; use std::fmt::{self, Debug, Display, Formatter}; @@ -93,7 +93,7 @@ pub struct HashMap { /// The next table index (plus one) to split while resizing. transfer_index: AtomicIsize, - count: AtomicIsize, + counter: ConcurrentCounter, /// Table initialization and resizing control. 
When negative, the /// table is being initialized or resized: -1 for initialization, @@ -280,8 +280,8 @@ impl HashMap { table: Atomic::null(), next_table: Atomic::null(), transfer_index: AtomicIsize::new(0), - count: AtomicIsize::new(0), size_ctl: AtomicIsize::new(0), + counter: ConcurrentCounter::new(), build_hasher: hash_builder, collector: Collector::new(), } @@ -349,6 +349,12 @@ impl HashMap { /// Returns the number of entries in the map. /// + /// Note that the returned value is _NOT_ an + /// atomic snapshot; invocation in the absence of concurrent + /// updates returns an accurate result, but concurrent updates that + /// occur while the sum is being calculated might not be + /// incorporated. + /// /// # Examples /// /// ``` @@ -361,7 +367,7 @@ impl HashMap { /// assert!(map.pin().len() == 2); /// ``` pub fn len(&self) -> usize { - let n = self.count.load(Ordering::Relaxed); + let n = self.counter.sum(Ordering::Relaxed); if n < 0 { 0 } else { @@ -1141,14 +1147,7 @@ where } fn add_count(&self, n: isize, resize_hint: Option<usize>, guard: &Guard<'_>) { - // TODO: implement the Java CounterCell business here - - use std::cmp; - let mut count = match n.cmp(&0) { - cmp::Ordering::Greater => self.count.fetch_add(n, Ordering::SeqCst) + n, - cmp::Ordering::Less => self.count.fetch_sub(n.abs(), Ordering::SeqCst) - n, - cmp::Ordering::Equal => self.count.load(Ordering::SeqCst), - }; + self.counter.add(n); // if resize_hint is None, it means the caller does not want us to consider a resize. // if it is Some(n), the caller saw n entries in a bin @@ -1156,6 +1155,8 @@ where return; } + let mut count = self.counter.sum(Ordering::Relaxed); + // TODO: use the resize hint let _saw_bin_length = resize_hint.unwrap(); @@ -1217,7 +1218,7 @@ where } // another resize may be needed! 
- count = self.count.load(Ordering::SeqCst); + count = self.counter.sum(Ordering::Relaxed); } } @@ -3088,14 +3089,14 @@ where #[cfg(not(miri))] #[inline] /// Returns the number of physical CPUs in the machine (_O(1)_). -fn num_cpus() -> usize { +pub(crate) fn num_cpus() -> usize { NCPU_INITIALIZER.call_once(|| NCPU.store(num_cpus::get_physical(), Ordering::Relaxed)); NCPU.load(Ordering::Relaxed) } #[cfg(miri)] #[inline] -const fn num_cpus() -> usize { +pub(crate) const fn num_cpus() -> usize { 1 } diff --git a/src/node.rs b/src/node.rs index 36ed822..ff9a957 100644 --- a/src/node.rs +++ b/src/node.rs @@ -938,7 +938,7 @@ impl TreeBin { guard: &'g Guard<'_>, ) { guard.retire(bin.as_ptr(), |mut link| { - let bin = unsafe { + let bin = { // SAFETY: `bin` is a `BinEntry` let ptr = link.cast::<BinEntry<K, V>>(); // SAFETY: `retire` guarantees that we