Skip to content

Commit 7fa9309

Browse files
committed
feat: use cardinality estimator for distinct count stats
Replace the exact `HashMap`/`HashSet` previously used to compute distinct-value counts during compression stats generation with Cloudflare's `cardinality-estimator` crate. The estimator gives us a bounded-memory approximation (exact up to ~128 distinct values, then HyperLogLog++) so high-cardinality arrays no longer require an O(n) auxiliary hash table to answer the single question "how many unique values does this have?". - Integer stats swap the hash map for a `CardinalityEstimator` and track the most frequent value via a Boyer-Moore majority candidate plus a second-pass exact count. Sparse/dict schemes only care about the heavy hitter (>= 90% threshold) or a rough distinct ratio, so this is behaviourally equivalent for the decisions they make. - Float and string stats likewise drop their hash sets in favor of the estimator. - The integer and float dictionary encoders now rebuild the exact set of distinct values from the source array at compress time, since they need the values themselves and the stats layer no longer retains them. - `SequenceScheme`'s fast-path check for "all values are distinct" now tolerates the estimator's small approximation error; the deferred callback still validates sequences exactly. Signed-off-by: Robert Kruszewski <github@robertk.io>
1 parent 5e5572b commit 7fa9309

42 files changed

Lines changed: 840 additions & 703 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Cargo.lock

Lines changed: 32 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ bit-vec = "0.9.0"
112112
bitvec = "1.0.1"
113113
bytes = "1.11.1"
114114
bzip2 = "0.6.0"
115+
cardinality-estimator = "1.0.3"
115116
cargo_metadata = "0.23.1"
116117
cbindgen = "0.29.0"
117118
cc = "1.2"

encodings/fastlanes/src/rle/array/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -473,7 +473,7 @@ mod tests {
473473
.indices()
474474
.clone()
475475
.execute::<PrimitiveArray>(&mut ctx)?
476-
.narrow()?;
476+
.narrow(&mut ctx)?;
477477
let re_encoded = RLEData::encode(indices_prim.as_view(), &mut ctx)?;
478478

479479
// Reconstruct the outer RLE with re-encoded indices.

encodings/runend/src/compress.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,9 @@ pub fn runend_encode(
8484
}
8585
};
8686

87-
let ends = ends.narrow().vortex_expect("Ends must succeed downcasting");
87+
let ends = ends
88+
.narrow(ctx)
89+
.vortex_expect("Ends must succeed downcasting");
8890

8991
ends.statistics()
9092
.set(Stat::IsStrictSorted, Precision::Exact(true.into()));

vortex-array/benches/dict_compare.rs

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#![expect(clippy::unwrap_used)]
55

66
use std::str::from_utf8;
7+
use std::sync::LazyLock;
78

89
use vortex_array::Canonical;
910
use vortex_array::IntoArray;
@@ -21,6 +22,7 @@ use vortex_array::expr::eq;
2122
use vortex_array::expr::lit;
2223
use vortex_array::expr::root;
2324
use vortex_array::scalar_fn::fns::operators::Operator;
25+
use vortex_array::session::ArraySession;
2426
use vortex_session::VortexSession;
2527

2628
fn main() {
@@ -45,15 +47,21 @@ const LENGTH_AND_UNIQUE_VALUES: &[(usize, usize)] = &[
4547
(100_000, 2048),
4648
];
4749

50+
static SESSION: LazyLock<VortexSession> =
51+
LazyLock::new(|| VortexSession::empty().with::<ArraySession>());
52+
4853
#[divan::bench(args = LENGTH_AND_UNIQUE_VALUES)]
4954
fn bench_compare_primitive(bencher: divan::Bencher, (len, uniqueness): (usize, usize)) {
5055
let primitive_arr = gen_primitive_for_dict::<i32>(len, uniqueness);
51-
let dict = dict_encode(&primitive_arr.clone().into_array()).unwrap();
56+
let dict = dict_encode(
57+
&primitive_arr.clone().into_array(),
58+
&mut SESSION.create_execution_ctx(),
59+
)
60+
.unwrap();
5261
let value = primitive_arr.as_slice::<i32>()[0];
53-
let session = VortexSession::empty();
5462

5563
bencher
56-
.with_inputs(|| (&dict, session.create_execution_ctx()))
64+
.with_inputs(|| (&dict, SESSION.create_execution_ctx()))
5765
.bench_refs(|(dict, ctx)| {
5866
dict.clone()
5967
.into_array()
@@ -67,13 +75,16 @@ fn bench_compare_primitive(bencher: divan::Bencher, (len, uniqueness): (usize, u
6775
#[divan::bench(args = LENGTH_AND_UNIQUE_VALUES)]
6876
fn bench_compare_varbin(bencher: divan::Bencher, (len, uniqueness): (usize, usize)) {
6977
let varbin_arr = VarBinArray::from(gen_varbin_words(len, uniqueness));
70-
let dict = dict_encode(&varbin_arr.clone().into_array()).unwrap();
78+
let dict = dict_encode(
79+
&varbin_arr.clone().into_array(),
80+
&mut SESSION.create_execution_ctx(),
81+
)
82+
.unwrap();
7183
let bytes = varbin_arr.with_iterator(|i| i.next().unwrap().unwrap().to_vec());
7284
let value = from_utf8(bytes.as_slice()).unwrap();
73-
let session = VortexSession::empty();
7485

7586
bencher
76-
.with_inputs(|| (&dict, session.create_execution_ctx()))
87+
.with_inputs(|| (&dict, SESSION.create_execution_ctx()))
7788
.bench_refs(|(dict, ctx)| {
7889
dict.clone()
7990
.into_array()
@@ -87,13 +98,16 @@ fn bench_compare_varbin(bencher: divan::Bencher, (len, uniqueness): (usize, usiz
8798
#[divan::bench(args = LENGTH_AND_UNIQUE_VALUES)]
8899
fn bench_compare_varbinview(bencher: divan::Bencher, (len, uniqueness): (usize, usize)) {
89100
let varbinview_arr = VarBinViewArray::from_iter_str(gen_varbin_words(len, uniqueness));
90-
let dict = dict_encode(&varbinview_arr.clone().into_array()).unwrap();
101+
let dict = dict_encode(
102+
&varbinview_arr.clone().into_array(),
103+
&mut SESSION.create_execution_ctx(),
104+
)
105+
.unwrap();
91106
let bytes = varbinview_arr.with_iterator(|i| i.next().unwrap().unwrap().to_vec());
92107
let value = from_utf8(bytes.as_slice()).unwrap();
93-
let session = VortexSession::empty();
94108

95109
bencher
96-
.with_inputs(|| (&dict, session.create_execution_ctx()))
110+
.with_inputs(|| (&dict, SESSION.create_execution_ctx()))
97111
.bench_refs(|(dict, ctx)| {
98112
dict.clone()
99113
.into_array()
@@ -122,13 +136,16 @@ fn bench_compare_sliced_dict_primitive(
122136
(codes_len, values_len): (usize, usize),
123137
) {
124138
let primitive_arr = gen_primitive_for_dict::<i32>(codes_len.max(values_len), values_len);
125-
let dict = dict_encode(&primitive_arr.clone().into_array()).unwrap();
139+
let dict = dict_encode(
140+
&primitive_arr.clone().into_array(),
141+
&mut SESSION.create_execution_ctx(),
142+
)
143+
.unwrap();
126144
let dict = dict.into_array().slice(0..codes_len).unwrap();
127145
let value = primitive_arr.as_slice::<i32>()[0];
128-
let session = VortexSession::empty();
129146

130147
bencher
131-
.with_inputs(|| (&dict, session.create_execution_ctx()))
148+
.with_inputs(|| (&dict, SESSION.create_execution_ctx()))
132149
.bench_refs(|(dict, ctx)| {
133150
dict.clone()
134151
.apply(&eq(root(), lit(value)))
@@ -144,14 +161,17 @@ fn bench_compare_sliced_dict_varbinview(
144161
(codes_len, values_len): (usize, usize),
145162
) {
146163
let varbin_arr = VarBinArray::from(gen_varbin_words(codes_len.max(values_len), values_len));
147-
let dict = dict_encode(&varbin_arr.clone().into_array()).unwrap();
164+
let dict = dict_encode(
165+
&varbin_arr.clone().into_array(),
166+
&mut SESSION.create_execution_ctx(),
167+
)
168+
.unwrap();
148169
let dict = dict.into_array().slice(0..codes_len).unwrap();
149170
let bytes = varbin_arr.with_iterator(|i| i.next().unwrap().unwrap().to_vec());
150171
let value = from_utf8(bytes.as_slice()).unwrap();
151-
let session = VortexSession::empty();
152172

153173
bencher
154-
.with_inputs(|| (&dict, session.create_execution_ctx()))
174+
.with_inputs(|| (&dict, SESSION.create_execution_ctx()))
155175
.bench_refs(|(dict, ctx)| {
156176
dict.clone()
157177
.apply(&eq(root(), lit(value)))

vortex-array/benches/dict_compress.rs

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,22 @@
33

44
#![expect(clippy::unwrap_used)]
55

6+
use std::sync::LazyLock;
7+
68
use divan::Bencher;
79
use rand::distr::Distribution;
810
use rand::distr::StandardUniform;
911
use vortex_array::Canonical;
1012
use vortex_array::IntoArray;
11-
use vortex_array::LEGACY_SESSION;
1213
use vortex_array::VortexSessionExecute;
1314
use vortex_array::arrays::VarBinArray;
1415
use vortex_array::arrays::VarBinViewArray;
1516
use vortex_array::arrays::dict_test::gen_primitive_for_dict;
1617
use vortex_array::arrays::dict_test::gen_varbin_words;
1718
use vortex_array::builders::dict::dict_encode;
1819
use vortex_array::dtype::NativePType;
20+
use vortex_array::session::ArraySession;
21+
use vortex_session::VortexSession;
1922

2023
fn main() {
2124
divan::main();
@@ -35,35 +38,39 @@ const BENCH_ARGS: &[(usize, usize)] = &[
3538
(10_000, 512),
3639
];
3740

41+
static SESSION: LazyLock<VortexSession> =
42+
LazyLock::new(|| VortexSession::empty().with::<ArraySession>());
43+
3844
#[divan::bench(types = [u8, f32, i64], args = BENCH_ARGS)]
3945
fn encode_primitives<T>(bencher: Bencher, (len, unique_values): (usize, usize))
4046
where
4147
T: NativePType,
4248
StandardUniform: Distribution<T>,
4349
{
44-
let primitive_arr = gen_primitive_for_dict::<T>(len, unique_values);
50+
let primitive_arr = gen_primitive_for_dict::<T>(len, unique_values).into_array();
4551

4652
bencher
47-
.with_inputs(|| &primitive_arr)
48-
.bench_refs(|arr| dict_encode(&arr.clone().into_array()));
53+
.with_inputs(|| (&primitive_arr, SESSION.create_execution_ctx()))
54+
.bench_refs(|(arr, ctx)| dict_encode(arr, ctx));
4955
}
5056

5157
#[divan::bench(args = BENCH_ARGS)]
5258
fn encode_varbin(bencher: Bencher, (len, unique_values): (usize, usize)) {
53-
let varbin_arr = VarBinArray::from(gen_varbin_words(len, unique_values));
59+
let varbin_arr = VarBinArray::from(gen_varbin_words(len, unique_values)).into_array();
5460

5561
bencher
56-
.with_inputs(|| &varbin_arr)
57-
.bench_refs(|arr| dict_encode(&arr.clone().into_array()));
62+
.with_inputs(|| (&varbin_arr, SESSION.create_execution_ctx()))
63+
.bench_refs(|(arr, ctx)| dict_encode(arr, ctx));
5864
}
5965

6066
#[divan::bench(args = BENCH_ARGS)]
6167
fn encode_varbinview(bencher: Bencher, (len, unique_values): (usize, usize)) {
62-
let varbinview_arr = VarBinViewArray::from_iter_str(gen_varbin_words(len, unique_values));
68+
let varbinview_arr =
69+
VarBinViewArray::from_iter_str(gen_varbin_words(len, unique_values)).into_array();
6370

6471
bencher
65-
.with_inputs(|| &varbinview_arr)
66-
.bench_refs(|arr| dict_encode(&arr.clone().into_array()));
72+
.with_inputs(|| (&varbinview_arr, SESSION.create_execution_ctx()))
73+
.bench_refs(|(arr, ctx)| dict_encode(arr, ctx));
6774
}
6875

6976
#[divan::bench(types = [u8, f32, i64], args = BENCH_ARGS)]
@@ -72,34 +79,37 @@ where
7279
T: NativePType,
7380
StandardUniform: Distribution<T>,
7481
{
75-
let primitive_arr = gen_primitive_for_dict::<T>(len, unique_values);
76-
let dict = dict_encode(&primitive_arr.into_array())
82+
let primitive_arr = gen_primitive_for_dict::<T>(len, unique_values).into_array();
83+
let dict = dict_encode(&primitive_arr, &mut SESSION.create_execution_ctx())
7784
.unwrap()
7885
.into_array();
7986

8087
bencher
81-
.with_inputs(|| (&dict, LEGACY_SESSION.create_execution_ctx()))
88+
.with_inputs(|| (&dict, SESSION.create_execution_ctx()))
8289
.bench_refs(|(dict, ctx)| (**dict).clone().execute::<Canonical>(ctx));
8390
}
8491

8592
#[divan::bench(args = BENCH_ARGS)]
8693
fn decode_varbin(bencher: Bencher, (len, unique_values): (usize, usize)) {
87-
let varbin_arr = VarBinArray::from(gen_varbin_words(len, unique_values));
88-
let dict = dict_encode(&varbin_arr.into_array()).unwrap().into_array();
94+
let varbin_arr = VarBinArray::from(gen_varbin_words(len, unique_values)).into_array();
95+
let dict = dict_encode(&varbin_arr, &mut SESSION.create_execution_ctx())
96+
.unwrap()
97+
.into_array();
8998

9099
bencher
91-
.with_inputs(|| (&dict, LEGACY_SESSION.create_execution_ctx()))
100+
.with_inputs(|| (&dict, SESSION.create_execution_ctx()))
92101
.bench_refs(|(dict, ctx)| (**dict).clone().execute::<Canonical>(ctx));
93102
}
94103

95104
#[divan::bench(args = BENCH_ARGS)]
96105
fn decode_varbinview(bencher: Bencher, (len, unique_values): (usize, usize)) {
97-
let varbinview_arr = VarBinViewArray::from_iter_str(gen_varbin_words(len, unique_values));
98-
let dict = dict_encode(&varbinview_arr.into_array())
106+
let varbinview_arr =
107+
VarBinViewArray::from_iter_str(gen_varbin_words(len, unique_values)).into_array();
108+
let dict = dict_encode(&varbinview_arr, &mut SESSION.create_execution_ctx())
99109
.unwrap()
100110
.into_array();
101111

102112
bencher
103-
.with_inputs(|| (&dict, LEGACY_SESSION.create_execution_ctx()))
113+
.with_inputs(|| (&dict, SESSION.create_execution_ctx()))
104114
.bench_refs(|(dict, ctx)| (**dict).clone().execute::<Canonical>(ctx));
105115
}

vortex-array/public-api.lock

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4038,7 +4038,7 @@ pub trait vortex_array::arrays::primitive::PrimitiveArrayExt: vortex_array::Type
40384038

40394039
pub fn vortex_array::arrays::primitive::PrimitiveArrayExt::buffer_handle(&self) -> &vortex_array::buffer::BufferHandle
40404040

4041-
pub fn vortex_array::arrays::primitive::PrimitiveArrayExt::narrow(&self) -> vortex_error::VortexResult<vortex_array::arrays::PrimitiveArray>
4041+
pub fn vortex_array::arrays::primitive::PrimitiveArrayExt::narrow(&self, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::arrays::PrimitiveArray>
40424042

40434043
pub fn vortex_array::arrays::primitive::PrimitiveArrayExt::nullability(&self) -> vortex_array::dtype::Nullability
40444044

@@ -4054,7 +4054,7 @@ impl<T: vortex_array::TypedArrayRef<vortex_array::arrays::Primitive>> vortex_arr
40544054

40554055
pub fn T::buffer_handle(&self) -> &vortex_array::buffer::BufferHandle
40564056

4057-
pub fn T::narrow(&self) -> vortex_error::VortexResult<vortex_array::arrays::PrimitiveArray>
4057+
pub fn T::narrow(&self, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::arrays::PrimitiveArray>
40584058

40594059
pub fn T::nullability(&self) -> vortex_array::dtype::Nullability
40604060

@@ -7440,13 +7440,13 @@ pub trait vortex_array::builders::dict::DictEncoder: core::marker::Send
74407440

74417441
pub fn vortex_array::builders::dict::DictEncoder::codes_ptype(&self) -> vortex_array::dtype::PType
74427442

7443-
pub fn vortex_array::builders::dict::DictEncoder::encode(&mut self, &vortex_array::ArrayRef) -> vortex_array::ArrayRef
7443+
pub fn vortex_array::builders::dict::DictEncoder::encode(&mut self, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::arrays::PrimitiveArray>
74447444

74457445
pub fn vortex_array::builders::dict::DictEncoder::reset(&mut self) -> vortex_array::ArrayRef
74467446

7447-
pub fn vortex_array::builders::dict::dict_encode(&vortex_array::ArrayRef) -> vortex_error::VortexResult<vortex_array::arrays::dict::DictArray>
7447+
pub fn vortex_array::builders::dict::dict_encode(&vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::arrays::dict::DictArray>
74487448

7449-
pub fn vortex_array::builders::dict::dict_encode_with_constraints(&vortex_array::ArrayRef, &vortex_array::builders::dict::DictConstraints) -> vortex_error::VortexResult<vortex_array::arrays::dict::DictArray>
7449+
pub fn vortex_array::builders::dict::dict_encode_with_constraints(&vortex_array::ArrayRef, &vortex_array::builders::dict::DictConstraints, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::arrays::dict::DictArray>
74507450

74517451
pub fn vortex_array::builders::dict::dict_encoder(&vortex_array::ArrayRef, &vortex_array::builders::dict::DictConstraints) -> alloc::boxed::Box<dyn vortex_array::builders::dict::DictEncoder>
74527452

0 commit comments

Comments
 (0)