Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 52 additions & 7 deletions rust/lance/benches/mem_wal/vector/hnsw/mem_wal_hnsw_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ struct Args {
clusters: usize,
noise: f32,
query_repeats: usize,
/// When > 0, rebuild the graph in a loop for this many seconds (sustained
/// write), instead of a single build.
insert_seconds: f64,
/// When > 0, run the query workload in a loop for this many seconds
/// (sustained read), instead of `query_repeats` passes.
query_seconds: f64,
}

impl Default for Args {
Expand All @@ -50,6 +56,8 @@ impl Default for Args {
clusters: 4096,
noise: 0.05,
query_repeats: 1,
insert_seconds: 0.0,
query_seconds: 0.0,
}
}
}
Expand Down Expand Up @@ -104,17 +112,40 @@ fn main() -> Result<(), Box<dyn Error>> {
.num_edges(args.m)
.ef_construction(args.ef_construction)
.seed(args.seed);
let graph = HnswGraph::try_new(args.rows, params)?;

// Sustained write: rebuild a fresh graph in a loop for `insert_seconds`
// (or a single build when 0) so throughput reflects steady-state, not a
// burst. Only the last graph is retained for the query phase.
let insert_start = Instant::now();
graph.insert_batch(ids, &snapshot)?;
let mut insert_passes: u64 = 0;
let mut insert_core = std::time::Duration::ZERO;
let graph = loop {
let g = HnswGraph::try_new(args.rows, params.clone())?;
let core = Instant::now();
g.insert_batch(ids.clone(), &snapshot)?;
insert_core += core.elapsed();
insert_passes += 1;
if args.insert_seconds <= 0.0 || insert_start.elapsed().as_secs_f64() >= args.insert_seconds
{
break g;
}
};
let insert_s = insert_start.elapsed().as_secs_f64();
let insert_qps = args.rows as f64 / insert_s;
let insert_qps = (args.rows as u64 * insert_passes) as f64 / insert_s;
// insert_core excludes graph allocation + teardown, isolating the insertion
// algorithm from the per-build alloc/free cost.
let insert_core_s = insert_core.as_secs_f64();
let insert_core_qps = (args.rows as u64 * insert_passes) as f64 / insert_core_s;

// Sustained read: run the query workload in a loop for `query_seconds`
// (or `query_repeats` passes when 0).
let search_query_ids = query_ids(&args, args.queries);
let query_start = Instant::now();
let mut hits = 0usize;
for _ in 0..args.query_repeats {
// Assigned on every loop iteration before any break; the loop always runs at
// least once, so no dead initial store.
let mut hits: usize;
let mut query_passes: u64 = 0;
loop {
hits = search_query_ids
.par_iter()
.map(|row| {
Expand All @@ -125,9 +156,17 @@ fn main() -> Result<(), Box<dyn Error>> {
usize::from(results.iter().any(|result| result.id as usize == *row))
})
.sum();
query_passes += 1;
if args.query_seconds > 0.0 {
if query_start.elapsed().as_secs_f64() >= args.query_seconds {
break;
}
} else if query_passes >= args.query_repeats as u64 {
break;
}
}
let query_s = query_start.elapsed().as_secs_f64();
let query_qps = (args.queries * args.query_repeats) as f64 / query_s;
let query_qps = (args.queries as u64 * query_passes) as f64 / query_s;
let self_recall = hits as f64 / args.queries as f64;

let truth_query_ids = query_ids(&args, args.truth_queries);
Expand All @@ -150,14 +189,18 @@ fn main() -> Result<(), Box<dyn Error>> {
let recall_at_k = recall_hits as f64 / (args.truth_queries * args.k) as f64;

println!(
"result impl=lance_hnsw rows={} dim={} generate_s={:.6} arrow_s={:.6} insert_s={:.6} insert_qps={:.3} query_s={:.6} query_qps={:.3} truth_s={:.6} recall_at_{}={:.6} self_recall_at_{}={:.6}",
"result impl=lance_hnsw rows={} dim={} generate_s={:.6} arrow_s={:.6} insert_s={:.6} insert_passes={} insert_qps={:.3} insert_core_s={:.6} insert_core_qps={:.3} query_s={:.6} query_passes={} query_qps={:.3} truth_s={:.6} recall_at_{}={:.6} self_recall_at_{}={:.6}",
args.rows,
args.dim,
generate_s,
arrow_s,
insert_s,
insert_passes,
insert_qps,
insert_core_s,
insert_core_qps,
query_s,
query_passes,
query_qps,
truth_s,
args.k,
Expand Down Expand Up @@ -210,6 +253,8 @@ fn parse_args() -> Result<Args, Box<dyn Error>> {
"--clusters" => args.clusters = value.parse()?,
"--noise" => args.noise = value.parse()?,
"--query-repeats" => args.query_repeats = value.parse()?,
"--insert-seconds" => args.insert_seconds = value.parse()?,
"--query-seconds" => args.query_seconds = value.parse()?,
_ => return Err(format!("unknown argument: {flag}").into()),
}
}
Expand Down
66 changes: 51 additions & 15 deletions rust/lance/benches/mem_wal/vector/hnsw/mem_wal_hnswlib_bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <cstdlib>
#include <exception>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
Expand All @@ -31,6 +32,8 @@ struct Args {
size_t clusters = 4096;
float noise = 0.05f;
size_t query_repeats = 1;
double insert_seconds = 0.0;
double query_seconds = 0.0;
};

Args parse_args(int argc, char **argv);
Expand Down Expand Up @@ -112,30 +115,47 @@ int main(int argc, char **argv) {
double generate_s = elapsed_seconds(generate_start);

hnswlib::L2Space space(static_cast<int>(args.dim));
hnswlib::HierarchicalNSW<float> index(
&space,
args.rows,
args.m,
args.ef_construction,
static_cast<size_t>(args.seed));
index.setEf(args.ef_search);

// Sustained write: rebuild a fresh index in a loop for `insert_seconds`
// (or a single build when 0) so throughput reflects steady-state, not a
// burst. The last index is retained for the query phase.
auto insert_start = clock_now();
parallel_for(0, args.rows, args.threads, [&](size_t row, size_t) {
index.addPoint(static_cast<void *>(data.data() + row * args.dim), row);
});
size_t insert_passes = 0;
double insert_core = 0.0;
std::unique_ptr<hnswlib::HierarchicalNSW<float>> index;
do {
index = std::make_unique<hnswlib::HierarchicalNSW<float>>(
&space,
args.rows,
args.m,
args.ef_construction,
static_cast<size_t>(args.seed));
index->setEf(args.ef_search);
auto core = clock_now();
parallel_for(0, args.rows, args.threads, [&](size_t row, size_t) {
index->addPoint(static_cast<void *>(data.data() + row * args.dim), row);
});
insert_core += elapsed_seconds(core);
++insert_passes;
} while (args.insert_seconds > 0.0 && elapsed_seconds(insert_start) < args.insert_seconds);
double insert_s = elapsed_seconds(insert_start);
double insert_qps = static_cast<double>(args.rows) / insert_s;
double insert_qps = static_cast<double>(args.rows * insert_passes) / insert_s;
// insert_core excludes index allocation + destruction.
double insert_core_s = insert_core;
double insert_core_qps = static_cast<double>(args.rows * insert_passes) / insert_core_s;

// Sustained read: run the query workload in a loop for `query_seconds`
// (or `query_repeats` passes when 0).
std::vector<size_t> queries = query_ids(args, args.queries);
auto query_start = clock_now();
std::atomic<size_t> hits{0};
for (size_t rep = 0; rep < args.query_repeats; ++rep) {
size_t query_passes = 0;
for (;;) {
hits.store(0, std::memory_order_relaxed);
parallel_for(0, queries.size(), args.threads, [&](size_t idx, size_t) {
size_t row = queries[idx];
std::priority_queue<std::pair<float, hnswlib::labeltype>> result =
index.searchKnn(data.data() + row * args.dim, args.k);
index->searchKnn(data.data() + row * args.dim, args.k);
bool found = false;
while (!result.empty()) {
if (static_cast<size_t>(result.top().second) == row) {
Expand All @@ -148,9 +168,17 @@ int main(int argc, char **argv) {
hits.fetch_add(1, std::memory_order_relaxed);
}
});
++query_passes;
if (args.query_seconds > 0.0) {
if (elapsed_seconds(query_start) >= args.query_seconds) {
break;
}
} else if (query_passes >= args.query_repeats) {
break;
}
}
double query_s = elapsed_seconds(query_start);
double query_qps = static_cast<double>(args.queries * args.query_repeats) / query_s;
double query_qps = static_cast<double>(args.queries * query_passes) / query_s;
double self_recall = static_cast<double>(hits.load()) / static_cast<double>(args.queries);

std::vector<size_t> truth_queries = query_ids(args, args.truth_queries);
Expand All @@ -161,7 +189,7 @@ int main(int argc, char **argv) {
const float *query = data.data() + row * args.dim;
std::vector<size_t> truth = exact_top_k(query, data, args);
std::priority_queue<std::pair<float, hnswlib::labeltype>> result =
index.searchKnn(query, args.k);
index->searchKnn(query, args.k);
while (!result.empty()) {
size_t id = static_cast<size_t>(result.top().second);
for (size_t truth_id : truth) {
Expand All @@ -181,8 +209,12 @@ int main(int argc, char **argv) {
<< " dim=" << args.dim
<< " generate_s=" << generate_s
<< " insert_s=" << insert_s
<< " insert_passes=" << insert_passes
<< " insert_qps=" << insert_qps
<< " insert_core_s=" << insert_core_s
<< " insert_core_qps=" << insert_core_qps
<< " query_s=" << query_s
<< " query_passes=" << query_passes
<< " query_qps=" << query_qps
<< " truth_s=" << truth_s
<< " recall_at_" << args.k << "=" << recall_at_k
Expand Down Expand Up @@ -247,6 +279,10 @@ Args parse_args(int argc, char **argv) {
args.noise = std::stof(value);
} else if (flag == "--query-repeats") {
args.query_repeats = parse_size(value);
} else if (flag == "--insert-seconds") {
args.insert_seconds = std::stod(value);
} else if (flag == "--query-seconds") {
args.query_seconds = std::stod(value);
} else {
throw std::invalid_argument("unknown argument: " + flag);
}
Expand Down
5 changes: 5 additions & 0 deletions rust/lance/benches/mem_wal/vector/hnsw/run_parity_suite.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ SIZES="${SIZES:-100000 500000 1000000}"
DIM="${DIM:-1024}"
QUERIES="${QUERIES:-5000}"
QUERY_REPEATS="${QUERY_REPEATS:-20}"
# Sustained measurement: when > 0, write rebuilds the graph and read loops the
# query workload for this many seconds, reporting steady-state (not burst).
INSERT_SECONDS="${INSERT_SECONDS:-0}"
QUERY_SECONDS="${QUERY_SECONDS:-0}"
TRUTH_QUERIES="${TRUTH_QUERIES:-200}"
K="${K:-10}"
M="${M:-12}"
Expand Down Expand Up @@ -62,6 +66,7 @@ run_one() {
--k "$K" --m "$M" --ef-construction "$EF_CONSTRUCTION" --ef-search "$EF_SEARCH"
--threads "$THREADS" --seed "$SEED" --clusters "$CLUSTERS" --noise "$NOISE"
--query-repeats "$QUERY_REPEATS"
--insert-seconds "$INSERT_SECONDS" --query-seconds "$QUERY_SECONDS"
)
local out="$OUT_DIR/${tag}.out"
local timef="$OUT_DIR/${tag}.time"
Expand Down
Loading