From 2c08ee0728534f441c9a3016530af863336bd1a0 Mon Sep 17 00:00:00 2001 From: jimvdl Date: Tue, 23 Nov 2021 15:41:35 +0100 Subject: [PATCH 01/18] Shared counters optimization #11 --- Cargo.toml | 2 +- src/map.rs | 92 ++++++++++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 84 insertions(+), 10 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c09f35de..b563c492 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ sanitize = ['crossbeam-epoch/sanitize'] crossbeam-epoch = "0.8.2" parking_lot = "0.10" num_cpus = "1.12.0" +rand = { version = "0.7", features = ["small_rng"] } rayon = {version = "1.3", optional = true} serde = {version = "1.0.105", optional = true} @@ -34,7 +35,6 @@ version = "0.3.2" default-features = false [dev-dependencies] -rand = "0.7" rayon = "1.3" criterion = "0.3" serde_json = "1.0.50" diff --git a/src/map.rs b/src/map.rs index f1e1c1b0..78e3ed9e 100644 --- a/src/map.rs +++ b/src/map.rs @@ -2,12 +2,14 @@ use crate::iter::*; use crate::node::*; use crate::raw::*; use crossbeam_epoch::{self as epoch, Atomic, Guard, Owned, Shared}; +use rand::seq::SliceRandom; +use rand::SeedableRng; use std::borrow::Borrow; use std::error::Error; use std::fmt::{self, Debug, Display, Formatter}; use std::hash::{BuildHasher, Hash, Hasher}; use std::iter::FromIterator; -use std::sync::atomic::{AtomicIsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicIsize, Ordering}; const ISIZE_BITS: usize = core::mem::size_of::() * 8; @@ -101,6 +103,10 @@ pub struct HashMap { /// next element count value upon which to resize the table. size_ctl: AtomicIsize, + counter_cells: Atomic>, + + cells_busy: AtomicBool, + /// Collector that all `Guard` references used for operations on this map must be tied to. It /// is important that they all assocate with the _same_ `Collector`, otherwise you end up with /// unsoundness as described in https://github.com/jonhoo/flurry/issues/46. Specifically, a @@ -299,6 +305,8 @@ impl HashMap { transfer_index: AtomicIsize::new(0), count: AtomicIsize::new(0), size_ctl: AtomicIsize::new(0), + counter_cells: Atomic::new(vec![AtomicIsize::new(0), AtomicIsize::new(0)]), + cells_busy: AtomicBool::default(), build_hasher: hash_builder, collector: epoch::default_collector().clone(), } @@ -1166,14 +1174,33 @@ where } fn add_count(&self, n: isize, resize_hint: Option, guard: &Guard) { - // TODO: implement the Java CounterCell business here + let mut count = self.count.load(Ordering::SeqCst); + if self + .count + .compare_exchange(count, count + n, Ordering::SeqCst, Ordering::Relaxed) + .is_err() + { + // safety: this dereference is safe because counter_cells is never null. + let counter_cells = unsafe { self.counter_cells.load(Ordering::SeqCst, guard).deref() }; + let cs = counter_cells + .choose(&mut rand::rngs::SmallRng::from_entropy()) + .unwrap(); + let cv = cs.load(Ordering::SeqCst); + if cs + .compare_exchange(cv, cv + n, Ordering::SeqCst, Ordering::Relaxed) + .is_err() + { + self.full_add_count(n, guard); + return; + } - use std::cmp; - let mut count = match n.cmp(&0) { - cmp::Ordering::Greater => self.count.fetch_add(n, Ordering::SeqCst) + n, - cmp::Ordering::Less => self.count.fetch_sub(n.abs(), Ordering::SeqCst) - n, - cmp::Ordering::Equal => self.count.load(Ordering::SeqCst), - }; + if let Some(rh) = resize_hint { + if rh <= 1 { + return; + } + } + count = self.sum_count(guard); + } // if resize_hint is None, it means the caller does not want us to consider a resize. // if it is Some(n), the caller saw n entries in a bin @@ -1241,10 +1268,57 @@ where } // another resize may be needed! - count = self.count.load(Ordering::SeqCst); + count = self.sum_count(guard); + } + } + + fn full_add_count(&self, n: isize, guard: &Guard) { + loop { + // safety: this dereference is safe because counter_cells is never null. + let counter_cells = unsafe { self.counter_cells.load(Ordering::SeqCst, guard).deref() }; + let cs = counter_cells + .choose(&mut rand::rngs::SmallRng::from_entropy()) + .unwrap(); + let cv = cs.load(Ordering::SeqCst); + if cs + .compare_exchange(cv, cv + n, Ordering::SeqCst, Ordering::Relaxed) + .is_ok() + { + // we've picked a random counter cell and incremented its count. + break; + } + if self + .cells_busy + .compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed) + .is_ok() + { + // the selected counter cell had contention so we should increase the number of counter cells + unsafe { self.counter_cells.load(Ordering::SeqCst, guard).deref_mut() } + .resize_with((n << 1) as usize, AtomicIsize::default); + self.cells_busy.store(false, Ordering::SeqCst); + continue; + } + + // Fall back on using count + let c = self.count.load(Ordering::SeqCst); + if self + .count + .compare_exchange(c, c + n, Ordering::SeqCst, Ordering::Relaxed) + .is_ok() + { + break; + } } } + fn sum_count(&self, guard: &Guard) -> isize { + // safety: this dereference is safe because counter_cells is never null. + unsafe { self.counter_cells.load(Ordering::SeqCst, guard).deref() } + .iter() + .map(|cs| cs.load(Ordering::SeqCst)) + .sum() + } + /// Tries to reserve capacity for at least `additional` more elements to be inserted in the /// `HashMap`. /// From d5aaaef3a567131a15264ce2f603d4dc58b9065d Mon Sep 17 00:00:00 2001 From: jimvdl Date: Tue, 23 Nov 2021 19:13:13 +0100 Subject: [PATCH 02/18] Missing safety warning --- src/map.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/map.rs b/src/map.rs index 78e3ed9e..10bd07fa 100644 --- a/src/map.rs +++ b/src/map.rs @@ -1293,6 +1293,8 @@ where .is_ok() { // the selected counter cell had contention so we should increase the number of counter cells + // safety: only one thread can access this branch at a time so deref_mut should be safe. + // although i'm not sure, will update this safety message after someone confirms. unsafe { self.counter_cells.load(Ordering::SeqCst, guard).deref_mut() } .resize_with((n << 1) as usize, AtomicIsize::default); self.cells_busy.store(false, Ordering::SeqCst); From a4ef4938b4a705e2b3c74daacb1144ee959b158d Mon Sep 17 00:00:00 2001 From: jimvdl Date: Sat, 27 Nov 2021 15:01:06 +0100 Subject: [PATCH 03/18] Shared counters `HashMap` now has a `counter_cells` pointer that potentially holds counter cells to relieve `count` of contention. Under the guard of `cells_busy`, it resizes when one of the counter cells experiences contention. All of the counter cells are dropped when the hash map gets deallocated. --- src/map.rs | 185 ++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 132 insertions(+), 53 deletions(-) diff --git a/src/map.rs b/src/map.rs index 10bd07fa..daf2d2c1 100644 --- a/src/map.rs +++ b/src/map.rs @@ -305,7 +305,7 @@ impl HashMap { transfer_index: AtomicIsize::new(0), count: AtomicIsize::new(0), size_ctl: AtomicIsize::new(0), - counter_cells: Atomic::new(vec![AtomicIsize::new(0), AtomicIsize::new(0)]), + counter_cells: Atomic::null(), cells_busy: AtomicBool::default(), build_hasher: hash_builder, collector: epoch::default_collector().clone(), @@ -399,7 +399,7 @@ impl HashMap { /// assert!(map.pin().len() == 2); /// ``` pub fn len(&self) -> usize { - let n = self.count.load(Ordering::Relaxed); + let n = self.sum_count(); if n < 0 { 0 } else { @@ -407,6 +407,25 @@ impl HashMap { } } + fn sum_count(&self) -> isize { + let guard = &self.guard(); + let cs = self.counter_cells.load(Ordering::SeqCst, guard); + if cs.is_null() { + return self.count.load(Ordering::SeqCst); + } + + // safety: the counter cells pointer is valid because when we experience + // contention the counter cells are initialized and will not be deallocated + // until the hashmap is deallocated. counter cells are never used if contention + // doesn't atleast happen once. + let count: isize = unsafe { cs.deref() } + .iter() + .map(|cs| cs.load(Ordering::SeqCst)) + .sum(); + + self.count.load(Ordering::SeqCst) + count + } + /// Returns `true` if the map is empty. Otherwise returns `false`. /// /// # Examples @@ -1180,17 +1199,26 @@ where .compare_exchange(count, count + n, Ordering::SeqCst, Ordering::Relaxed) .is_err() { - // safety: this dereference is safe because counter_cells is never null. - let counter_cells = unsafe { self.counter_cells.load(Ordering::SeqCst, guard).deref() }; - let cs = counter_cells + let cs = self.counter_cells.load(Ordering::SeqCst, guard); + if cs.is_null() { + self.full_add_count(n, true, guard); + return; + } + + // safety: the counter cells pointer is valid because when we experience + // contention the counter cells are initialized and will not be deallocated + // until the hashmap is deallocated. counter cells are never used if contention + // doesn't atleast happen once. + let cs = unsafe { cs.deref() }; + let c = cs .choose(&mut rand::rngs::SmallRng::from_entropy()) .unwrap(); - let cv = cs.load(Ordering::SeqCst); - if cs + let cv = c.load(Ordering::SeqCst); + let uncontended = c .compare_exchange(cv, cv + n, Ordering::SeqCst, Ordering::Relaxed) - .is_err() - { - self.full_add_count(n, guard); + .is_ok(); + if !uncontended { + self.full_add_count(n, uncontended, guard); return; } @@ -1199,7 +1227,7 @@ where return; } } - count = self.sum_count(guard); + count = self.sum_count(); } // if resize_hint is None, it means the caller does not want us to consider a resize. @@ -1268,59 +1296,105 @@ where } // another resize may be needed! - count = self.sum_count(guard); + count = self.count.load(Ordering::SeqCst); } } - fn full_add_count(&self, n: isize, guard: &Guard) { + fn full_add_count(&self, n: isize, mut uncontended: bool, guard: &Guard) { + let mut collide = false; loop { - // safety: this dereference is safe because counter_cells is never null. - let counter_cells = unsafe { self.counter_cells.load(Ordering::SeqCst, guard).deref() }; - let cs = counter_cells - .choose(&mut rand::rngs::SmallRng::from_entropy()) - .unwrap(); - let cv = cs.load(Ordering::SeqCst); - if cs - .compare_exchange(cv, cv + n, Ordering::SeqCst, Ordering::Relaxed) - .is_ok() - { - // we've picked a random counter cell and incremented its count. - break; - } - if self - .cells_busy - .compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed) - .is_ok() - { - // the selected counter cell had contention so we should increase the number of counter cells - // safety: only one thread can access this branch at a time so deref_mut should be safe. - // although i'm not sure, will update this safety message after someone confirms. - unsafe { self.counter_cells.load(Ordering::SeqCst, guard).deref_mut() } - .resize_with((n << 1) as usize, AtomicIsize::default); - self.cells_busy.store(false, Ordering::SeqCst); - continue; - } + let cs = self.counter_cells.load(Ordering::SeqCst, guard); + if !cs.is_null() { + if !uncontended { + uncontended = true; + continue; + } - // Fall back on using count - let c = self.count.load(Ordering::SeqCst); - if self - .count - .compare_exchange(c, c + n, Ordering::SeqCst, Ordering::Relaxed) - .is_ok() + // safety: the counter cells pointer is valid because when we experience + // contention the counter cells are initialized and will not be deallocated + // until the hashmap is deallocated. counter cells are never used if contention + // doesn't atleast happen once. + let cs = unsafe { cs.deref() }; + let c = cs + .choose(&mut rand::rngs::SmallRng::from_entropy()) + .unwrap(); + let cv = c.load(Ordering::SeqCst); + if c.compare_exchange(cv, cv + n, Ordering::SeqCst, Ordering::Relaxed) + .is_ok() + { + // we've picked a random counter cell and incremented its count. + break; + } + + if cs.len() >= num_cpus() { + collide = false; + // prevent counter cells from growing past cpu max. + // this ensures the loop keeps retrying with the max amount of counter cells + // this machine has to its disposal. eventually a counter cell would free up + // and increment its count instead of resizing counter cells indefinitely. + continue; + } + + if !collide { + collide = true; + continue; + } + + if self + .cells_busy + .compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed) + .is_ok() + { + // the selected counter cell had contention, increase the number of counter cells! + let new_len = cs.len() << 1; + + let mut new_cells = Vec::with_capacity(new_len); + + for cell in 0..new_len { + match cs.get(cell) { + Some(old_cell) => { + let value = old_cell.load(Ordering::SeqCst); + new_cells.push(AtomicIsize::new(value)); + } + None => new_cells.push(AtomicIsize::new(0)), + } + } + + let now_garbage = + self.counter_cells + .swap(Owned::new(new_cells), Ordering::SeqCst, guard); + drop(now_garbage); + + self.cells_busy.store(false, Ordering::SeqCst); + collide = false; + continue; + } + } else if cs.is_null() + && self + .cells_busy + .compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed) + .is_ok() { + self.counter_cells.store( + Owned::new(vec![AtomicIsize::new(n), AtomicIsize::new(0)]), + Ordering::SeqCst, + ); + self.cells_busy.store(false, Ordering::SeqCst); break; + } else { + // fall back on using count + let c = self.count.load(Ordering::SeqCst); + if self + .count + .compare_exchange(c, c + n, Ordering::SeqCst, Ordering::Relaxed) + .is_ok() + { + break; + } } } } - fn sum_count(&self, guard: &Guard) -> isize { - // safety: this dereference is safe because counter_cells is never null. - unsafe { self.counter_cells.load(Ordering::SeqCst, guard).deref() } - .iter() - .map(|cs| cs.load(Ordering::SeqCst)) - .sum() - } - /// Tries to reserve capacity for at least `additional` more elements to be inserted in the /// `HashMap`. /// @@ -3069,6 +3143,11 @@ impl Drop for HashMap { // safety: same as above + we own the table let mut table = unsafe { table.into_owned() }.into_box(); table.drop_bins(); + + let cs = self.counter_cells.load(Ordering::SeqCst, guard); + if !cs.is_null() { + drop(cs); + } } } From d982e4d2452bc67d3b126d6b300cc60ba9a41080 Mon Sep 17 00:00:00 2001 From: jimvdl Date: Sat, 27 Nov 2021 16:38:40 +0100 Subject: [PATCH 04/18] Minor changes --- src/map.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/map.rs b/src/map.rs index daf2d2c1..4ae3c562 100644 --- a/src/map.rs +++ b/src/map.rs @@ -420,7 +420,7 @@ impl HashMap { // doesn't atleast happen once. let count: isize = unsafe { cs.deref() } .iter() - .map(|cs| cs.load(Ordering::SeqCst)) + .map(|c| c.load(Ordering::SeqCst)) .sum(); self.count.load(Ordering::SeqCst) + count @@ -1352,10 +1352,7 @@ where for cell in 0..new_len { match cs.get(cell) { - Some(old_cell) => { - let value = old_cell.load(Ordering::SeqCst); - new_cells.push(AtomicIsize::new(value)); - } + Some(old_cell) => new_cells.push(AtomicIsize::new(old_cell.load(Ordering::SeqCst))), None => new_cells.push(AtomicIsize::new(0)), } } From 72286fa2fd74286e69da82e2135e05bf3b91518d Mon Sep 17 00:00:00 2001 From: jimvdl Date: Sat, 27 Nov 2021 16:42:53 +0100 Subject: [PATCH 05/18] Fixed invalid drop of cells --- src/map.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/map.rs b/src/map.rs index 4ae3c562..3a7823db 100644 --- a/src/map.rs +++ b/src/map.rs @@ -3143,7 +3143,8 @@ impl Drop for HashMap { let cs = self.counter_cells.load(Ordering::SeqCst, guard); if !cs.is_null() { - drop(cs); + // safety: same as above + we own all counter cells + drop(unsafe { cs.into_owned() }); } } } From d4e85873a8cc8daf40ed98a8703a3e0b4ecb5f91 Mon Sep 17 00:00:00 2001 From: jimvdl Date: Sat, 27 Nov 2021 17:05:09 +0100 Subject: [PATCH 06/18] Run cargo fmt --- src/map.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/map.rs b/src/map.rs index 3a7823db..3e705e2e 100644 --- a/src/map.rs +++ b/src/map.rs @@ -1352,7 +1352,9 @@ where for cell in 0..new_len { match cs.get(cell) { - Some(old_cell) => new_cells.push(AtomicIsize::new(old_cell.load(Ordering::SeqCst))), + Some(old_cell) => { + new_cells.push(AtomicIsize::new(old_cell.load(Ordering::SeqCst))) + } None => new_cells.push(AtomicIsize::new(0)), } } From 2a4a28fb895862074e7e511dc5552f2ed31792df Mon Sep 17 00:00:00 2001 From: jimvdl Date: Sat, 27 Nov 2021 17:26:35 +0100 Subject: [PATCH 07/18] Fixed incorrect drop of old counter cells after resize --- src/map.rs | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/src/map.rs b/src/map.rs index 3e705e2e..7775ab0f 100644 --- a/src/map.rs +++ b/src/map.rs @@ -1362,7 +1362,20 @@ where let now_garbage = self.counter_cells .swap(Owned::new(new_cells), Ordering::SeqCst, guard); - drop(now_garbage); + // safety: need to guarantee that now_garbage is no longer + // reachable. more specifically, no thread that executes _after_ + // this line can ever get a reference to now_garbage. + // + // here are the possible cases: + // + // - another thread already has a reference to now_garbage. + // they must have read it before the call to swap. + // because of this, that thread must be pinned to an epoch <= + // the epoch of our guard. since the garbage is placed in our + // epoch, it won't be freed until the _next_ epoch, at which + // point, that thread must have dropped its guard, and with it, + // any reference to `counter_cells`. + unsafe { guard.defer_destroy(now_garbage) }; self.cells_busy.store(false, Ordering::SeqCst); collide = false; From 7b2e23f185d1f153a6b1d9727d444bbe7bfd2415 Mon Sep 17 00:00:00 2001 From: jimvdl Date: Thu, 9 Dec 2021 14:59:12 +0100 Subject: [PATCH 08/18] Moved counter cell logic into its own module --- src/lib.rs | 1 + src/long_adder.rs | 188 ++++++++++++++++++++++++++++++++++++++++++++ src/map.rs | 195 ++++------------------------------------------ 3 files changed, 203 insertions(+), 181 deletions(-) create mode 100644 src/long_adder.rs diff --git a/src/lib.rs b/src/lib.rs index 91f0cce9..babd4ec3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -251,6 +251,7 @@ mod node; mod raw; mod set; mod set_ref; +mod long_adder; #[cfg(feature = "rayon")] mod rayon_impls; diff --git a/src/long_adder.rs b/src/long_adder.rs new file mode 100644 index 00000000..31a3825e --- /dev/null +++ b/src/long_adder.rs @@ -0,0 +1,188 @@ +use crossbeam_epoch::{Atomic, Guard, Owned, Shared}; +use rand::seq::SliceRandom; +use rand::SeedableRng; +use std::sync::atomic::{AtomicBool, AtomicIsize, Ordering}; + +#[derive(Default)] +pub(crate) struct LongAdder { + base: AtomicIsize, + cells: Atomic>, + cells_busy: AtomicBool, +} + +impl LongAdder { + pub(crate) fn add(&self, value: isize, guard: &Guard) { + let cells = self.cells.load(Ordering::SeqCst, guard); + let base = self.base.load(Ordering::SeqCst); + if !cells.is_null() + || self + .base + .compare_exchange(base, base + value, Ordering::SeqCst, Ordering::Relaxed) + .is_err() + { + if cells.is_null() { + self.long_accumulate(value, true, guard); + return; + } + + let c = unsafe { cells.deref() } + .choose(&mut rand::rngs::SmallRng::from_entropy()) + .unwrap(); + let cv = c.load(Ordering::SeqCst); + let uncontended = c + .compare_exchange(cv, cv + value, Ordering::SeqCst, Ordering::Relaxed) + .is_ok(); + + if !uncontended { + self.long_accumulate(value, uncontended, guard); + } + } + } + + fn long_accumulate(&self, value: isize, mut uncontended: bool, guard: &Guard) { + let mut collide = false; + loop { + let cells = self.cells.load(Ordering::SeqCst, guard); + if !cells.is_null() { + if !uncontended { + uncontended = true; + continue; + } + + // safety: the counter cells pointer is valid because when we experience + // contention the counter cells are initialized and will not be deallocated + // until the hashmap is deallocated. counter cells are never used if contention + // doesn't atleast happen once. + let cells = unsafe { cells.deref() }; + let c = cells + .choose(&mut rand::rngs::SmallRng::from_entropy()) + .unwrap(); + let cv = c.load(Ordering::SeqCst); + if c.compare_exchange(cv, cv + value, Ordering::SeqCst, Ordering::Relaxed) + .is_ok() + { + // we've picked a random counter cell and incremented its count. + break; + } + + if cells.len() >= crate::map::num_cpus() { + collide = false; + // prevent counter cells from growing past cpu max. + // this ensures the loop keeps retrying with the max amount of counter cells + // this machine has to its disposal. eventually a counter cell would free up + // and increment its count instead of resizing counter cells indefinitely. + continue; + } + + if !collide { + collide = true; + continue; + } + + if self + .cells_busy + .compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed) + .is_ok() + { + // the selected counter cell had contention, increase the number of counter cells! + let new_len = cells.len() << 1; + + let mut new_cells = Vec::with_capacity(new_len); + + for cell in 0..new_len { + match cells.get(cell) { + Some(old_cell) => { + new_cells.push(AtomicIsize::new(old_cell.load(Ordering::SeqCst))) + } + None => new_cells.push(AtomicIsize::new(0)), + } + } + + let now_garbage = + self.cells + .swap(Owned::new(new_cells), Ordering::SeqCst, guard); + // safety: need to guarantee that now_garbage is no longer + // reachable. more specifically, no thread that executes _after_ + // this line can ever get a reference to now_garbage. + // + // here are the possible cases: + // + // - another thread already has a reference to now_garbage. + // they must have read it before the call to swap. + // because of this, that thread must be pinned to an epoch <= + // the epoch of our guard. since the garbage is placed in our + // epoch, it won't be freed until the _next_ epoch, at which + // point, that thread must have dropped its guard, and with it, + // any reference to `counter_cells`. + unsafe { guard.defer_destroy(now_garbage) }; + + self.cells_busy.store(false, Ordering::SeqCst); + collide = false; + continue; + } + } else if cells.is_null() + && self + .cells_busy + .compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed) + .is_ok() + { + self.cells.store( + Owned::new(vec![AtomicIsize::new(value), AtomicIsize::new(0)]), + Ordering::SeqCst, + ); + self.cells_busy.store(false, Ordering::SeqCst); + break; + } else { + // fall back on using count + let b = self.base.load(Ordering::SeqCst); + if self + .base + .compare_exchange(b, b + value, Ordering::SeqCst, Ordering::Relaxed) + .is_ok() + { + break; + } + } + } + } + + pub(crate) fn sum(&self, guard: &Guard) -> isize { + let cells = self.cells.load(Ordering::SeqCst, guard); + if cells.is_null() { + return self.base.load(Ordering::SeqCst); + } + + // safety: the counter cells pointer is valid because when we experience + // contention the counter cells are initialized and will not be deallocated + // until the hashmap is deallocated. counter cells are never used if contention + // doesn't atleast happen once. + let count: isize = unsafe { cells.deref() } + .iter() + .map(|c| c.load(Ordering::SeqCst)) + .sum(); + + self.base.load(Ordering::SeqCst) + count + } +} + +impl Drop for LongAdder { + fn drop(&mut self) { + // safety: we have &mut self _and_ all references we have returned are bound to the + // lifetime of their borrow of self, so there cannot be any outstanding references to + // anything in the map. + // + // NOTE: we _could_ relax the bounds in all the methods that return `&'g ...` to not also + // bound `&self` by `'g`, but if we did that, we would need to use a regular `epoch::Guard` + // here rather than an unprotected one. + let guard = unsafe { crossbeam_epoch::unprotected() }; + + let cells = self.cells.swap(Shared::null(), Ordering::SeqCst, guard); + if cells.is_null() { + // table was never allocated! + return; + } + + // safety: same as above + we own all counter cells + drop(unsafe { cells.into_owned() }); + } +} \ No newline at end of file diff --git a/src/map.rs b/src/map.rs index 7775ab0f..3eca4968 100644 --- a/src/map.rs +++ b/src/map.rs @@ -1,15 +1,14 @@ use crate::iter::*; +use crate::long_adder::LongAdder; use crate::node::*; use crate::raw::*; use crossbeam_epoch::{self as epoch, Atomic, Guard, Owned, Shared}; -use rand::seq::SliceRandom; -use rand::SeedableRng; use std::borrow::Borrow; use std::error::Error; use std::fmt::{self, Debug, Display, Formatter}; use std::hash::{BuildHasher, Hash, Hasher}; use std::iter::FromIterator; -use std::sync::atomic::{AtomicBool, AtomicIsize, Ordering}; +use std::sync::atomic::{AtomicIsize, Ordering}; const ISIZE_BITS: usize = core::mem::size_of::() * 8; @@ -93,8 +92,6 @@ pub struct HashMap { /// The next table index (plus one) to split while resizing. transfer_index: AtomicIsize, - count: AtomicIsize, - /// Table initialization and resizing control. When negative, the /// table is being initialized or resized: -1 for initialization, /// else -(1 + the number of active resizing threads). Otherwise, @@ -103,9 +100,7 @@ pub struct HashMap { /// next element count value upon which to resize the table. size_ctl: AtomicIsize, - counter_cells: Atomic>, - - cells_busy: AtomicBool, + long_adder: LongAdder, /// Collector that all `Guard` references used for operations on this map must be tied to. It /// is important that they all assocate with the _same_ `Collector`, otherwise you end up with @@ -303,10 +298,8 @@ impl HashMap { table: Atomic::null(), next_table: Atomic::null(), transfer_index: AtomicIsize::new(0), - count: AtomicIsize::new(0), size_ctl: AtomicIsize::new(0), - counter_cells: Atomic::null(), - cells_busy: AtomicBool::default(), + long_adder: LongAdder::default(), build_hasher: hash_builder, collector: epoch::default_collector().clone(), } @@ -399,7 +392,7 @@ impl HashMap { /// assert!(map.pin().len() == 2); /// ``` pub fn len(&self) -> usize { - let n = self.sum_count(); + let n = self.long_adder.sum(&self.guard()); if n < 0 { 0 } else { @@ -407,25 +400,6 @@ impl HashMap { } } - fn sum_count(&self) -> isize { - let guard = &self.guard(); - let cs = self.counter_cells.load(Ordering::SeqCst, guard); - if cs.is_null() { - return self.count.load(Ordering::SeqCst); - } - - // safety: the counter cells pointer is valid because when we experience - // contention the counter cells are initialized and will not be deallocated - // until the hashmap is deallocated. counter cells are never used if contention - // doesn't atleast happen once. - let count: isize = unsafe { cs.deref() } - .iter() - .map(|c| c.load(Ordering::SeqCst)) - .sum(); - - self.count.load(Ordering::SeqCst) + count - } - /// Returns `true` if the map is empty. Otherwise returns `false`. /// /// # Examples @@ -1193,48 +1167,20 @@ where } fn add_count(&self, n: isize, resize_hint: Option, guard: &Guard) { - let mut count = self.count.load(Ordering::SeqCst); - if self - .count - .compare_exchange(count, count + n, Ordering::SeqCst, Ordering::Relaxed) - .is_err() - { - let cs = self.counter_cells.load(Ordering::SeqCst, guard); - if cs.is_null() { - self.full_add_count(n, true, guard); - return; - } - - // safety: the counter cells pointer is valid because when we experience - // contention the counter cells are initialized and will not be deallocated - // until the hashmap is deallocated. counter cells are never used if contention - // doesn't atleast happen once. - let cs = unsafe { cs.deref() }; - let c = cs - .choose(&mut rand::rngs::SmallRng::from_entropy()) - .unwrap(); - let cv = c.load(Ordering::SeqCst); - let uncontended = c - .compare_exchange(cv, cv + n, Ordering::SeqCst, Ordering::Relaxed) - .is_ok(); - if !uncontended { - self.full_add_count(n, uncontended, guard); - return; - } + self.long_adder.add(n, guard); - if let Some(rh) = resize_hint { + // if resize_hint is None, it means the caller does not want us to consider a resize. + // if it is Some(n), the caller saw n entries in a bin + match resize_hint { + Some(rh) => { if rh <= 1 { return; } } - count = self.sum_count(); + None => return, } - // if resize_hint is None, it means the caller does not want us to consider a resize. - // if it is Some(n), the caller saw n entries in a bin - if resize_hint.is_none() { - return; - } + let mut count = self.long_adder.sum(guard); // TODO: use the resize hint let _saw_bin_length = resize_hint.unwrap(); @@ -1296,114 +1242,7 @@ where } // another resize may be needed! - count = self.count.load(Ordering::SeqCst); - } - } - - fn full_add_count(&self, n: isize, mut uncontended: bool, guard: &Guard) { - let mut collide = false; - loop { - let cs = self.counter_cells.load(Ordering::SeqCst, guard); - if !cs.is_null() { - if !uncontended { - uncontended = true; - continue; - } - - // safety: the counter cells pointer is valid because when we experience - // contention the counter cells are initialized and will not be deallocated - // until the hashmap is deallocated. counter cells are never used if contention - // doesn't atleast happen once. - let cs = unsafe { cs.deref() }; - let c = cs - .choose(&mut rand::rngs::SmallRng::from_entropy()) - .unwrap(); - let cv = c.load(Ordering::SeqCst); - if c.compare_exchange(cv, cv + n, Ordering::SeqCst, Ordering::Relaxed) - .is_ok() - { - // we've picked a random counter cell and incremented its count. - break; - } - - if cs.len() >= num_cpus() { - collide = false; - // prevent counter cells from growing past cpu max. - // this ensures the loop keeps retrying with the max amount of counter cells - // this machine has to its disposal. eventually a counter cell would free up - // and increment its count instead of resizing counter cells indefinitely. - continue; - } - - if !collide { - collide = true; - continue; - } - - if self - .cells_busy - .compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed) - .is_ok() - { - // the selected counter cell had contention, increase the number of counter cells! - let new_len = cs.len() << 1; - - let mut new_cells = Vec::with_capacity(new_len); - - for cell in 0..new_len { - match cs.get(cell) { - Some(old_cell) => { - new_cells.push(AtomicIsize::new(old_cell.load(Ordering::SeqCst))) - } - None => new_cells.push(AtomicIsize::new(0)), - } - } - - let now_garbage = - self.counter_cells - .swap(Owned::new(new_cells), Ordering::SeqCst, guard); - // safety: need to guarantee that now_garbage is no longer - // reachable. more specifically, no thread that executes _after_ - // this line can ever get a reference to now_garbage. - // - // here are the possible cases: - // - // - another thread already has a reference to now_garbage. - // they must have read it before the call to swap. - // because of this, that thread must be pinned to an epoch <= - // the epoch of our guard. since the garbage is placed in our - // epoch, it won't be freed until the _next_ epoch, at which - // point, that thread must have dropped its guard, and with it, - // any reference to `counter_cells`. - unsafe { guard.defer_destroy(now_garbage) }; - - self.cells_busy.store(false, Ordering::SeqCst); - collide = false; - continue; - } - } else if cs.is_null() - && self - .cells_busy - .compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed) - .is_ok() - { - self.counter_cells.store( - Owned::new(vec![AtomicIsize::new(n), AtomicIsize::new(0)]), - Ordering::SeqCst, - ); - self.cells_busy.store(false, Ordering::SeqCst); - break; - } else { - // fall back on using count - let c = self.count.load(Ordering::SeqCst); - if self - .count - .compare_exchange(c, c + n, Ordering::SeqCst, Ordering::Relaxed) - .is_ok() - { - break; - } - } + count = self.long_adder.sum(guard); } } @@ -3155,12 +2994,6 @@ impl Drop for HashMap { // safety: same as above + we own the table let mut table = unsafe { table.into_owned() }.into_box(); table.drop_bins(); - - let cs = self.counter_cells.load(Ordering::SeqCst, guard); - if !cs.is_null() { - // safety: same as above + we own all counter cells - drop(unsafe { cs.into_owned() }); - } } } @@ -3269,7 +3102,7 @@ where #[cfg(not(miri))] #[inline] /// Returns the number of physical CPUs in the machine (_O(1)_). -fn num_cpus() -> usize { +pub(crate) fn num_cpus() -> usize { NCPU_INITIALIZER.call_once(|| NCPU.store(num_cpus::get_physical(), Ordering::Relaxed)); NCPU.load(Ordering::Relaxed) } From 4b3d0ed1de530cc3a9318291882e9e9c20256dbe Mon Sep 17 00:00:00 2001 From: jimvdl Date: Thu, 9 Dec 2021 15:14:17 +0100 Subject: [PATCH 09/18] Minor doc fixes --- src/long_adder.rs | 39 +++++++++++++++++---------------------- src/map.rs | 6 ++++++ 2 files changed, 23 insertions(+), 22 deletions(-) diff --git a/src/long_adder.rs b/src/long_adder.rs index 31a3825e..c4a1abb1 100644 --- a/src/long_adder.rs +++ b/src/long_adder.rs @@ -49,9 +49,9 @@ impl LongAdder { continue; } - // safety: the counter cells pointer is valid because when we experience - // contention the counter cells are initialized and will not be deallocated - // until the hashmap is deallocated. counter cells are never used if contention + // safety: the cells pointer is valid because when we experience + // contention the cells are initialized and will not be deallocated + // until the hashmap is deallocated. cells are never used if contention // doesn't atleast happen once. let cells = unsafe { cells.deref() }; let c = cells @@ -61,16 +61,16 @@ impl LongAdder { if c.compare_exchange(cv, cv + value, Ordering::SeqCst, Ordering::Relaxed) .is_ok() { - // we've picked a random counter cell and incremented its count. + // we've picked a random cell and incremented its count. break; } if cells.len() >= crate::map::num_cpus() { collide = false; - // prevent counter cells from growing past cpu max. - // this ensures the loop keeps retrying with the max amount of counter cells - // this machine has to its disposal. eventually a counter cell would free up - // and increment its count instead of resizing counter cells indefinitely. + // prevent cells from growing past cpu max. + // this ensures the loop keeps retrying with the max amount of cells + // this machine has to its disposal. eventually a cell would free up + // and increment its count instead of resizing cells indefinitely. continue; } @@ -84,7 +84,7 @@ impl LongAdder { .compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed) .is_ok() { - // the selected counter cell had contention, increase the number of counter cells! + // the selected cell had contention, increase the number of cells! let new_len = cells.len() << 1; let mut new_cells = Vec::with_capacity(new_len); @@ -113,7 +113,7 @@ impl LongAdder { // the epoch of our guard. since the garbage is placed in our // epoch, it won't be freed until the _next_ epoch, at which // point, that thread must have dropped its guard, and with it, - // any reference to `counter_cells`. + // any reference to `cells`. unsafe { guard.defer_destroy(now_garbage) }; self.cells_busy.store(false, Ordering::SeqCst); @@ -133,7 +133,7 @@ impl LongAdder { self.cells_busy.store(false, Ordering::SeqCst); break; } else { - // fall back on using count + // fall back on using base let b = self.base.load(Ordering::SeqCst); if self .base @@ -152,9 +152,9 @@ impl LongAdder { return self.base.load(Ordering::SeqCst); } - // safety: the counter cells pointer is valid because when we experience - // contention the counter cells are initialized and will not be deallocated - // until the hashmap is deallocated. counter cells are never used if contention + // safety: the cells pointer is valid because when we experience + // contention the cells are initialized and will not be deallocated + // until the hashmap is deallocated. cells are never used if contention // doesn't atleast happen once. let count: isize = unsafe { cells.deref() } .iter() @@ -168,21 +168,16 @@ impl LongAdder { impl Drop for LongAdder { fn drop(&mut self) { // safety: we have &mut self _and_ all references we have returned are bound to the - // lifetime of their borrow of self, so there cannot be any outstanding references to - // anything in the map. - // - // NOTE: we _could_ relax the bounds in all the methods that return `&'g ...` to not also - // bound `&self` by `'g`, but if we did that, we would need to use a regular `epoch::Guard` - // here rather than an unprotected one. + // lifetime of their borrow of self, so there cannot be any outstanding references. let guard = unsafe { crossbeam_epoch::unprotected() }; let cells = self.cells.swap(Shared::null(), Ordering::SeqCst, guard); if cells.is_null() { - // table was never allocated! + // cells were never allocated! return; } - // safety: same as above + we own all counter cells + // safety: same as above + we own the long adder. drop(unsafe { cells.into_owned() }); } } \ No newline at end of file diff --git a/src/map.rs b/src/map.rs index 3eca4968..a51bbd15 100644 --- a/src/map.rs +++ b/src/map.rs @@ -379,6 +379,12 @@ impl HashMap { } /// Returns the number of entries in the map. + /// + /// Note that the returned value is _NOT_ an + /// atomic snapshot; invocation in the absence of concurrent + /// updates returns an accurate result, but concurrent updates that + /// occur while the sum is being calculated might not be + /// incorporated. /// /// # Examples /// From d7de30075e72aa2fb48152892d98d91ec5bd8ade Mon Sep 17 00:00:00 2001 From: jimvdl Date: Thu, 9 Dec 2021 15:27:28 +0100 Subject: [PATCH 10/18] Ran cargo fmt --- src/lib.rs | 2 +- src/long_adder.rs | 2 +- src/map.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index babd4ec3..622b3553 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -245,13 +245,13 @@ use crossbeam_epoch::Guard; use std::ops::Deref; +mod long_adder; mod map; mod map_ref; mod node; mod raw; mod set; mod set_ref; -mod long_adder; #[cfg(feature = "rayon")] mod rayon_impls; diff --git a/src/long_adder.rs b/src/long_adder.rs index c4a1abb1..bde0ad52 100644 --- a/src/long_adder.rs +++ b/src/long_adder.rs @@ -180,4 +180,4 @@ impl Drop for LongAdder { // safety: same as above + we own the long adder. drop(unsafe { cells.into_owned() }); } -} \ No newline at end of file +} diff --git a/src/map.rs b/src/map.rs index a51bbd15..6f4afa99 100644 --- a/src/map.rs +++ b/src/map.rs @@ -379,7 +379,7 @@ impl HashMap { } /// Returns the number of entries in the map. - /// + /// /// Note that the returned value is _NOT_ an /// atomic snapshot; invocation in the absence of concurrent /// updates returns an accurate result, but concurrent updates that From 5882f5877f356408f61159ef3755600451fca102 Mon Sep 17 00:00:00 2001 From: jimvdl Date: Thu, 9 Dec 2021 16:00:16 +0100 Subject: [PATCH 11/18] Impl Send + Sync for LongAdder --- src/long_adder.rs | 3 +++ src/map.rs | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/long_adder.rs b/src/long_adder.rs index bde0ad52..fcf01c58 100644 --- a/src/long_adder.rs +++ b/src/long_adder.rs @@ -165,6 +165,9 @@ impl LongAdder { } } +unsafe impl Send for LongAdder {} +unsafe impl Sync for LongAdder {} + impl Drop for LongAdder { fn drop(&mut self) { // safety: we have &mut self _and_ all references we have returned are bound to the diff --git a/src/map.rs b/src/map.rs index 6f4afa99..669ae08b 100644 --- a/src/map.rs +++ b/src/map.rs @@ -1,5 +1,5 @@ use crate::iter::*; -use crate::long_adder::LongAdder; +use crate::long_adder::*; use crate::node::*; use crate::raw::*; use crossbeam_epoch::{self as epoch, Atomic, Guard, Owned, Shared}; From 577874e803cf55833ce7efd6e637993320dbbd50 Mon Sep 17 00:00:00 2001 From: jimvdl Date: Thu, 9 Dec 2021 22:22:36 +0100 Subject: [PATCH 12/18] Removed unnecessary Send and Sync for LongAdder --- src/long_adder.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/long_adder.rs b/src/long_adder.rs index fcf01c58..bde0ad52 100644 --- a/src/long_adder.rs +++ b/src/long_adder.rs @@ -165,9 +165,6 @@ impl LongAdder { } } -unsafe impl Send for LongAdder {} -unsafe impl Sync for LongAdder {} - impl Drop for LongAdder { fn drop(&mut self) { // safety: we have &mut self _and_ all references we have returned are bound to the From 52d1cfe02cb73d1b4e1c56a6e27120f041c661b4 Mon Sep 17 00:00:00 2001 From: jimvdl Date: Fri, 17 Dec 2021 16:11:36 +0100 Subject: [PATCH 13/18] Refactored LongAdder to ConcurrentCounter --- src/counter.rs | 63 ++++++++++++++++ src/lib.rs | 2 +- src/long_adder.rs | 183 ---------------------------------------------- src/map.rs | 14 ++-- 4 files changed, 71 insertions(+), 191 deletions(-) create mode 100644 src/counter.rs delete mode 100644 src/long_adder.rs diff --git a/src/counter.rs b/src/counter.rs new file mode 100644 index 00000000..c80dae7f --- /dev/null +++ b/src/counter.rs @@ -0,0 +1,63 @@ +use rand::seq::SliceRandom; +use rand::SeedableRng; +use std::sync::atomic::{AtomicIsize, Ordering}; + +pub(crate) struct ConcurrentCounter { + base: AtomicIsize, + cells: Vec, +} + +impl ConcurrentCounter { + pub(crate) fn new() -> Self { + Self { + base: AtomicIsize::new(0), + cells: (0..crate::map::num_cpus()) + .into_iter() + .map(|_| AtomicIsize::new(0)) + .collect(), + } + } + + pub(crate) fn add(&self, value: isize) { + let base = self.base.load(Ordering::SeqCst); + if self + .base + .compare_exchange(base, base + value, Ordering::SeqCst, Ordering::Relaxed) + .is_err() + { + // we experienced contention on base + loop { + let c = &self + .cells + .choose(&mut rand::rngs::SmallRng::from_entropy()) + .unwrap(); + let cv = c.load(Ordering::SeqCst); + if c.compare_exchange(cv, cv + value, Ordering::SeqCst, Ordering::Relaxed) + .is_ok() + { + // we've successfully incremented a random counter + break; + } + + // the selected counter also experienced contention, retry base + let b = self.base.load(Ordering::SeqCst); + if self + .base + .compare_exchange(b, b + value, Ordering::SeqCst, Ordering::Relaxed) + .is_ok() + { + // the contention on base has subsided, increment was successful + break; + } + + // both base and a random cell experienced contention, retry another cell + } + } + } + + pub(crate) fn sum(&self) -> isize { + let sum: isize = self.cells.iter().map(|c| c.load(Ordering::SeqCst)).sum(); + + self.base.load(Ordering::SeqCst) + sum + } +} diff --git a/src/lib.rs b/src/lib.rs index 622b3553..df70d25a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -245,7 +245,7 @@ use crossbeam_epoch::Guard; use std::ops::Deref; -mod long_adder; +mod counter; mod map; mod map_ref; mod node; diff --git a/src/long_adder.rs b/src/long_adder.rs deleted file mode 100644 index bde0ad52..00000000 --- a/src/long_adder.rs +++ /dev/null @@ -1,183 +0,0 @@ -use crossbeam_epoch::{Atomic, Guard, Owned, Shared}; -use rand::seq::SliceRandom; -use rand::SeedableRng; -use std::sync::atomic::{AtomicBool, AtomicIsize, Ordering}; - -#[derive(Default)] -pub(crate) struct LongAdder { - base: AtomicIsize, - cells: Atomic>, - cells_busy: AtomicBool, -} - -impl LongAdder { - pub(crate) fn add(&self, value: isize, guard: &Guard) { - let cells = self.cells.load(Ordering::SeqCst, guard); - let base = self.base.load(Ordering::SeqCst); - if !cells.is_null() - || self - .base - .compare_exchange(base, base + value, Ordering::SeqCst, Ordering::Relaxed) - .is_err() - { - if cells.is_null() { - self.long_accumulate(value, true, guard); - return; - } - - let c = unsafe { cells.deref() } - .choose(&mut rand::rngs::SmallRng::from_entropy()) - .unwrap(); - let cv = c.load(Ordering::SeqCst); - let uncontended = c - .compare_exchange(cv, cv + value, Ordering::SeqCst, Ordering::Relaxed) - .is_ok(); - - if !uncontended { - self.long_accumulate(value, uncontended, guard); - } - } - } - - fn long_accumulate(&self, value: isize, mut uncontended: bool, guard: &Guard) { - let mut collide = false; - loop { - let cells = self.cells.load(Ordering::SeqCst, guard); - if !cells.is_null() { - if !uncontended { - uncontended = true; - continue; - } - - // safety: the cells pointer is valid because when we experience - // contention the cells are initialized and will not be deallocated - // until the hashmap is deallocated. cells are never used if contention - // doesn't atleast happen once. - let cells = unsafe { cells.deref() }; - let c = cells - .choose(&mut rand::rngs::SmallRng::from_entropy()) - .unwrap(); - let cv = c.load(Ordering::SeqCst); - if c.compare_exchange(cv, cv + value, Ordering::SeqCst, Ordering::Relaxed) - .is_ok() - { - // we've picked a random cell and incremented its count. - break; - } - - if cells.len() >= crate::map::num_cpus() { - collide = false; - // prevent cells from growing past cpu max. - // this ensures the loop keeps retrying with the max amount of cells - // this machine has to its disposal. eventually a cell would free up - // and increment its count instead of resizing cells indefinitely. - continue; - } - - if !collide { - collide = true; - continue; - } - - if self - .cells_busy - .compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed) - .is_ok() - { - // the selected cell had contention, increase the number of cells! - let new_len = cells.len() << 1; - - let mut new_cells = Vec::with_capacity(new_len); - - for cell in 0..new_len { - match cells.get(cell) { - Some(old_cell) => { - new_cells.push(AtomicIsize::new(old_cell.load(Ordering::SeqCst))) - } - None => new_cells.push(AtomicIsize::new(0)), - } - } - - let now_garbage = - self.cells - .swap(Owned::new(new_cells), Ordering::SeqCst, guard); - // safety: need to guarantee that now_garbage is no longer - // reachable. more specifically, no thread that executes _after_ - // this line can ever get a reference to now_garbage. - // - // here are the possible cases: - // - // - another thread already has a reference to now_garbage. - // they must have read it before the call to swap. - // because of this, that thread must be pinned to an epoch <= - // the epoch of our guard. since the garbage is placed in our - // epoch, it won't be freed until the _next_ epoch, at which - // point, that thread must have dropped its guard, and with it, - // any reference to `cells`. - unsafe { guard.defer_destroy(now_garbage) }; - - self.cells_busy.store(false, Ordering::SeqCst); - collide = false; - continue; - } - } else if cells.is_null() - && self - .cells_busy - .compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed) - .is_ok() - { - self.cells.store( - Owned::new(vec![AtomicIsize::new(value), AtomicIsize::new(0)]), - Ordering::SeqCst, - ); - self.cells_busy.store(false, Ordering::SeqCst); - break; - } else { - // fall back on using base - let b = self.base.load(Ordering::SeqCst); - if self - .base - .compare_exchange(b, b + value, Ordering::SeqCst, Ordering::Relaxed) - .is_ok() - { - break; - } - } - } - } - - pub(crate) fn sum(&self, guard: &Guard) -> isize { - let cells = self.cells.load(Ordering::SeqCst, guard); - if cells.is_null() { - return self.base.load(Ordering::SeqCst); - } - - // safety: the cells pointer is valid because when we experience - // contention the cells are initialized and will not be deallocated - // until the hashmap is deallocated. cells are never used if contention - // doesn't atleast happen once. - let count: isize = unsafe { cells.deref() } - .iter() - .map(|c| c.load(Ordering::SeqCst)) - .sum(); - - self.base.load(Ordering::SeqCst) + count - } -} - -impl Drop for LongAdder { - fn drop(&mut self) { - // safety: we have &mut self _and_ all references we have returned are bound to the - // lifetime of their borrow of self, so there cannot be any outstanding references. - let guard = unsafe { crossbeam_epoch::unprotected() }; - - let cells = self.cells.swap(Shared::null(), Ordering::SeqCst, guard); - if cells.is_null() { - // cells were never allocated! - return; - } - - // safety: same as above + we own the long adder. - drop(unsafe { cells.into_owned() }); - } -} diff --git a/src/map.rs b/src/map.rs index 669ae08b..227a8b07 100644 --- a/src/map.rs +++ b/src/map.rs @@ -1,5 +1,5 @@ +use crate::counter::*; use crate::iter::*; -use crate::long_adder::*; use crate::node::*; use crate::raw::*; use crossbeam_epoch::{self as epoch, Atomic, Guard, Owned, Shared}; @@ -100,7 +100,7 @@ pub struct HashMap { /// next element count value upon which to resize the table. size_ctl: AtomicIsize, - long_adder: LongAdder, + counter: ConcurrentCounter, /// Collector that all `Guard` references used for operations on this map must be tied to. It /// is important that they all assocate with the _same_ `Collector`, otherwise you end up with @@ -299,7 +299,7 @@ impl HashMap { next_table: Atomic::null(), transfer_index: AtomicIsize::new(0), size_ctl: AtomicIsize::new(0), - long_adder: LongAdder::default(), + counter: ConcurrentCounter::new(), build_hasher: hash_builder, collector: epoch::default_collector().clone(), } @@ -398,7 +398,7 @@ impl HashMap { /// assert!(map.pin().len() == 2); /// ``` pub fn len(&self) -> usize { - let n = self.long_adder.sum(&self.guard()); + let n = self.counter.sum(); if n < 0 { 0 } else { @@ -1173,7 +1173,7 @@ where } fn add_count(&self, n: isize, resize_hint: Option, guard: &Guard) { - self.long_adder.add(n, guard); + self.counter.add(n); // if resize_hint is None, it means the caller does not want us to consider a resize. // if it is Some(n), the caller saw n entries in a bin @@ -1186,7 +1186,7 @@ where None => return, } - let mut count = self.long_adder.sum(guard); + let mut count = self.counter.sum(); // TODO: use the resize hint let _saw_bin_length = resize_hint.unwrap(); @@ -1248,7 +1248,7 @@ where } // another resize may be needed! - count = self.long_adder.sum(guard); + count = self.counter.sum(); } } From 166dca0f7f089dd1e16a90447b9c1b6e3abec680 Mon Sep 17 00:00:00 2001 From: jimvdl Date: Tue, 8 Mar 2022 15:25:02 +0100 Subject: [PATCH 14/18] Applied review changes --- Cargo.toml | 1 - src/counter.rs | 43 +++++++++++-------------------------------- src/map.rs | 7 ++++--- 3 files changed, 15 insertions(+), 36 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4f313c1d..db4dd2f5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,6 @@ maintenance = { status = "experimental" } [dependencies] parking_lot = "0.12" num_cpus = "1.12.0" -rand = { version = "0.8", features = ["small_rng"] } rayon = {version = "1.3", optional = true} serde = {version = "1.0.105", optional = true} seize = "0.2.1" diff --git a/src/counter.rs b/src/counter.rs index c80dae7f..940b34a2 100644 --- a/src/counter.rs +++ b/src/counter.rs @@ -1,5 +1,3 @@ -use rand::seq::SliceRandom; -use rand::SeedableRng; use std::sync::atomic::{AtomicIsize, Ordering}; pub(crate) struct ConcurrentCounter { @@ -20,44 +18,25 @@ impl ConcurrentCounter { pub(crate) fn add(&self, value: isize) { let base = self.base.load(Ordering::SeqCst); - if self + + while self .base .compare_exchange(base, base + value, Ordering::SeqCst, Ordering::Relaxed) .is_err() { - // we experienced contention on base - loop { - let c = &self - .cells - .choose(&mut rand::rngs::SmallRng::from_entropy()) - .unwrap(); - let cv = c.load(Ordering::SeqCst); - if c.compare_exchange(cv, cv + value, Ordering::SeqCst, Ordering::Relaxed) - .is_ok() - { - // we've successfully incremented a random counter - break; - } - - // the selected counter also experienced contention, retry base - let b = self.base.load(Ordering::SeqCst); - if self - .base - .compare_exchange(b, b + value, Ordering::SeqCst, Ordering::Relaxed) - .is_ok() - { - // the contention on base has subsided, increment was successful - break; - } - - // both base and a random cell experienced contention, retry another cell + let c = &self.cells[base as usize % self.cells.len()]; + let cv = c.load(Ordering::SeqCst); + if c.compare_exchange(cv, cv + value, Ordering::SeqCst, Ordering::Relaxed) + .is_ok() + { + break; } } } - pub(crate) fn sum(&self) -> isize { - let sum: isize = self.cells.iter().map(|c| c.load(Ordering::SeqCst)).sum(); + pub(crate) fn sum(&self, ordering: Ordering) -> isize { + let sum: isize = self.cells.iter().map(|c| c.load(ordering)).sum(); - self.base.load(Ordering::SeqCst) + sum + self.base.load(ordering) + sum } } diff --git a/src/map.rs b/src/map.rs index be74cddb..e41aa1ff 100644 --- a/src/map.rs +++ b/src/map.rs @@ -367,7 +367,7 @@ impl HashMap { /// assert!(map.pin().len() == 2); /// ``` pub fn len(&self) -> usize { - let n = self.counter.sum(); + let n = self.counter.sum(Ordering::Relaxed); if n < 0 { 0 } else { @@ -1147,6 +1147,7 @@ where } fn add_count(&self, n: isize, resize_hint: Option, guard: &Guard<'_>) { + // TODO: finish Java CounterCell port, this is only a bare minimum implementation. self.counter.add(n); // if resize_hint is None, it means the caller does not want us to consider a resize. @@ -1160,7 +1161,7 @@ where None => return, } - let mut count = self.counter.sum(); + let mut count = self.counter.sum(Ordering::Relaxed); // TODO: use the resize hint let _saw_bin_length = resize_hint.unwrap(); @@ -1223,7 +1224,7 @@ where } // another resize may be needed! - count = self.counter.sum(); + count = self.counter.sum(Ordering::Relaxed); } } From a7e3aaf5b82c062ed08b31e794d75e6475bbbd4f Mon Sep 17 00:00:00 2001 From: jimvdl Date: Thu, 10 Mar 2022 12:05:24 +0100 Subject: [PATCH 15/18] Applied some review changes - Made the resize hint check more concise. - Moved the TODO about the CounterCell implementation to the counter module. - Reverted the counter declaration back to the original line. --- src/counter.rs | 1 + src/map.rs | 16 +++++----------- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/src/counter.rs b/src/counter.rs index 940b34a2..a8f97f6b 100644 --- a/src/counter.rs +++ b/src/counter.rs @@ -1,5 +1,6 @@ use std::sync::atomic::{AtomicIsize, Ordering}; +// TODO: finish Java CounterCell port, this is only a bare minimum implementation. pub(crate) struct ConcurrentCounter { base: AtomicIsize, cells: Vec, diff --git a/src/map.rs b/src/map.rs index e41aa1ff..53b184a4 100644 --- a/src/map.rs +++ b/src/map.rs @@ -93,6 +93,8 @@ pub struct HashMap { /// The next table index (plus one) to split while resizing. transfer_index: AtomicIsize, + counter: ConcurrentCounter, + /// Table initialization and resizing control. When negative, the /// table is being initialized or resized: -1 for initialization, /// else -(1 + the number of active resizing threads). Otherwise, @@ -101,8 +103,6 @@ pub struct HashMap { /// next element count value upon which to resize the table. size_ctl: AtomicIsize, - counter: ConcurrentCounter, - /// Collector that all `Guard` references used for operations on this map must be tied to. It /// is important that they all assocate with the _same_ `Collector`, otherwise you end up with /// unsoundness as described in https://github.com/jonhoo/flurry/issues/46. Specifically, a @@ -1147,18 +1147,12 @@ where } fn add_count(&self, n: isize, resize_hint: Option, guard: &Guard<'_>) { - // TODO: finish Java CounterCell port, this is only a bare minimum implementation. self.counter.add(n); // if resize_hint is None, it means the caller does not want us to consider a resize. // if it is Some(n), the caller saw n entries in a bin - match resize_hint { - Some(rh) => { - if rh <= 1 { - return; - } - } - None => return, + if resize_hint.unwrap_or(0) <= 1 { + return; } let mut count = self.counter.sum(Ordering::Relaxed); @@ -3102,7 +3096,7 @@ pub(crate) fn num_cpus() -> usize { #[cfg(miri)] #[inline] -const fn num_cpus() -> usize { +pub(crate) const fn num_cpus() -> usize { 1 } From 68eda452af928db39d5413c99e3b8e1ff723131d Mon Sep 17 00:00:00 2001 From: jimvdl Date: Sat, 12 Mar 2022 14:16:08 +0100 Subject: [PATCH 16/18] Better cell selection, reverted resize hint change --- src/counter.rs | 5 ++++- src/map.rs | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/counter.rs b/src/counter.rs index a8f97f6b..6785627a 100644 --- a/src/counter.rs +++ b/src/counter.rs @@ -19,14 +19,17 @@ impl ConcurrentCounter { pub(crate) fn add(&self, value: isize) { let base = self.base.load(Ordering::SeqCst); + let mut index = base + value; while self .base .compare_exchange(base, base + value, Ordering::SeqCst, Ordering::Relaxed) .is_err() { - let c = &self.cells[base as usize % self.cells.len()]; + let c = &self.cells[index as usize % self.cells.len()]; let cv = c.load(Ordering::SeqCst); + index += cv; + if c.compare_exchange(cv, cv + value, Ordering::SeqCst, Ordering::Relaxed) .is_ok() { diff --git a/src/map.rs b/src/map.rs index f38d4334..6e7b8106 100644 --- a/src/map.rs +++ b/src/map.rs @@ -1151,7 +1151,7 @@ where // if resize_hint is None, it means the caller does not want us to consider a resize. // if it is Some(n), the caller saw n entries in a bin - if resize_hint.unwrap_or(0) <= 1 { + if resize_hint.is_none() { return; } From e398d4f2f9e5e89d622ebfb292868b7c864538e0 Mon Sep 17 00:00:00 2001 From: jimvdl Date: Sun, 13 Mar 2022 17:38:48 +0100 Subject: [PATCH 17/18] Fixed an issue where base wasn't retried correctly every iteration --- src/counter.rs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/counter.rs b/src/counter.rs index 6785627a..26797ade 100644 --- a/src/counter.rs +++ b/src/counter.rs @@ -18,14 +18,20 @@ impl ConcurrentCounter { } pub(crate) fn add(&self, value: isize) { - let base = self.base.load(Ordering::SeqCst); + let mut base = self.base.load(Ordering::SeqCst); let mut index = base + value; - while self - .base - .compare_exchange(base, base + value, Ordering::SeqCst, Ordering::Relaxed) - .is_err() - { + loop { + match self.base.compare_exchange( + base, + base + value, + Ordering::SeqCst, + Ordering::Relaxed, + ) { + Ok(_) => break, + Err(b) => base = b, + } + let c = &self.cells[index as usize % self.cells.len()]; let cv = c.load(Ordering::SeqCst); index += cv; From d8343f9627dccace6b3b5c82741676a1ff0c4ee3 Mon Sep 17 00:00:00 2001 From: jimvdl Date: Tue, 5 Apr 2022 13:04:14 +0200 Subject: [PATCH 18/18] Initial benchmarking approach --- Cargo.toml | 4 ++ benches/flurry_counter.rs | 124 ++++++++++++++++++++++++++++++++++++++ src/counter.rs | 1 + 3 files changed, 129 insertions(+) create mode 100644 benches/flurry_counter.rs diff --git a/Cargo.toml b/Cargo.toml index db4dd2f5..c04e4f5c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,3 +43,7 @@ harness = false [[bench]] name = "flurry_hashbrown" harness = false + +[[bench]] +name = "flurry_counter" +harness = false diff --git a/benches/flurry_counter.rs b/benches/flurry_counter.rs new file mode 100644 index 00000000..fab4ec77 --- /dev/null +++ b/benches/flurry_counter.rs @@ -0,0 +1,124 @@ +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; +use rayon; +use rayon::prelude::*; +use std::sync::atomic::{AtomicIsize, Ordering}; + +const ITER: isize = 32 * 1024; + +#[derive(Debug)] +struct ConcurrentCounter { + base: AtomicIsize, + cells: Vec, +} + +impl ConcurrentCounter { + fn new() -> Self { + Self { + base: AtomicIsize::new(0), + cells: (0..num_cpus::get()) + .into_iter() + .map(|_| AtomicIsize::new(0)) + .collect(), + } + } + + fn add(&self, value: isize) { + let mut base = self.base.load(Ordering::SeqCst); + let mut index = base + value; + + loop { + match self.base.compare_exchange( + base, + base + value, + Ordering::SeqCst, + Ordering::Relaxed, + ) { + Ok(_) => break, + Err(b) => base = b, + } + + let c = &self.cells[index as usize % self.cells.len()]; + let cv = c.load(Ordering::SeqCst); + index += cv; + + if c.compare_exchange(cv, cv + value, Ordering::SeqCst, Ordering::Relaxed) + .is_ok() + { + break; + } + } + } + + fn sum(&self, ordering: Ordering) -> isize { + let sum: isize = self.cells.iter().map(|c| c.load(ordering)).sum(); + + self.base.load(ordering) + sum + } +} + +fn atomic_counter(c: &mut Criterion) { + let mut group = c.benchmark_group("atomic_counter"); + group.throughput(Throughput::Elements(ITER as u64)); + let max = num_cpus::get(); + + for threads in 1..=max { + group.bench_with_input( + BenchmarkId::from_parameter(threads), + &threads, + |b, &threads| { + let pool = rayon::ThreadPoolBuilder::new() + .num_threads(threads) + .build() + .unwrap(); + pool.install(|| { + b.iter(|| { + let counter = AtomicIsize::new(0); + (0..ITER).into_par_iter().for_each(|_| { + counter.fetch_add(1, Ordering::Relaxed); + }); + assert_eq!(ITER, counter.load(Ordering::Relaxed)); + }) + }); + }, + ); + } + + group.finish(); +} + +fn concurrent_counter(c: &mut Criterion) { + let mut group = c.benchmark_group("concurrent_counter"); + group.throughput(Throughput::Elements(ITER as u64)); + let max = num_cpus::get(); + + for threads in 1..=max { + group.bench_with_input( + BenchmarkId::from_parameter(threads), + &threads, + |b, &threads| { + let pool = rayon::ThreadPoolBuilder::new() + .num_threads(threads) + .build() + .unwrap(); + pool.install(|| { + b.iter(|| { + let counter = ConcurrentCounter::new(); + (0..ITER).into_par_iter().for_each(|_| { + counter.add(1); + }); + assert_eq!(ITER, counter.sum(Ordering::Relaxed)); + }) + }); + }, + ); + } + + group.finish(); +} + +criterion_group!( + benches, + atomic_counter, + concurrent_counter +); +criterion_main!(benches); diff --git a/src/counter.rs b/src/counter.rs index 26797ade..7950bec0 100644 --- a/src/counter.rs +++ b/src/counter.rs @@ -1,6 +1,7 @@ use std::sync::atomic::{AtomicIsize, Ordering}; // TODO: finish Java CounterCell port, this is only a bare minimum implementation. +#[derive(Debug)] pub(crate) struct ConcurrentCounter { base: AtomicIsize, cells: Vec,