Users/cheqi/pre merge tikv parallelbuild #447
Conversation
…pre-merge-tikv-parallelbuild
…-merge-tikv-parallelbuild
…thub.com/microsoft/SPTAG into users/cheqi/pre-merge-tikv-parallelbuild
…m/microsoft/SPTAG into users/cheqi/pre-merge-tikv-parallelbuild
There was a problem hiding this comment.
Pull request overview
This PR updates SPFresh/TiKV benchmarking and SPANN dynamic posting handling to support parallel/batched TiKV append paths, multi-chunk posting management, and related benchmark configuration.
Changes:
- Adds batched/priority append logic and TiKV multi-merge/count-cache handling for SPANN dynamic postings.
- Updates benchmark/test configuration flow for TiKV, parallel BKT build, and search-during-insert threads.
- Adjusts helper infrastructure including thread-pool front insertion, TiKV constructor parameters, and benchmark artifacts/config files.
Reviewed changes
Copilot reviewed 11 out of 13 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| Test/src/VersionMapTest.cpp | Updates TiKV test DB construction for the new TiKVIO constructor. |
| Test/src/SPFreshTest.cpp | Reworks benchmark config propagation and thread parameters. |
| Test/src/main.cpp | Guards Abseil deadlock-detection setup behind TIKV. |
| evaluation/2026-05-08-merged_spfresh/benchmark.ini | Adds a TiKV benchmark configuration. |
| benchmark.ini | Removes the previous root benchmark config. |
| AnnService/src/Core/SPANN/SPANNIndex.cpp | Updates head ID loading, head build parameters, and TiKV DB construction. |
| AnnService/src/Core/SPANN/ExtraFileController.cpp | Adjusts IO thread calculation for layered builds. |
| AnnService/inc/Helper/ThreadPool.h | Switches job storage to deque and adds front insertion. |
| AnnService/inc/Helper/KeyValueIO.h | Adds default MultiMerge and async stats hook. |
| AnnService/inc/Core/SPANN/ExtraTiKVController.h | Moves posting count cache into TiKV IO and adds batched/multi-chunk merge paths. |
| AnnService/inc/Core/SPANN/ExtraDynamicSearcher.h | Adds append jobs, batched append, and refactors posting DB access through KeyValueIO. |
| .vscode/launch.json | Updates ASAN preload path to GCC 13. |
| .gitignore | Broadens ignored perftest artifacts. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
|
||
| for (auto& kv : headAppends) |
| std::vector<int> fetchedCounts; | ||
| AsyncGetPostingCounts(keys, &fetchedCounts, | ||
| std::chrono::microseconds(5000000)); | ||
|
|
||
| auto batch = std::make_shared<TiKVIO::AsyncBatch>(); | ||
| batch->Add(static_cast<int>(keys.size())); | ||
|
|
||
| // std::vector<std::atomic<int>> is non-resizable but in-place | ||
| // constructible to size N with zero-initialization. | ||
| std::vector<std::atomic<int>> okFlags(keys.size()); | ||
| for (auto& f : okFlags) f.store(0, std::memory_order_relaxed); | ||
|
|
||
| for (size_t i = 0; i < keys.size(); i++) { | ||
| int newCount = fetchedCounts[i] + static_cast<int>(values[i].size()); | ||
| AsyncAppendChunkAndUpdateCount( | ||
| keys[i], values[i], newCount, | ||
| batch, &okFlags[i], MaxTimeout); |
| { static std::atomic<int> _logOnce{0}; if (_logOnce.fetch_add(1) == 0) SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "[PATH] Append using SINGLE-KEY Get+Put path (no multi-chunk)\n"); } | ||
| std::string fullPosting; | ||
| auto ret = Get(key, &fullPosting, MaxTimeout, reqs); | ||
| if (ret != ErrorCode::Success) fullPosting.clear(); |
| DistCalcMethod distMethod = iniReader.GetParameter("Benchmark", "DistMethod", DistCalcMethod::L2); | ||
| bool rebuild = iniReader.GetParameter("Benchmark", "Rebuild", true); | ||
| bool rebuildSsdOnly = iniReader.GetParameter("Benchmark", "RebuildSSDOnly", false); | ||
| bool rebuild = (iniReader.GetParameter("Benchmark", "Rebuild", true) || iniReader.GetParameter("Benchmark", "RebuildSSDOnly", false)); |
| std::vector<int> fetchedCounts; | ||
| AsyncGetPostingCounts(keys, &fetchedCounts, | ||
| std::chrono::microseconds(5000000)); | ||
|
|
||
| auto batch = std::make_shared<TiKVIO::AsyncBatch>(); | ||
| batch->Add(static_cast<int>(keys.size())); | ||
|
|
||
| // std::vector<std::atomic<int>> is non-resizable but in-place | ||
| // constructible to size N with zero-initialization. | ||
| std::vector<std::atomic<int>> okFlags(keys.size()); | ||
| for (auto& f : okFlags) f.store(0, std::memory_order_relaxed); | ||
|
|
||
| for (size_t i = 0; i < keys.size(); i++) { | ||
| int newCount = fetchedCounts[i] + static_cast<int>(values[i].size()); | ||
| AsyncAppendChunkAndUpdateCount( |
| if (!hasHead) { | ||
| SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "CheckCentroid cannot find head in posting! pid:%d, where:%s\n", pid, where.c_str()); | ||
| exit(-1); | ||
| } |
| int count = static_cast<int>(value.size()); | ||
| auto countRet = SetPostingCount(key, count, timeout); | ||
| if (countRet != ErrorCode::Success) { | ||
| SPTAGLIB_LOG(Helper::LogLevel::LL_Warning, "PutPostingToDB: SetPostingCount failed for key %d (data written OK)\n", key); | ||
| } | ||
| if (m_postingCountCache) m_postingCountCache->Put(key, count); | ||
| return ErrorCode::Success; |
| if (!hasHead) { | ||
| SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "CheckCentroid cannot find head in posting! pid:%d, where:%s\n", pid, where.c_str()); | ||
| exit(-1); |
| std::error_code ec; | ||
| std::string prevHeadVectorFile = m_opt->m_indexDirectory + FolderSep + m_opt->m_headIndexFolder + FolderSep + p_headIndex->GetParameter("VectorFilePath"); | ||
| std::string curHeadVectorFile = m_opt->m_indexDirectory + FolderSep + m_opt->m_headVectorFile; | ||
| std::filesystem::copy_file(prevHeadVectorFile, curHeadVectorFile, std::filesystem::copy_options::overwrite_existing, ec); |
| struct ShardData { | ||
| std::list<std::pair<SizeType, int>> order; // front = MRU | ||
| std::unordered_map<SizeType, std::list<std::pair<SizeType, int>>::iterator> map; |
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
|
{ |
| { static std::atomic<int> _logOnce{0}; if (_logOnce.fetch_add(1) == 0) SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "[PATH] Append using SINGLE-KEY Get+Put path (no multi-chunk)\n"); } | ||
| std::string fullPosting; | ||
| auto ret = Get(key, &fullPosting, MaxTimeout, reqs); | ||
| if (ret == ErrorCode::KeyNotFound) { |
| DistCalcMethod distMethod = iniReader.GetParameter("Benchmark", "DistMethod", DistCalcMethod::L2); | ||
| bool rebuild = iniReader.GetParameter("Benchmark", "Rebuild", true); | ||
| bool rebuildSsdOnly = iniReader.GetParameter("Benchmark", "RebuildSSDOnly", false); | ||
| bool rebuild = (iniReader.GetParameter("Benchmark", "Rebuild", true) || iniReader.GetParameter("Benchmark", "RebuildSSDOnly", false)); |
| if (!hasHead) { | ||
| SPTAGLIB_LOG(Helper::LogLevel::LL_Error, "CheckCentroid cannot find head in posting! pid:%d, where:%s\n", pid, where.c_str()); | ||
| exit(-1); |
| SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "Loading headIDFile for layer %d...\n", currentLayer - 1); | ||
| std::shared_ptr<Helper::DiskIO> ptr = SPTAG::f_createIO(); | ||
| if (ptr == nullptr || | ||
| !ptr->Initialize((m_options.m_indexDirectory + FolderSep + m_options.m_headIDFile).c_str(), | ||
| std::ios::binary | std::ios::in)) | ||
| { | ||
| SPTAGLIB_LOG(Helper::LogLevel::LL_Info, "No headIDFile file:%s\n", | ||
| (m_options.m_indexDirectory + FolderSep + m_options.m_headIDFile).c_str()); | ||
| } | ||
| else { | ||
| localToGlobalID.Load(ptr, this->m_iDataBlockSize, this->m_iDataCapacity); |
…thub.com/microsoft/SPTAG into users/cheqi/pre-merge-tikv-parallelbuild
No description provided.