diff --git a/rust/lance/benches/mem_wal/vector/hnsw/mem_wal_hnsw_bench.rs b/rust/lance/benches/mem_wal/vector/hnsw/mem_wal_hnsw_bench.rs index c647708d5d7..6c2fa697d67 100644 --- a/rust/lance/benches/mem_wal/vector/hnsw/mem_wal_hnsw_bench.rs +++ b/rust/lance/benches/mem_wal/vector/hnsw/mem_wal_hnsw_bench.rs @@ -32,6 +32,12 @@ struct Args { clusters: usize, noise: f32, query_repeats: usize, + /// When > 0, rebuild the graph in a loop for this many seconds (sustained + /// write), instead of a single build. + insert_seconds: f64, + /// When > 0, run the query workload in a loop for this many seconds + /// (sustained read), instead of `query_repeats` passes. + query_seconds: f64, } impl Default for Args { @@ -50,6 +56,8 @@ impl Default for Args { clusters: 4096, noise: 0.05, query_repeats: 1, + insert_seconds: 0.0, + query_seconds: 0.0, } } } @@ -104,17 +112,40 @@ fn main() -> Result<(), Box> { .num_edges(args.m) .ef_construction(args.ef_construction) .seed(args.seed); - let graph = HnswGraph::try_new(args.rows, params)?; + // Sustained write: rebuild a fresh graph in a loop for `insert_seconds` + // (or a single build when 0) so throughput reflects steady-state, not a + // burst. Only the last graph is retained for the query phase. let insert_start = Instant::now(); - graph.insert_batch(ids, &snapshot)?; + let mut insert_passes: u64 = 0; + let mut insert_core = std::time::Duration::ZERO; + let graph = loop { + let g = HnswGraph::try_new(args.rows, params.clone())?; + let core = Instant::now(); + g.insert_batch(ids.clone(), &snapshot)?; + insert_core += core.elapsed(); + insert_passes += 1; + if args.insert_seconds <= 0.0 || insert_start.elapsed().as_secs_f64() >= args.insert_seconds + { + break g; + } + }; let insert_s = insert_start.elapsed().as_secs_f64(); - let insert_qps = args.rows as f64 / insert_s; + let insert_qps = (args.rows as u64 * insert_passes) as f64 / insert_s; + // insert_core excludes graph allocation + teardown, isolating the insertion + // algorithm from the per-build alloc/free cost. + let insert_core_s = insert_core.as_secs_f64(); + let insert_core_qps = (args.rows as u64 * insert_passes) as f64 / insert_core_s; + // Sustained read: run the query workload in a loop for `query_seconds` + // (or `query_repeats` passes when 0). let search_query_ids = query_ids(&args, args.queries); let query_start = Instant::now(); - let mut hits = 0usize; - for _ in 0..args.query_repeats { + // Assigned on every loop iteration before any break; the loop always runs at + // least once, so no dead initial store. + let mut hits: usize; + let mut query_passes: u64 = 0; + loop { hits = search_query_ids .par_iter() .map(|row| { @@ -125,9 +156,17 @@ fn main() -> Result<(), Box> { usize::from(results.iter().any(|result| result.id as usize == *row)) }) .sum(); + query_passes += 1; + if args.query_seconds > 0.0 { + if query_start.elapsed().as_secs_f64() >= args.query_seconds { + break; + } + } else if query_passes >= args.query_repeats as u64 { + break; + } } let query_s = query_start.elapsed().as_secs_f64(); - let query_qps = (args.queries * args.query_repeats) as f64 / query_s; + let query_qps = (args.queries as u64 * query_passes) as f64 / query_s; let self_recall = hits as f64 / args.queries as f64; let truth_query_ids = query_ids(&args, args.truth_queries); @@ -150,14 +189,18 @@ fn main() -> Result<(), Box> { let recall_at_k = recall_hits as f64 / (args.truth_queries * args.k) as f64; println!( - "result impl=lance_hnsw rows={} dim={} generate_s={:.6} arrow_s={:.6} insert_s={:.6} insert_qps={:.3} query_s={:.6} query_qps={:.3} truth_s={:.6} recall_at_{}={:.6} self_recall_at_{}={:.6}", + "result impl=lance_hnsw rows={} dim={} generate_s={:.6} arrow_s={:.6} insert_s={:.6} insert_passes={} insert_qps={:.3} insert_core_s={:.6} insert_core_qps={:.3} query_s={:.6} query_passes={} query_qps={:.3} truth_s={:.6} recall_at_{}={:.6} self_recall_at_{}={:.6}", args.rows, args.dim, generate_s, arrow_s, insert_s, + insert_passes, insert_qps, + insert_core_s, + insert_core_qps, query_s, + query_passes, query_qps, truth_s, args.k, @@ -210,6 +253,8 @@ fn parse_args() -> Result> { "--clusters" => args.clusters = value.parse()?, "--noise" => args.noise = value.parse()?, "--query-repeats" => args.query_repeats = value.parse()?, + "--insert-seconds" => args.insert_seconds = value.parse()?, + "--query-seconds" => args.query_seconds = value.parse()?, _ => return Err(format!("unknown argument: {flag}").into()), } } diff --git a/rust/lance/benches/mem_wal/vector/hnsw/mem_wal_hnswlib_bench.cpp b/rust/lance/benches/mem_wal/vector/hnsw/mem_wal_hnswlib_bench.cpp index b016533bcb2..3d9dc7dcfa0 100644 --- a/rust/lance/benches/mem_wal/vector/hnsw/mem_wal_hnswlib_bench.cpp +++ b/rust/lance/benches/mem_wal/vector/hnsw/mem_wal_hnswlib_bench.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -31,6 +32,8 @@ struct Args { size_t clusters = 4096; float noise = 0.05f; size_t query_repeats = 1; + double insert_seconds = 0.0; + double query_seconds = 0.0; }; Args parse_args(int argc, char **argv); @@ -112,30 +115,47 @@ int main(int argc, char **argv) { double generate_s = elapsed_seconds(generate_start); hnswlib::L2Space space(static_cast(args.dim)); - hnswlib::HierarchicalNSW index( - &space, - args.rows, - args.m, - args.ef_construction, - static_cast(args.seed)); - index.setEf(args.ef_search); + // Sustained write: rebuild a fresh index in a loop for `insert_seconds` + // (or a single build when 0) so throughput reflects steady-state, not a + // burst. The last index is retained for the query phase. auto insert_start = clock_now(); - parallel_for(0, args.rows, args.threads, [&](size_t row, size_t) { - index.addPoint(static_cast(data.data() + row * args.dim), row); - }); + size_t insert_passes = 0; + double insert_core = 0.0; + std::unique_ptr> index; + do { + index = std::make_unique>( + &space, + args.rows, + args.m, + args.ef_construction, + static_cast(args.seed)); + index->setEf(args.ef_search); + auto core = clock_now(); + parallel_for(0, args.rows, args.threads, [&](size_t row, size_t) { + index->addPoint(static_cast(data.data() + row * args.dim), row); + }); + insert_core += elapsed_seconds(core); + ++insert_passes; + } while (args.insert_seconds > 0.0 && elapsed_seconds(insert_start) < args.insert_seconds); double insert_s = elapsed_seconds(insert_start); - double insert_qps = static_cast(args.rows) / insert_s; + double insert_qps = static_cast(args.rows * insert_passes) / insert_s; + // insert_core excludes index allocation + destruction. + double insert_core_s = insert_core; + double insert_core_qps = static_cast(args.rows * insert_passes) / insert_core_s; + // Sustained read: run the query workload in a loop for `query_seconds` + // (or `query_repeats` passes when 0). std::vector queries = query_ids(args, args.queries); auto query_start = clock_now(); std::atomic hits{0}; - for (size_t rep = 0; rep < args.query_repeats; ++rep) { + size_t query_passes = 0; + for (;;) { hits.store(0, std::memory_order_relaxed); parallel_for(0, queries.size(), args.threads, [&](size_t idx, size_t) { size_t row = queries[idx]; std::priority_queue> result = - index.searchKnn(data.data() + row * args.dim, args.k); + index->searchKnn(data.data() + row * args.dim, args.k); bool found = false; while (!result.empty()) { if (static_cast(result.top().second) == row) { @@ -148,9 +168,17 @@ int main(int argc, char **argv) { hits.fetch_add(1, std::memory_order_relaxed); } }); + ++query_passes; + if (args.query_seconds > 0.0) { + if (elapsed_seconds(query_start) >= args.query_seconds) { + break; + } + } else if (query_passes >= args.query_repeats) { + break; + } } double query_s = elapsed_seconds(query_start); - double query_qps = static_cast(args.queries * args.query_repeats) / query_s; + double query_qps = static_cast(args.queries * query_passes) / query_s; double self_recall = static_cast(hits.load()) / static_cast(args.queries); std::vector truth_queries = query_ids(args, args.truth_queries); @@ -161,7 +189,7 @@ int main(int argc, char **argv) { const float *query = data.data() + row * args.dim; std::vector truth = exact_top_k(query, data, args); std::priority_queue> result = - index.searchKnn(query, args.k); + index->searchKnn(query, args.k); while (!result.empty()) { size_t id = static_cast(result.top().second); for (size_t truth_id : truth) { @@ -181,8 +209,12 @@ int main(int argc, char **argv) { << " dim=" << args.dim << " generate_s=" << generate_s << " insert_s=" << insert_s + << " insert_passes=" << insert_passes << " insert_qps=" << insert_qps + << " insert_core_s=" << insert_core_s + << " insert_core_qps=" << insert_core_qps << " query_s=" << query_s + << " query_passes=" << query_passes << " query_qps=" << query_qps << " truth_s=" << truth_s << " recall_at_" << args.k << "=" << recall_at_k @@ -247,6 +279,10 @@ Args parse_args(int argc, char **argv) { args.noise = std::stof(value); } else if (flag == "--query-repeats") { args.query_repeats = parse_size(value); + } else if (flag == "--insert-seconds") { + args.insert_seconds = std::stod(value); + } else if (flag == "--query-seconds") { + args.query_seconds = std::stod(value); } else { throw std::invalid_argument("unknown argument: " + flag); } diff --git a/rust/lance/benches/mem_wal/vector/hnsw/run_parity_suite.sh b/rust/lance/benches/mem_wal/vector/hnsw/run_parity_suite.sh index 6e7a24c4c40..46ed113e66a 100755 --- a/rust/lance/benches/mem_wal/vector/hnsw/run_parity_suite.sh +++ b/rust/lance/benches/mem_wal/vector/hnsw/run_parity_suite.sh @@ -21,6 +21,10 @@ SIZES="${SIZES:-100000 500000 1000000}" DIM="${DIM:-1024}" QUERIES="${QUERIES:-5000}" QUERY_REPEATS="${QUERY_REPEATS:-20}" +# Sustained measurement: when > 0, write rebuilds the graph and read loops the +# query workload for this many seconds, reporting steady-state (not burst). +INSERT_SECONDS="${INSERT_SECONDS:-0}" +QUERY_SECONDS="${QUERY_SECONDS:-0}" TRUTH_QUERIES="${TRUTH_QUERIES:-200}" K="${K:-10}" M="${M:-12}" @@ -62,6 +66,7 @@ run_one() { --k "$K" --m "$M" --ef-construction "$EF_CONSTRUCTION" --ef-search "$EF_SEARCH" --threads "$THREADS" --seed "$SEED" --clusters "$CLUSTERS" --noise "$NOISE" --query-repeats "$QUERY_REPEATS" + --insert-seconds "$INSERT_SECONDS" --query-seconds "$QUERY_SECONDS" ) local out="$OUT_DIR/${tag}.out" local timef="$OUT_DIR/${tag}.time"