Lock-free scheduler with per-priority queues#193
Open
TrentHouliston wants to merge 3 commits into
Open
Conversation
…ce fixes Rework Scheduler/Pool/Group to avoid the global mutex contention that showed up under load. Tasks are now placed into one of five lock-free queues (one per priority bucket) so producers and consumers no longer serialise on a single lock, and Group acquisition uses a counting-semaphore fast path with a lock-free waiter queue per priority. Pools whose concurrency is one (MainThread, TraceController, etc.) now use a specialised MPSC queue with a non-atomic consumer side. Tasks within a Sync group retain strict in-order delivery; equal-priority tasks across pools see relaxed global FIFO. The new lock-free primitives (TaskQueue, MPSCQueue, Semaphore) and the Group counting-lock are covered by Catch2 BDD-style tests plus a scheduler benchmark. During TSAN validation across macOS clang and Linux gcc 13 three pre-existing data races were uncovered and fixed: * IOController (POSIX): the IOFinished handler mutated watches[].events under tasks_mutex while the poll thread read the same field from inside ::poll() under notifier.mutex. The bump() call closed the wake window but released notifier.mutex before the mutation, leaving the race. Inline the wake and hold notifier.mutex across the watches mutation and follow-up fire_event. * Scheduler::submit: the cached Pool pointer on Reaction (scheduler_data) was read/written from any submitting thread without synchronisation. Switch the cache to std::atomic_load/store on the shared_ptr; the worst case is two submitters racing both compute the identical pool and last-writer-wins. * Watchdog data store: the service time_point was read by the chrono controller while being mutated by user threads emitting a service event, and the void specialisation returned a reference through a temporary shared_ptr. Centralise reads/writes through a per-(WatchdogGroup, RuntimeType) std::mutex, return the time_point by value, and route WatchdogServicer::service through WatchdogDataStore::service so writes share the read mutex. Validation: * macOS clang TSAN: dsl/IO, dsl/Inline, dsl/Watchdog 30/30 clean each; full suite 63/63. * Linux gcc 13 TSAN: same three tests 30/30 clean; 16 hot-path tests x 3 runs serially with no TSAN warnings. * macOS Release: 64/64. Also ignore build-*/ directories so out-of-tree build folders don't show up in git status. Co-authored-by: Cursor <cursoragent@cursor.com>
The Reaction::scheduler_data cache previously held an std::shared_ptr<void>
read/written via std::atomic_load/atomic_store. On libstdc++ those fall back
to a small global pool of mutexes (selected by pointer hash), which becomes
a contention point on hot submission paths.
Change scheduler_data to std::atomic<void*>{nullptr}. Pools live for the
lifetime of the Scheduler and the PowerPlant tears reactors down before the
scheduler, so a non-owning raw pointer is safe. Group::try_submit and
WaitEntry are switched to Pool* accordingly, and the Scheduler field
declaration order is changed so that pools outlive groups on destruction.
Also fix the clang-tidy errors that were blocking the lint job: switch the
queue Slot/Block backing storage to std::array (avoid-c-arrays, member
init), explicit-base BLOCK_SIZE, do-while -> while, use auto with new,
RunningLock special members, Semaphore destructor, missing direct includes,
unused-and-kept includes, and a couple of small test cleanups
(reserve before emplace, explicit lvalue MPSCQueue enqueue overload to
work around an MSVC overload-resolution quirk on int(i)).
Co-authored-by: Cursor <cursoragent@cursor.com>
There was a problem hiding this comment.
Pull request overview
Reworks the Scheduler / Pool / Group machinery to remove global mutex contention by introducing per-priority lock-free task queues, an MPSC specialisation for single-consumer pools, and a counting-semaphore fast path for single-Sync groups. The change set also fixes three pre-existing TSAN races (IO poll/pollfd::events, Reaction::scheduler_data cache, and Watchdog data store) and adds BDD-style unit tests plus a scheduler microbenchmark.
Changes:
- New lock-free primitives (
Queue/TaskQueue/MPSCQueue/Semaphore/Priority) and aPoolthat owns five priority-bucketed queues with off-mutex idle bookkeeping. Groupgains a lock-free fast path (signedtokens+ per-prioritywait_bucketsofWaitEntry) and aRunningLock, with the existing mutex-based path retained for multi-group submissions.- TSAN race fixes in
IOController_Posix.ipp,Scheduler::submit(atomicscheduler_dataraw pointer), andWatchdogDataStore(mutex-protected reads/writes,getreturns by value, newserviceentry point).
Reviewed changes
Copilot reviewed 21 out of 22 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| .gitignore | Ignore build-*/ out-of-tree build directories. |
| src/extension/IOController_Posix.ipp | Hold notifier.mutex across watches[].events mutation to close TSAN race with the poll thread. |
| src/dsl/word/Watchdog.hpp | Add per-store mutex; get returns by value; new service static. |
| src/dsl/word/emit/Watchdog.hpp | Delegate WatchdogServicer::service to the mutex-protected WatchdogDataStore::service. |
| src/threading/Reaction.hpp | Change scheduler_data to std::atomic<void*> to avoid shared_ptr atomic-load contention. |
| src/threading/scheduler/Scheduler.{hpp,cpp} | Switch cached pool to raw pointer; reorder members so groups destroy before pools; fast-path single-group submit. |
| src/threading/scheduler/Pool.{hpp,cpp} | Replace single mutex-protected queue with per-priority buckets (MPMC or MPSC), atomic accept/pending_tasks/external_waiters, external waiter tracking. |
| src/threading/scheduler/Group.{hpp,cpp} | Add try_submit/try_acquire_running_lock lock-free fast path with signed tokens, slow_pending, wait_buckets, and RunningLock. |
| src/threading/scheduler/queue/{Queue,TaskQueue,MPSCQueue,Semaphore,Priority}.hpp | New lock-free queue interface and concrete MPMC/MPSC implementations, counting semaphore, priority bucket helper. |
| tests/tests/threading/{TaskQueue,MPSCQueue,Semaphore,Group}.cpp | BDD-style unit tests for the new primitives; Group test gains a forward-declaration include. |
| tests/tests/Benchmark.cpp | New scheduler microbenchmark over no-sync / single-sync / two-sync configurations. |
| tests/tests/util/serialise/xxhash.cpp | Add missing <cstring> include. |
Comment on lines
+101
to
+107
| bool link_next_block(Block* block) { | ||
| Block* expected = nullptr; | ||
| if (block->next.compare_exchange_strong(expected, allocate_block(), std::memory_order_acq_rel)) { | ||
| return true; | ||
| } | ||
| return expected != nullptr; | ||
| } |
Comment on lines
+104
to
+112
| bool link_next_block(Block* block) { | ||
| Block* expected = nullptr; | ||
| if (block->next.compare_exchange_strong(expected, | ||
| allocate_block(), | ||
| std::memory_order_acq_rel)) { | ||
| return true; | ||
| } | ||
| return expected != nullptr; | ||
| } |
Comment on lines
+181
to
+198
| /** | ||
| * Try to submit a task through the lock-free fast path. | ||
| * | ||
| * If a group token is available the task is submitted to the pool immediately. | ||
| * Otherwise the task is queued until a token is released. | ||
| * | ||
| * @param task the reaction task to submit | ||
| * @param pool the pool to submit to when runnable | ||
| * @param clear_idle if true, clear idle state on submission | ||
| * | ||
| * @return true if the task was submitted immediately | ||
| */ | ||
| /** | ||
| * Try to acquire a token for inline execution without submitting to a pool. | ||
| * | ||
| * @return an RAII lock if a token was acquired, otherwise nullptr | ||
| */ | ||
| std::unique_ptr<Lock> try_acquire_running_lock(); |
Comment on lines
+189
to
+206
| const std::size_t bucket = queue::priority_index(task->priority); | ||
| pool->register_external_waiter(); | ||
| wait_buckets[bucket].enqueue(WaitEntry{std::move(task), pool, clear_idle}); | ||
|
|
||
| // Reserve a slot in the signed counter; if a token was still available, run a waiter now. | ||
| const int prev = tokens.fetch_sub(1, std::memory_order_acq_rel); | ||
| if (prev > 0) { | ||
| if (slow_pending.load(std::memory_order_acquire) > 0) { | ||
| // Hand the token back so the slow path can pick it up. | ||
| release_token(); | ||
| } | ||
| else { | ||
| drain_one_to_pool(); | ||
| } | ||
| } | ||
|
|
||
| return false; | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Rework
Scheduler/Pool/Groupto remove the global mutex contention that showed up on the existing implementation under load:Poolnow owns five independent MPMCTaskQueues (one per priority bucket) instead of a single mutex-protected priority queue. Submitting and dequeuing on different priorities no longer contend at all, and same-priority submits use a block-based lock-free FIFO with a graveyard-style block reclamation scheme to avoid ABA.concurrency == 1(e.g.MainThread,TraceController) now constructMPSCQueue<Task>instead ofTaskQueue<Task>, giving a non-atomic consumer side and removing the dequeue CAS entirely for those pools. Both queues sit behind aQueue<T>polymorphic interface soPoolcan hold astd::array<std::unique_ptr<Queue<Task>>>of buckets and dispatch the right type at construction.Syncgroups now acquire viastd::atomic<int> tokens+ a lock-freeTaskQueue<WaitEntry>per priority for waiters, falling back to the mutex-backedGroupLockonly for multi-group submissions. Sync ordering inside a group is strict; equal-priority tasks across pools see relaxed global FIFO.New lock-free primitives (
TaskQueue,MPSCQueue,Semaphore) andGroup::CountingLockare covered by Catch2 BDD-style tests, plus a scheduler microbenchmark (tests/tests/Benchmark.cpp).TSAN race fixes (pre-existing)
While validating the changes under TSAN on macOS clang and Linux gcc 13 three pre-existing data races surfaced and are fixed in this PR:
IOController_Posix.ipp—pollfd::events. TheIOFinishedhandler mutatedwatches[].eventswhile only holdingtasks_mutex, and the poll thread read the same field from inside::poll()while only holdingnotifier.mutex.bump()woke poll but releasednotifier.mutexbefore the mutation, leaving the race window open. The handler now writes the wake byte inline and holdsnotifier.mutexacross thewatchesupdate and the follow-upfire_eventcall (which can also touchwatches[].events).Scheduler::submit—Reaction::scheduler_data. The cached poolshared_ptr<void>was read/written from any submitting thread without synchronisation. Switched tostd::atomic_load_explicit/std::atomic_store_explicit(acquire/release). All racers compute the same pool, so last-writer-wins is benign.Watchdogdata store. The servicetime_pointwas read by the chrono controller while being mutated by user threads emitting a service event, and the void specialisation returned a reference through a temporaryshared_ptr(latent dangling reference). Centralised reads/writes through a per-(WatchdogGroup, RuntimeType)std::mutex, madegetreturn by value, and routedWatchdogServicer::servicethroughWatchdogDataStore::serviceso writes share the read mutex.Other
.gitignorenow matchesbuild-*/so out-of-tree TSAN/ASAN/Release build directories don't appear ingit status.Test plan
dsl/IO,dsl/Inline,dsl/Watchdog30/30 clean each; full suite 63/63 ✓api/TimeTravelflaked once under parallel TSAN load, 10/10 passes in isolation — pre-existing timing flakiness, no race)tests/threading/{TaskQueue,MPSCQueue,Semaphore,CountingLock,Group})Made with Cursor