Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
2c08ee0
Shared counters optimization #11
jimvdl Nov 23, 2021
d5aaaef
Missing safety warning
jimvdl Nov 23, 2021
a4ef493
Shared counters
jimvdl Nov 27, 2021
d982e4d
Minor changes
jimvdl Nov 27, 2021
72286fa
Fixed invalid drop of cells
jimvdl Nov 27, 2021
d4e8587
Run cargo fmt
jimvdl Nov 27, 2021
2a4a28f
Fixed incorrect drop of old counter cells after resize
jimvdl Nov 27, 2021
7b2e23f
Moved counter cell logic into its own module
jimvdl Dec 9, 2021
4b3d0ed
Minor doc fixes
jimvdl Dec 9, 2021
d7de300
Ran cargo fmt
jimvdl Dec 9, 2021
5882f58
Impl Send + Sync for LongAdder
jimvdl Dec 9, 2021
577874e
Removed unnecessary Send and Sync for LongAdder
jimvdl Dec 9, 2021
52d1cfe
Refactored LongAdder to ConcurrentCounter
jimvdl Dec 17, 2021
8be45ce
Merge branch 'master' of https://github.com/jonhoo/flurry into shared…
jimvdl Feb 16, 2022
d72fed6
merged master, minor clippy adjustments
jimvdl Mar 8, 2022
166dca0
Applied review changes
jimvdl Mar 8, 2022
a7e3aaf
Applied some review changes
jimvdl Mar 10, 2022
caa9ae3
Merge branch 'master' of https://github.com/jonhoo/flurry into shared…
jimvdl Mar 11, 2022
68eda45
Better cell selection, reverted resize hint change
jimvdl Mar 12, 2022
e398d4f
Fixed an issue where base wasn't retried correctly every iteration
jimvdl Mar 13, 2022
906672e
Merge branch 'master' into shared-counters-optimization
jimvdl Mar 25, 2022
5221009
Merge branch 'master' of https://github.com/jonhoo/flurry into shared…
jimvdl Apr 3, 2022
d8343f9
Initial benchmarking approach
jimvdl Apr 5, 2022
1bd51f5
Merge branch 'shared-counters-optimization' of https://github.com/jim…
jimvdl Apr 5, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,7 @@ harness = false
[[bench]]
name = "flurry_hashbrown"
harness = false

[[bench]]
name = "flurry_counter"
harness = false
124 changes: 124 additions & 0 deletions benches/flurry_counter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use rayon;
use rayon::prelude::*;
use std::sync::atomic::{AtomicIsize, Ordering};

const ITER: isize = 32 * 1024;

#[derive(Debug)]
struct ConcurrentCounter {
base: AtomicIsize,
cells: Vec<AtomicIsize>,
}

impl ConcurrentCounter {
fn new() -> Self {
Self {
base: AtomicIsize::new(0),
cells: (0..num_cpus::get())
.into_iter()
.map(|_| AtomicIsize::new(0))
.collect(),
}
}

fn add(&self, value: isize) {
let mut base = self.base.load(Ordering::SeqCst);
let mut index = base + value;

loop {
match self.base.compare_exchange(
base,
base + value,
Ordering::SeqCst,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(b) => base = b,
}

let c = &self.cells[index as usize % self.cells.len()];
let cv = c.load(Ordering::SeqCst);
index += cv;

if c.compare_exchange(cv, cv + value, Ordering::SeqCst, Ordering::Relaxed)
.is_ok()
{
break;
}
}
}

fn sum(&self, ordering: Ordering) -> isize {
let sum: isize = self.cells.iter().map(|c| c.load(ordering)).sum();

self.base.load(ordering) + sum
}
}

fn atomic_counter(c: &mut Criterion) {
let mut group = c.benchmark_group("atomic_counter");
group.throughput(Throughput::Elements(ITER as u64));
let max = num_cpus::get();

for threads in 1..=max {
group.bench_with_input(
BenchmarkId::from_parameter(threads),
&threads,
|b, &threads| {
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(threads)
.build()
.unwrap();
pool.install(|| {
b.iter(|| {
let counter = AtomicIsize::new(0);
(0..ITER).into_par_iter().for_each(|_| {
counter.fetch_add(1, Ordering::Relaxed);
});
assert_eq!(ITER, counter.load(Ordering::Relaxed));
})
});
},
);
}

group.finish();
}

fn concurrent_counter(c: &mut Criterion) {
let mut group = c.benchmark_group("concurrent_counter");
group.throughput(Throughput::Elements(ITER as u64));
let max = num_cpus::get();

for threads in 1..=max {
group.bench_with_input(
BenchmarkId::from_parameter(threads),
&threads,
|b, &threads| {
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(threads)
.build()
.unwrap();
pool.install(|| {
b.iter(|| {
let counter = ConcurrentCounter::new();
(0..ITER).into_par_iter().for_each(|_| {
counter.add(1);
});
assert_eq!(ITER, counter.sum(Ordering::Relaxed));
})
});
},
);
}

group.finish();
}

criterion_group!(
benches,
atomic_counter,
concurrent_counter
);
criterion_main!(benches);
53 changes: 53 additions & 0 deletions src/counter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use std::sync::atomic::{AtomicIsize, Ordering};

// TODO: finish Java CounterCell port, this is only a bare minimum implementation.
#[derive(Debug)]
pub(crate) struct ConcurrentCounter {
base: AtomicIsize,
cells: Vec<AtomicIsize>,
}

impl ConcurrentCounter {
pub(crate) fn new() -> Self {
Self {
base: AtomicIsize::new(0),
cells: (0..crate::map::num_cpus())
.into_iter()
.map(|_| AtomicIsize::new(0))
.collect(),
}
}

pub(crate) fn add(&self, value: isize) {
let mut base = self.base.load(Ordering::SeqCst);
let mut index = base + value;

loop {
match self.base.compare_exchange(
base,
base + value,
Ordering::SeqCst,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(b) => base = b,
}

let c = &self.cells[index as usize % self.cells.len()];
let cv = c.load(Ordering::SeqCst);
index += cv;

if c.compare_exchange(cv, cv + value, Ordering::SeqCst, Ordering::Relaxed)
.is_ok()
{
break;
}
}
}

pub(crate) fn sum(&self, ordering: Ordering) -> isize {
let sum: isize = self.cells.iter().map(|c| c.load(ordering)).sum();

self.base.load(ordering) + sum
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@
#![warn(rust_2018_idioms)]
#![allow(clippy::cognitive_complexity)]

mod counter;
mod map;
mod map_ref;
mod node;
Expand Down
33 changes: 17 additions & 16 deletions src/map.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use seize::Linked;

use crate::counter::ConcurrentCounter;
use crate::iter::*;
use crate::node::*;
use crate::raw::*;
use crate::reclaim::{Atomic, Collector, Guard, RetireShared, Shared};
use seize::Linked;
use std::borrow::Borrow;
use std::error::Error;
use std::fmt::{self, Debug, Display, Formatter};
Expand Down Expand Up @@ -93,7 +93,7 @@ pub struct HashMap<K, V, S = crate::DefaultHashBuilder> {
/// The next table index (plus one) to split while resizing.
transfer_index: AtomicIsize,

count: AtomicIsize,
counter: ConcurrentCounter,

/// Table initialization and resizing control. When negative, the
/// table is being initialized or resized: -1 for initialization,
Expand Down Expand Up @@ -280,8 +280,8 @@ impl<K, V, S> HashMap<K, V, S> {
table: Atomic::null(),
next_table: Atomic::null(),
transfer_index: AtomicIsize::new(0),
count: AtomicIsize::new(0),
size_ctl: AtomicIsize::new(0),
counter: ConcurrentCounter::new(),
build_hasher: hash_builder,
collector: Collector::new(),
}
Expand Down Expand Up @@ -349,6 +349,12 @@ impl<K, V, S> HashMap<K, V, S> {

/// Returns the number of entries in the map.
///
/// Note that the returned value is _NOT_ an
/// atomic snapshot; invocation in the absence of concurrent
/// updates returns an accurate result, but concurrent updates that
/// occur while the sum is being calculated might not be
/// incorporated.
///
/// # Examples
///
/// ```
Expand All @@ -361,7 +367,7 @@ impl<K, V, S> HashMap<K, V, S> {
/// assert!(map.pin().len() == 2);
/// ```
pub fn len(&self) -> usize {
let n = self.count.load(Ordering::Relaxed);
let n = self.counter.sum(Ordering::Relaxed);
if n < 0 {
0
} else {
Expand Down Expand Up @@ -1141,21 +1147,16 @@ where
}

fn add_count(&self, n: isize, resize_hint: Option<usize>, guard: &Guard<'_>) {
// TODO: implement the Java CounterCell business here
Comment thread
jimvdl marked this conversation as resolved.

use std::cmp;
let mut count = match n.cmp(&0) {
cmp::Ordering::Greater => self.count.fetch_add(n, Ordering::SeqCst) + n,
cmp::Ordering::Less => self.count.fetch_sub(n.abs(), Ordering::SeqCst) - n,
cmp::Ordering::Equal => self.count.load(Ordering::SeqCst),
};
self.counter.add(n);

// if resize_hint is None, it means the caller does not want us to consider a resize.
// if it is Some(n), the caller saw n entries in a bin
if resize_hint.is_none() {
return;
}

let mut count = self.counter.sum(Ordering::Relaxed);

// TODO: use the resize hint
let _saw_bin_length = resize_hint.unwrap();

Expand Down Expand Up @@ -1217,7 +1218,7 @@ where
}

// another resize may be needed!
count = self.count.load(Ordering::SeqCst);
count = self.counter.sum(Ordering::Relaxed);
}
}

Expand Down Expand Up @@ -3088,14 +3089,14 @@ where
#[cfg(not(miri))]
#[inline]
/// Returns the number of physical CPUs in the machine (_O(1)_).
fn num_cpus() -> usize {
pub(crate) fn num_cpus() -> usize {
NCPU_INITIALIZER.call_once(|| NCPU.store(num_cpus::get_physical(), Ordering::Relaxed));
NCPU.load(Ordering::Relaxed)
}

#[cfg(miri)]
#[inline]
const fn num_cpus() -> usize {
pub(crate) const fn num_cpus() -> usize {
1
}

Expand Down
2 changes: 1 addition & 1 deletion src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,7 @@ impl<K, V> TreeBin<K, V> {
guard: &'g Guard<'_>,
) {
guard.retire(bin.as_ptr(), |mut link| {
let bin = unsafe {
let bin = {
// SAFETY: `bin` is a `BinEntry<K, V>`
let ptr = link.cast::<BinEntry<K, V>>();
// SAFETY: `retire` guarantees that we
Expand Down