diff --git a/.gitignore b/.gitignore index 7deac2fd..40b36099 100644 --- a/.gitignore +++ b/.gitignore @@ -14,6 +14,7 @@ # Build & CMake files build/ +build-*/ CMakeCache.txt CMakeFiles Makefile diff --git a/src/dsl/word/Watchdog.hpp b/src/dsl/word/Watchdog.hpp index 9d2fab96..f2b2ac7a 100644 --- a/src/dsl/word/Watchdog.hpp +++ b/src/dsl/word/Watchdog.hpp @@ -23,6 +23,7 @@ #ifndef NUCLEAR_DSL_WORD_WATCHDOG_HPP #define NUCLEAR_DSL_WORD_WATCHDOG_HPP +#include #include #include "../../threading/Reaction.hpp" @@ -52,12 +53,25 @@ namespace dsl { using MapType = std::remove_cv_t; using WatchdogStore = util::TypeMap>; + /** + * Mutex protecting structural and value updates to the underlying map for this + * (WatchdogGroup, RuntimeType) pair. Watchdog timers are read by the chrono controller + * thread (via @ref get) while being written by user threads that emit a service event + * (via @ref service), and the underlying std::map is also mutated by init/unbind, so a + * single shared mutex serialises all of those operations. + */ + static std::mutex& mutex() { + static std::mutex m; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables) + return m; + } + /** * Ensures the data store is initialised correctly. * * @param data The runtime argument for the current watchdog in the WatchdogGroup/RuntimeType group */ static void init(const RuntimeType& data) { + const std::lock_guard lock(mutex()); if (WatchdogStore::get() == nullptr) { WatchdogStore::set(std::make_shared>()); } @@ -67,11 +81,15 @@ namespace dsl { } /** - * Gets the current service time for the WatchdogGroup/RuntimeType/data watchdog + * Gets the current service time for the WatchdogGroup/RuntimeType/data watchdog. + * + * Returned by value so the caller never holds a reference into the (mutex-protected) + * map. The time_point is small and trivially copyable so the copy is essentially free. * * @param data The runtime argument for the current watchdog in the WatchdogGroup/RuntimeType group */ - static const NUClear::clock::time_point& get(const RuntimeType& data) { + static NUClear::clock::time_point get(const RuntimeType& data) { + const std::lock_guard lock(mutex()); if (WatchdogStore::get() == nullptr || WatchdogStore::get()->count(data) == 0) { throw std::domain_error("Store for <" + util::demangle(typeid(WatchdogGroup).name()) + ", " + util::demangle(typeid(MapType).name()) @@ -80,12 +98,29 @@ namespace dsl { return WatchdogStore::get()->at(data); } + /** + * Atomically updates the service time for the WatchdogGroup/RuntimeType/data watchdog. + * + * Called by @ref emit::WatchdogServicer::service to keep the write under the same + * mutex that @ref get uses for reads. + */ + static void service(const RuntimeType& data, const NUClear::clock::time_point& when) { + const std::lock_guard lock(mutex()); + if (WatchdogStore::get() == nullptr || WatchdogStore::get()->count(data) == 0) { + throw std::domain_error("Store for <" + util::demangle(typeid(WatchdogGroup).name()) + ", " + + util::demangle(typeid(MapType).name()) + + "> has not been created yet or no watchdog has been set up"); + } + WatchdogStore::get()->at(data) = when; + } + /** * Cleans up any allocated storage for the WatchdogGroup/RuntimeType/data watchdog * * @param data The runtime argument for the current watchdog in the WatchdogGroup/RuntimeType group */ static void unbind(const RuntimeType& data) { + const std::lock_guard lock(mutex()); if (WatchdogStore::get() != nullptr) { WatchdogStore::get()->erase(data); } @@ -105,10 +140,17 @@ namespace dsl { struct WatchdogDataStore { using WatchdogStore = util::TypeMap; + /// See the documentation on the runtime-arg specialisation. + static std::mutex& mutex() { + static std::mutex m; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables) + return m; + } + /** * Ensures the data store is initialised correctly. */ static void init() { + const std::lock_guard lock(mutex()); if (WatchdogStore::get() == nullptr) { WatchdogStore::set(std::make_shared(NUClear::clock::now())); } @@ -116,8 +158,12 @@ namespace dsl { /** * Gets the current service time for the WatchdogGroup watchdog. + * + * Returned by value so the caller never reads from the time_point while it is being + * mutated by @ref service on another thread. */ - static const NUClear::clock::time_point& get() { + static NUClear::clock::time_point get() { + const std::lock_guard lock(mutex()); if (WatchdogStore::get() == nullptr) { throw std::domain_error("Store for <" + util::demangle(typeid(WatchdogGroup).name()) + "> is trying to field a service call for an unknown data type"); @@ -125,10 +171,23 @@ namespace dsl { return *WatchdogStore::get(); } + /** + * Atomically updates the service time for the WatchdogGroup watchdog. + */ + static void service(const NUClear::clock::time_point& when) { + const std::lock_guard lock(mutex()); + if (WatchdogStore::get() == nullptr) { + throw std::domain_error("Store for <" + util::demangle(typeid(WatchdogGroup).name()) + + "> has not been created yet or no watchdog has been set up"); + } + *WatchdogStore::get() = when; + } + /** * Cleans up any allocated storage for the WatchdogGroup watchdog. */ static void unbind() { + const std::lock_guard lock(mutex()); if (WatchdogStore::get() != nullptr) { WatchdogStore::get().reset(); } diff --git a/src/dsl/word/emit/Watchdog.hpp b/src/dsl/word/emit/Watchdog.hpp index f5754ad0..f6c37c2a 100644 --- a/src/dsl/word/emit/Watchdog.hpp +++ b/src/dsl/word/emit/Watchdog.hpp @@ -23,11 +23,8 @@ #ifndef NUCLEAR_DSL_WORD_EMIT_WATCHDOG_HPP #define NUCLEAR_DSL_WORD_EMIT_WATCHDOG_HPP -#include - #include "../../../PowerPlant.hpp" -#include "../../../util/TypeMap.hpp" -#include "../../../util/demangle.hpp" +#include "../Watchdog.hpp" namespace NUClear { namespace dsl { @@ -47,8 +44,6 @@ namespace dsl { template struct WatchdogServicer { using MapType = std::remove_cv_t; - using WatchdogStore = - util::TypeMap>; /** * Construct a new Watchdog Servicer object @@ -63,18 +58,14 @@ namespace dsl { explicit WatchdogServicer(const RuntimeType& data) : data(data) {} /** - * Services the watchdog + * Services the watchdog. * - * The watchdog timer that is specified by the WatchdogGroup/RuntimeType/data combination will have its - * service time updated to whatever is stored in when. + * Delegates to @ref word::WatchdogDataStore::service so the write happens under the + * same mutex that guards reads in the chrono controller; otherwise the time_point + * would be torn-read / torn-written across threads. */ void service() { - if (WatchdogStore::get() == nullptr || WatchdogStore::get()->count(data) == 0) { - throw std::domain_error("Store for <" + util::demangle(typeid(WatchdogGroup).name()) + ", " - + util::demangle(typeid(RuntimeType).name()) - + "> has not been created yet or no watchdog has been set up"); - } - WatchdogStore::get()->at(data) = when; + word::WatchdogDataStore::service(data, when); } private: @@ -94,19 +85,15 @@ namespace dsl { */ template struct WatchdogServicer { - using WatchdogStore = util::TypeMap; /** - * Services the watchdog + * Services the watchdog. * - * The watchdog timer for WatchdogGroup will have its service time updated to whatever is stored in when + * Delegates to @ref word::WatchdogDataStore::service so the write happens under the + * same mutex that guards reads in the chrono controller. */ void service() { - if (WatchdogStore::get() == nullptr) { - throw std::domain_error("Store for <" + util::demangle(typeid(WatchdogGroup).name()) - + "> has not been created yet or no watchdog has been set up"); - } - WatchdogStore::set(std::make_shared(when)); + word::WatchdogDataStore::service(when); } private: diff --git a/src/extension/IOController_Posix.ipp b/src/extension/IOController_Posix.ipp index 13ff8bac..5dcb1f10 100644 --- a/src/extension/IOController_Posix.ipp +++ b/src/extension/IOController_Posix.ipp @@ -207,8 +207,20 @@ namespace extension { tasks.erase(task); } else { - // Make sure poll isn't currently waiting for an event to happen - bump(); + // We are about to mutate `watches[].events`, which the poll thread reads + // from inside ::poll(). Write to the notify pipe to kick poll out, then + // hold notifier.mutex for the duration of the mutation so the poll thread + // cannot re-enter ::poll() against a half-updated entry. This is the same + // wake-then-lock pattern bump() uses, but we keep the lock held until the + // watches update (and the follow-up fire_event, which can also touch + // watches[].events) is finished. + uint8_t val = 1; + if (::write(notifier.send, &val, sizeof(val)) < 0) { + throw std::system_error(network_errno, + std::system_category(), + "There was an error while writing to the notification pipe"); + } + const std::lock_guard notifier_lock(notifier.mutex); // Unmask the events that were just processed auto it = std::lower_bound(watches.begin(), diff --git a/src/threading/Reaction.hpp b/src/threading/Reaction.hpp index 6372d101..f6d2bf91 100644 --- a/src/threading/Reaction.hpp +++ b/src/threading/Reaction.hpp @@ -135,8 +135,19 @@ namespace threading { /// The callback generator function (creates databound callbacks) TaskGenerator generator; - /// Cached data for this reaction added by the scheduler - std::shared_ptr scheduler_data; + /// Cached scheduler-private pointer for this reaction. + /// + /// The scheduler uses this as a fast-path cache for the resolved pool that this reaction's + /// tasks should run on. It is a raw, non-owning `void*` rather than `std::shared_ptr` + /// to avoid the per-submit cost of `std::atomic_load`/`atomic_store` on a `shared_ptr`, + /// which on libstdc++ falls back to a small global pool of mutexes (selected by pointer + /// hash) and can become a contention point on hot submission paths. + /// + /// Ownership of whatever this points at lives entirely with the scheduler; reactions + /// outlive scheduler-side resources because PowerPlant tears reactors down before the + /// scheduler. The cache is set-once: the first submit resolves the pool and CASes it in, + /// subsequent submits just load it. + std::atomic scheduler_data{nullptr}; friend class scheduler::Scheduler; /// Let the scheduler mess with reaction objects }; diff --git a/src/threading/scheduler/Group.cpp b/src/threading/scheduler/Group.cpp index c24522be..4863da35 100644 --- a/src/threading/scheduler/Group.cpp +++ b/src/threading/scheduler/Group.cpp @@ -22,6 +22,8 @@ #include "Group.hpp" #include +#include +#include #include #include #include @@ -30,7 +32,10 @@ #include "../../id.hpp" #include "../../util/GroupDescriptor.hpp" +#include "../ReactionTask.hpp" #include "Lock.hpp" +#include "Pool.hpp" +#include "queue/Priority.hpp" namespace NUClear { namespace threading { @@ -39,36 +44,44 @@ namespace threading { Group::LockHandle::LockHandle(const NUClear::id_t& task_id, const int& priority, std::function notify) : task_id(task_id), priority(priority), notify(std::move(notify)) {} + Group::RunningLock::RunningLock(Group& group, std::shared_ptr group_keepalive) + : group(group), keepalive(std::move(group_keepalive)) {} + + Group::RunningLock::~RunningLock() { + group.release_token(); + } + + bool Group::RunningLock::lock() { + return true; + } + Group::GroupLock::GroupLock(Group& group, std::shared_ptr handle) : group(group), handle(std::move(handle)) {} Group::GroupLock::~GroupLock() { - // The notify targets may be trying to lock the group - // If we try to notify them while holding the lock ourself we will deadlock - // So extract the notify targets and notify them after we release the lock std::vector> to_notify; + bool removed_from_queue = false; + int prev_tokens = 0; + bool was_locked = false; /*mutex scope*/ { const std::lock_guard lock(group.mutex); - // Free the token if we held one if (handle->locked) { handle->locked = false; - group.tokens++; + prev_tokens = group.tokens.fetch_add(1, std::memory_order_acq_rel); + was_locked = true; } - // Remove ourself from the queue auto it = std::find(group.queue.begin(), group.queue.end(), handle); if (it != group.queue.end()) { group.queue.erase(it); + removed_from_queue = true; } - // Notify any tasks that can lock and hasn't been notified - int free_tokens = group.tokens; + int free_tokens = group.tokens.load(std::memory_order_relaxed); for (const auto& h : group.queue) { - // Unlocked tasks would consume a token free_tokens -= h->locked ? 0 : 1; - // Any tasks that are not locked and have not been notified should be notified if (free_tokens >= 0 && !h->locked && !h->notified) { h->notified = true; to_notify.push_back(h); @@ -76,32 +89,58 @@ namespace threading { } } - // Notify all the tasks that can now lock + if (removed_from_queue) { + group.slow_pending.fetch_sub(1, std::memory_order_acq_rel); + } + for (const auto& h : to_notify) { h->notify(); } + + // If a fast-path waiter was queued (tokens were already negative before our release), + // drain one waiter to claim the slot we just freed. + if (was_locked && prev_tokens < 0) { + group.drain_one_to_pool(); + return; + } + + // Otherwise: no fast waiter was directly entitled. If slow_pending is now 0 and a + // token is available, give it to any fast waiter we have so they don't get stranded. + if (was_locked && group.slow_pending.load(std::memory_order_acquire) == 0) { + while (true) { + int expected = group.tokens.load(std::memory_order_acquire); + if (expected <= 0) { + break; + } + if (group.tokens.compare_exchange_weak(expected, + expected - 1, + std::memory_order_acq_rel)) { + if (!group.drain_one_to_pool()) { + group.tokens.fetch_add(1, std::memory_order_release); + } + break; + } + } + } } bool Group::GroupLock::lock() { - // If already locked then return true if (handle->locked) { return true; } const std::lock_guard lock(group.mutex); - int free = group.tokens; + int free = group.tokens.load(std::memory_order_relaxed); for (const auto& h : group.queue) { - // Unlocked tasks would consume a token free -= h->locked ? 0 : 1; - // Ran out of free tokens (the 0th token is the last one) if (free < 0) { return false; } if (h == handle) { handle->locked = true; - group.tokens--; + group.tokens.fetch_sub(1, std::memory_order_release); return true; } } @@ -109,7 +148,112 @@ namespace threading { return false; } - Group::Group(std::shared_ptr descriptor) : descriptor(std::move(descriptor)) {} + Group::Group(std::shared_ptr descriptor) + : descriptor(std::move(descriptor)), tokens(this->descriptor->concurrency) {} + + std::unique_ptr Group::try_acquire_running_lock() { + if (slow_pending.load(std::memory_order_acquire) > 0) { + return nullptr; + } + int expected = tokens.load(std::memory_order_acquire); + while (expected > 0) { + if (tokens.compare_exchange_weak(expected, expected - 1, std::memory_order_acq_rel)) { + if (slow_pending.load(std::memory_order_acquire) > 0) { + // A multi-group waiter slipped in; restore the token and back off. + release_token(); + return nullptr; + } + return make_running_lock(); + } + } + return nullptr; + } + + bool Group::try_submit(std::unique_ptr&& task, Pool* pool, const bool& clear_idle) { + // Don't jump ahead of multi-group waiters; if any exist, queue ourselves. + if (slow_pending.load(std::memory_order_acquire) == 0) { + int expected = tokens.load(std::memory_order_acquire); + while (expected > 0) { + if (tokens.compare_exchange_weak(expected, expected - 1, std::memory_order_acq_rel)) { + if (slow_pending.load(std::memory_order_acquire) > 0) { + // Restore the token and fall through to enqueueing. + release_token(); + break; + } + pool->submit({std::move(task), make_running_lock()}, clear_idle); + return true; + } + } + } + + const std::size_t bucket = queue::priority_index(task->priority); + pool->register_external_waiter(); + wait_buckets[bucket].enqueue(WaitEntry{std::move(task), pool, clear_idle}); + + // Reserve a slot in the signed counter; if a token was still available, run a waiter now. + const int prev = tokens.fetch_sub(1, std::memory_order_acq_rel); + if (prev > 0) { + if (slow_pending.load(std::memory_order_acquire) > 0) { + // Hand the token back so the slow path can pick it up. + release_token(); + } + else { + drain_one_to_pool(); + } + } + + return false; + } + + void Group::release_token() { + const int prev = tokens.fetch_add(1, std::memory_order_acq_rel); + + // If a slow-path waiter exists give them first chance. + if (slow_pending.load(std::memory_order_acquire) > 0) { + notify_slow_path(); + return; + } + + // A fast-path waiter has already decremented; hand them the slot. + if (prev < 0) { + drain_one_to_pool(); + } + } + + void Group::notify_slow_path() { + std::vector> to_notify; + /*mutex scope*/ { + const std::lock_guard lock(mutex); + int free_tokens = tokens.load(std::memory_order_relaxed); + for (const auto& h : queue) { + free_tokens -= h->locked ? 0 : 1; + if (free_tokens >= 0 && !h->locked && !h->notified) { + h->notified = true; + to_notify.push_back(h); + } + } + } + for (const auto& h : to_notify) { + h->notify(); + } + } + + bool Group::drain_one_to_pool() { + WaitEntry entry; + for (std::size_t bucket = 0; bucket < queue::PRIORITY_BUCKETS; ++bucket) { + if (wait_buckets[bucket].try_dequeue(entry)) { + Pool* pool = entry.pool; + pool->submit({std::move(entry.task), make_running_lock()}, entry.clear_idle, /*force=*/true); + pool->unregister_external_waiter(); + return true; + } + } + return false; + } + + std::unique_ptr Group::make_running_lock() { + return std::make_unique(*this, shared_from_this()); + } std::unique_ptr Group::lock(const NUClear::id_t& task_id, const int& priority, @@ -117,15 +261,13 @@ namespace threading { auto handle = std::make_shared(task_id, priority, notify); - // Insert sorted into the queue + slow_pending.fetch_add(1, std::memory_order_acq_rel); + const std::lock_guard lock(mutex); queue.insert(std::lower_bound(queue.begin(), queue.end(), handle), handle); - // Unnotify any tasks that are beyond the lock window - int free = tokens; + int free = tokens.load(std::memory_order_relaxed); for (const auto& h : queue) { - - // Unlocked tasks would consume a token free -= h->locked ? 0 : 1; if (free < 0) { h->notified = false; diff --git a/src/threading/scheduler/Group.hpp b/src/threading/scheduler/Group.hpp index 785b9da8..0ad6e1b6 100644 --- a/src/threading/scheduler/Group.hpp +++ b/src/threading/scheduler/Group.hpp @@ -22,6 +22,8 @@ #ifndef NUCLEAR_THREADING_SCHEDULER_GROUP_HPP #define NUCLEAR_THREADING_SCHEDULER_GROUP_HPP +#include +#include #include #include #include @@ -29,22 +31,39 @@ #include "../../util/GroupDescriptor.hpp" #include "Lock.hpp" +#include "queue/Priority.hpp" +#include "queue/TaskQueue.hpp" namespace NUClear { namespace threading { + + class ReactionTask; + namespace scheduler { + class Pool; + /** * A group is a collection of tasks which are mutually exclusive to each other. * * They are identified by having a common group id along with a maximum concurrency. * This class holds the structures that manage the group. * - * This class is used along with the GroupLock class to manage the group locking. + * Tasks submitted through the scheduler fast path use lock-free waiter buckets. + * The lock() API uses a mutex-protected sorted queue for multi-group and unit-test use. */ - class Group { + class Group : public std::enable_shared_from_this { private: + struct WaitEntry { + std::unique_ptr task; + /// Non-owning pointer; Pools live for the lifetime of the Scheduler and the + /// Scheduler tears down Groups before Pools, so it is always safe to dereference + /// while this WaitEntry is reachable. + Pool* pool{nullptr}; + bool clear_idle{false}; + }; + /** * A lock handle holds the shared state between the group object and the lock objects. * It holds if the lock should currently be locked, as well as ordering which locks should be locked first. @@ -87,6 +106,26 @@ namespace threading { std::function notify; }; + /** + * RAII lock released when a fast-path task finishes executing. + */ + class RunningLock : public Lock { + public: + RunningLock(Group& group, std::shared_ptr group_keepalive); + ~RunningLock() override; + + RunningLock(const RunningLock&) = delete; + RunningLock(RunningLock&&) = delete; + RunningLock& operator=(const RunningLock&) = delete; + RunningLock& operator=(RunningLock&&) = delete; + + bool lock() override; + + private: + Group& group; + std::shared_ptr keepalive; + }; + public: /** * A group lock is the RAII lock object that is used by the Pools to manage the group locking. @@ -139,6 +178,39 @@ namespace threading { */ explicit Group(std::shared_ptr descriptor); + /** + * Try to submit a task through the lock-free fast path. + * + * If a group token is available the task is submitted to the pool immediately. + * Otherwise the task is queued until a token is released. + * + * @param task the reaction task to submit + * @param pool the pool to submit to when runnable + * @param clear_idle if true, clear idle state on submission + * + * @return true if the task was submitted immediately + */ + /** + * Try to acquire a token for inline execution without submitting to a pool. + * + * @return an RAII lock if a token was acquired, otherwise nullptr + */ + std::unique_ptr try_acquire_running_lock(); + + /** + * Try to submit a task through the lock-free fast path. + * + * If a group token is available the task is submitted to the pool immediately. + * Otherwise the task is queued until a token is released. + * + * @param task the reaction task to submit + * @param pool the pool to submit to when runnable (non-owning; must outlive the call) + * @param clear_idle if true, clear idle state on submission + * + * @return true if the task was submitted immediately + */ + bool try_submit(std::unique_ptr&& task, Pool* pool, const bool& clear_idle); + /** * This function will create a new lock for the task and return it. * @@ -163,11 +235,21 @@ namespace threading { const std::shared_ptr descriptor; private: - /// The mutex which protects the queue + void release_token(); + void notify_slow_path(); + bool drain_one_to_pool(); + std::unique_ptr make_running_lock(); + + /// Available group tokens (signed when waiters are queued on the fast path) + std::atomic tokens; + /// Number of unsatisfied slow-path waiters + std::atomic slow_pending{0}; + /// Lock-free wait queues keyed by priority + std::array, queue::PRIORITY_BUCKETS> wait_buckets; + + /// The mutex which protects the slow-path queue std::mutex mutex; - /// The number of tokens that are available for this group - int tokens = descriptor->concurrency; - /// The queue of tasks for this specific thread pool and if they are group blocked + /// The queue of tasks for the slow path std::vector> queue; }; diff --git a/src/threading/scheduler/Pool.cpp b/src/threading/scheduler/Pool.cpp index 6b8bb537..361bcc51 100644 --- a/src/threading/scheduler/Pool.cpp +++ b/src/threading/scheduler/Pool.cpp @@ -23,6 +23,7 @@ #include #include +#include #include #include #include @@ -33,12 +34,14 @@ #include "../../dsl/word/MainThread.hpp" #include "../../dsl/word/Pool.hpp" #include "../../id.hpp" -#include "../../message/ReactionStatistics.hpp" #include "../../threading/Reaction.hpp" #include "../../util/Inline.hpp" #include "../ReactionTask.hpp" #include "CountingLock.hpp" #include "Scheduler.hpp" +#include "queue/MPSCQueue.hpp" +#include "queue/Priority.hpp" +#include "queue/TaskQueue.hpp" namespace NUClear { namespace threading { @@ -47,7 +50,21 @@ namespace threading { Pool::Pool(Scheduler& scheduler, std::shared_ptr descriptor) : descriptor(std::move(descriptor)), scheduler(scheduler) { - // Increase the number of active pools if this pool counts for idle but immediately be idle + // Pools declared with a single worker (e.g. MainThread, the Trace pool, any user pool with + // `concurrency = 1`) only ever have one consumer; use the lighter MPSC queue for them. + // Pools where the default-pool concurrency may differ from the descriptor's nominal value + // are conservatively given the MPMC queue. + const bool single_consumer = + this->descriptor->concurrency == 1 && this->descriptor != dsl::word::Pool<>::descriptor(); + for (auto& bucket : buckets) { + if (single_consumer) { + bucket = std::make_unique>(); + } + else { + bucket = std::make_unique>(); + } + } + if (this->descriptor->counts_for_idle) { scheduler.active_pools.fetch_add(1, std::memory_order_relaxed); pool_idle = std::make_unique(scheduler.active_pools); @@ -55,30 +72,21 @@ namespace threading { } Pool::~Pool() { - - // Force stop the pool threads and wait for them to finish stop(Pool::StopType::FORCE); join(); - - // One less active pool scheduler.active_pools.fetch_sub(descriptor->counts_for_idle ? 1 : 0, std::memory_order_relaxed); } void Pool::start() { - // Default thread pool gets its thread count from the configuration rather than the descriptor const int n_threads = descriptor == dsl::word::Pool<>::descriptor() ? scheduler.default_pool_concurrency : descriptor->concurrency; - // Set the number of active threads to the number of threads in the pool active = descriptor->counts_for_idle ? n_threads : 0; - // Main thread pool just executes run - // This assumes the thread calling start() is the main thread if (descriptor == dsl::word::MainThread::descriptor()) { run(); } else { - // Make n threads for the pool const std::lock_guard lock(mutex); for (int i = 0; i < n_threads; ++i) { threads.emplace_back(std::make_unique(&Pool::run, this)); @@ -89,19 +97,19 @@ namespace threading { void Pool::stop(const StopType& type) { const std::lock_guard lock(mutex); - live = true; // Live so the thread will wake from sleep - accept = descriptor->persistent; // Always accept if persistent otherwise stop + live = true; + accept.store(descriptor->persistent, std::memory_order_release); switch (type) { case StopType::NORMAL: { - running = descriptor->persistent; // Keep running if we persistent + running = descriptor->persistent; } break; case StopType::FINAL: { - running = false; // Always stop running on the final stop + running = false; } break; case StopType::FORCE: { - // Clear the queue and stop the pool immediately - queue.clear(); + drain_queues(); + pending_tasks.store(0, std::memory_order_relaxed); running = false; } break; } @@ -110,7 +118,6 @@ namespace threading { void Pool::notify(bool clear_idle) { const std::lock_guard lock(mutex); - /// May not be idle anymore, flag this before the thread wakes up live = true; if (clear_idle) { pool_idle = nullptr; @@ -119,7 +126,6 @@ namespace threading { } void Pool::join() const { - // Join all the threads for (const auto& thread : threads) { if (thread->joinable()) { thread->join(); @@ -127,35 +133,39 @@ namespace threading { } } - void Pool::submit(Task&& task, bool clear_idle) { - const std::lock_guard lock(mutex); - - // Not accepting new tasks - if (!accept) { + void Pool::submit(Task&& task, bool clear_idle, bool force) { + if (!force && !accept.load(std::memory_order_acquire)) { return; } - // Clear the global idle status if requested + const std::size_t bucket = queue::priority_index(task.task->priority); + buckets[bucket]->enqueue(std::move(task)); + pending_tasks.fetch_add(1, std::memory_order_release); + + const std::lock_guard lock(mutex); if (clear_idle) { pool_idle = nullptr; } - - // Insert in sorted order - queue.insert(std::lower_bound(queue.begin(), queue.end(), task), std::move(task)); - - // Pool might have something to do now live = true; - - // Notify a single thread that there is a new task condition.notify_one(); } + void Pool::register_external_waiter() { + external_waiters.fetch_add(1, std::memory_order_acq_rel); + } + + void Pool::unregister_external_waiter() { + if (external_waiters.fetch_sub(1, std::memory_order_acq_rel) == 1) { + // Wake any worker that may be parked specifically because external_waiters was > 0. + const std::lock_guard lock(mutex); + condition.notify_all(); + } + } + void Pool::add_idle_task(const std::shared_ptr& reaction) { const std::lock_guard lock(mutex); idle_tasks.push_back(reaction); - // If we previously had no idle tasks, it's possible every thread is sleeping (idle) - // Wake one up so that it can check again if (idle_tasks.size() == 1) { condition.notify_one(); } @@ -181,7 +191,6 @@ namespace threading { Pool::current_pool = this; try { while (true) { - // Run the next task Task task = get_task(); task.task->run(); } @@ -192,33 +201,61 @@ namespace threading { } } - Pool::Task Pool::get_task() { + bool Pool::try_dequeue_task(Task& out) { + for (std::size_t i = 0; i < queue::PRIORITY_BUCKETS; ++i) { + if (buckets[i]->try_dequeue(out)) { + pending_tasks.fetch_sub(1, std::memory_order_release); + return true; + } + } + return false; + } + + void Pool::drain_queues() { + Task discarded; + for (auto& bucket : buckets) { + while (bucket->try_dequeue(discarded)) { + } + } + } + Pool::Task Pool::get_task() { std::unique_lock lock(mutex); - while (running || !queue.empty()) { + while (running || pending_tasks.load(std::memory_order_acquire) > 0 + || external_waiters.load(std::memory_order_acquire) > 0) { + bool got = false; if (live) { - // Get the first task that can be run - for (auto it = queue.begin(); it != queue.end(); ++it) { - // If the task is not a group member, or we can get a token for the group then we can run it - if (it->lock == nullptr || it->lock->lock()) { - // If the task is not group blocked or we can lock the group then we can run it - Task task = std::move(*it); - queue.erase(it); - thread_idle[std::this_thread::get_id()] = nullptr; // This thread is no longer idle - pool_idle = nullptr; // The pool as a whole is no longer idle + Task task; + got = try_dequeue_task(task); + if (got) { + if (task.lock == nullptr || task.lock->lock()) { + thread_idle[std::this_thread::get_id()] = nullptr; + pool_idle = nullptr; return task; } + // The task was dequeued but its lock isn't acquirable. Re-enqueue and + // wait for someone to notify us when the lock state changes. + const std::size_t bucket = queue::priority_index(task.task->priority); + buckets[bucket]->enqueue(std::move(task)); + pending_tasks.fetch_add(1, std::memory_order_release); } } live = false; - auto idle_task = get_idle_task(); - if (idle_task.task != nullptr) { - return idle_task; + // Only account for idle when we genuinely found nothing; threads whose locks + // fail are not idle, they are blocked waiting for the lock state to change. + if (!got) { + auto idle_task = get_idle_task(); + if (idle_task.task != nullptr) { + return idle_task; + } } - // Wait for something to happen! - condition.wait(lock, [this] { return live || (!running && queue.empty()); }); + condition.wait(lock, [this] { + return live + || (!running && pending_tasks.load(std::memory_order_acquire) == 0 + && external_waiters.load(std::memory_order_acquire) == 0); + }); } condition.notify_all(); @@ -226,18 +263,14 @@ namespace threading { } Pool::Task Pool::get_idle_task() { - // Don't idle when shutting down, don't idle if we can't idle, don't idle if we are already idle if (!running || !descriptor->counts_for_idle) { return Task{}; } - // Tasks to be executed when idle std::vector> tasks; - /// Current local lock status auto& local_lock = thread_idle[std::this_thread::get_id()]; - // If not already idle, check to see if we are the last and if so add the local idle tasks if (local_lock == nullptr) { local_lock = std::make_unique(active); if (local_lock->lock()) { @@ -245,23 +278,19 @@ namespace threading { } } - // The if the pool is idle and does not have a global idle task, try the global lock - if (pool_idle == nullptr && active == 0) { + if (pool_idle == nullptr && active.load(std::memory_order_relaxed) == 0) { pool_idle = std::make_unique(scheduler.active_pools); - // This was the last pool to become idle, so get the global idle tasks if (pool_idle->lock()) { const std::lock_guard lock(scheduler.idle_mutex); tasks.insert(tasks.end(), scheduler.idle_tasks.begin(), scheduler.idle_tasks.end()); } } - // If there are no idle tasks, return no task if (tasks.empty()) { return Task{}; } - // Make a reaction task which will submit all the idle tasks to the scheduler auto task = std::make_unique( nullptr, true, @@ -271,7 +300,6 @@ namespace threading { [](const ReactionTask&) { return std::set>{}; }); task->callback = [this, t = std::move(tasks)](const ReactionTask& /*task*/) { for (const auto& idle_task : t) { - // Submit all the idle tasks to the scheduler scheduler.submit(idle_task->get_task()); } }; @@ -279,8 +307,6 @@ namespace threading { return Task{std::move(task)}; } - - // Initialise the current pool to nullptr if it is not already // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) thread_local Pool* Pool::current_pool = nullptr; diff --git a/src/threading/scheduler/Pool.hpp b/src/threading/scheduler/Pool.hpp index 68b5e40d..e4e574bd 100644 --- a/src/threading/scheduler/Pool.hpp +++ b/src/threading/scheduler/Pool.hpp @@ -22,6 +22,8 @@ #ifndef NUCLEAR_THREADING_SCHEDULER_POOL_HPP #define NUCLEAR_THREADING_SCHEDULER_POOL_HPP +#include +#include #include #include #include @@ -32,6 +34,10 @@ #include "../../util/ThreadPoolDescriptor.hpp" #include "../ReactionTask.hpp" #include "Lock.hpp" +#include "queue/MPSCQueue.hpp" +#include "queue/Priority.hpp" +#include "queue/Queue.hpp" +#include "queue/TaskQueue.hpp" namespace NUClear { namespace threading { @@ -138,8 +144,23 @@ namespace threading { * * @param task The reaction task task to submit * @param clear_idle If true, the idle state of the pool will be cleared + * @param force If true, submit even if the pool is no longer accepting new tasks + * (used when draining an already in-flight task from elsewhere, e.g. a Group) */ - void submit(Task&& task, bool clear_idle); + void submit(Task&& task, bool clear_idle, bool force = false); + + /** + * Register that a task is in flight outside the pool but will eventually be submitted to it. + * + * This keeps the pool's workers alive while there are tasks parked in another structure + * (e.g. a Group's waiter buckets) that point at this pool. + */ + void register_external_waiter(); + + /** + * Unregister a previously registered external waiter. + */ + void unregister_external_waiter(); /** * Add an idle task to this pool. @@ -198,6 +219,20 @@ namespace threading { */ Task get_task(); + /** + * Try to dequeue a runnable task from the priority buckets. + * + * @param out the task to fill if one is available + * + * @return true if a task was dequeued + */ + bool try_dequeue_task(Task& out); + + /** + * Drain all tasks from the priority buckets. + */ + void drain_queues(); + /** * Get an idle task to execute or hold. * @@ -217,17 +252,25 @@ namespace threading { /// If running is false this means the pool is shutting down and no more tasks will be accepted bool running = true; - /// If accept is false this pool will no longer accept new tasks - bool accept = true; + /// If accept is false this pool will no longer accept new tasks. + /// Atomic so that producers on the fast path can check it without taking the pool mutex. + std::atomic accept{true}; /// The threads which are running in this thread pool std::vector> threads; - /// The queue of tasks for this specific thread pool - std::vector queue; + /// Priority-bucketed task queues. Each bucket holds either an MPMC TaskQueue + /// (for pools with multiple worker threads) or an MPSCQueue (for pools that are + /// known to be single-consumer, e.g. MainThread or the Trace pool). The choice + /// is made at construction based on `descriptor->concurrency`. + std::array>, queue::PRIORITY_BUCKETS> buckets; + /// Number of tasks submitted but not yet dequeued + std::atomic pending_tasks{0}; + /// Number of tasks parked outside the pool (e.g. waiting on a Group token) that point at this pool + std::atomic external_waiters{0}; /// A boolean which is set to true when the queue is modified and set to false when there was no work to do bool live = true; - /// The mutex which protects the queue and idle tasks + /// The mutex which protects idle tasks and the live flag mutable std::mutex mutex; /// The condition variable which threads wait on if they can't get a task std::condition_variable condition; diff --git a/src/threading/scheduler/Scheduler.cpp b/src/threading/scheduler/Scheduler.cpp index 422001ce..c140d8e1 100644 --- a/src/threading/scheduler/Scheduler.cpp +++ b/src/threading/scheduler/Scheduler.cpp @@ -53,7 +53,7 @@ namespace threading { /*mutex scope*/ { const std::lock_guard lock(pools_mutex); - started = true; + started.store(true, std::memory_order_release); // Start all of the pools except the main thread pool for (const auto& pool : pools) { if (pool.first != dsl::word::MainThread::descriptor()) { @@ -141,7 +141,7 @@ namespace threading { // Don't start the main thread here, it will be started in the start function // If the scheduler has not yet started then don't start the threads for this pool yet - if (desc != dsl::word::MainThread::descriptor() && started) { + if (desc != dsl::word::MainThread::descriptor() && started.load(std::memory_order_acquire)) { pool->start(); } } @@ -162,7 +162,7 @@ namespace threading { std::unique_ptr Scheduler::get_groups_lock( const NUClear::id_t& task_id, const int& priority, - const std::shared_ptr& pool, + Pool* pool, const std::set>& descs) { // No groups @@ -188,35 +188,58 @@ namespace threading { return; } - // If we have run this task before, we know which pool it should be submitted to and cached it - // This avoids every single submit having to lock a mutex to find the pool - std::shared_ptr pool; + // Resolve the Pool for this task. + // + // The first submit for a reaction does a mutex-protected `get_pool()` lookup; the + // resulting pointer is then cached on the parent Reaction so subsequent submits skip + // the mutex entirely. + // + // The cache is a single `std::atomic` (see Reaction::scheduler_data). We + // deliberately avoid `std::atomic_load`/`atomic_store` on a `std::shared_ptr`: + // on libstdc++ those fall back to a small global pool of mutexes (~8 chosen by + // pointer hash) and become a contention point on hot submission paths. Pools live + // for the lifetime of the Scheduler (and the Scheduler tears down reactions before + // its own pools), so a non-owning raw pointer is safe. + // + // The cache update is benign-racing: two submitters that miss simultaneously will + // both call `get_pool()` and store the same pointer; last writer wins, identical + // value. + Pool* pool = nullptr; if (task->parent) { - if (task->parent->scheduler_data) { - pool = std::static_pointer_cast(task->parent->scheduler_data); - } - else { - pool = get_pool(task->pool_descriptor); - task->parent->scheduler_data = pool; + pool = static_cast(task->parent->scheduler_data.load(std::memory_order_acquire)); + if (pool == nullptr) { + pool = get_pool(task->pool_descriptor).get(); + task->parent->scheduler_data.store(static_cast(pool), std::memory_order_release); } } else { - pool = get_pool(task->pool_descriptor); + pool = get_pool(task->pool_descriptor).get(); + } + + const bool current_pool_idle = Pool::current() != nullptr && Pool::current()->is_idle(); + + // Fast path for a single group: lock-free token acquisition and waiter buckets + if (task->group_descriptors.size() == 1) { + const auto& group = get_group(*task->group_descriptors.begin()); + + if (task->run_inline) { + if (auto running_lock = group->try_acquire_running_lock()) { + task->run(); + return; + } + } + + group->try_submit(std::move(task), pool, !current_pool_idle); + return; } - // Get any locks that are required for this task + // Slow path for multiple groups: mutex-backed combined locks auto group_lock = get_groups_lock(task->id, task->priority, pool, task->group_descriptors); - // If this task should run immediately and not limited by the group lock if (task->run_inline && (group_lock == nullptr || group_lock->lock())) { task->run(); } else { - // Submit the task to the appropriate pool - // Clear the idle status only if the current pool is not idle - // This hands the job of managing global idle tasks to this other pool if we were about to do it - // That way the other pool can decide if it is idle or not - const bool current_pool_idle = Pool::current() != nullptr && Pool::current()->is_idle(); pool->submit({std::move(task), std::move(group_lock)}, !current_pool_idle); } } diff --git a/src/threading/scheduler/Scheduler.hpp b/src/threading/scheduler/Scheduler.hpp index 0c30970a..903c9624 100644 --- a/src/threading/scheduler/Scheduler.hpp +++ b/src/threading/scheduler/Scheduler.hpp @@ -127,7 +127,7 @@ namespace threading { */ std::unique_ptr get_groups_lock(const NUClear::id_t& task_id, const int& priority, - const std::shared_ptr& pool, + Pool* pool, const std::set>& descs); /// The number of threads that will be in the default thread pool @@ -136,10 +136,9 @@ namespace threading { /// If running is false this means the scheduler is shutting down and no new pools will be created std::atomic running{true}; - /// A mutex for when we are modifying groups - std::mutex groups_mutex; - /// A map of group ids to the number of active tasks currently running in that group - std::map, std::shared_ptr> groups; + // NB: `pools` is declared before `groups` so that on Scheduler destruction the groups + // (which may hold non-owning Pool* in their waiter buckets) are destroyed first, then + // the pools. This keeps the raw pointers in WaitEntry safe-by-construction. /// A mutex for when we are modifying pools std::mutex pools_mutex; @@ -147,7 +146,12 @@ namespace threading { std::map, std::shared_ptr> pools; /// If started is false pools will not be started until start is called /// once start is called future pools will be started immediately - bool started = false; + std::atomic started{false}; + + /// A mutex for when we are modifying groups + std::mutex groups_mutex; + /// A map of group ids to the number of active tasks currently running in that group + std::map, std::shared_ptr> groups; /// A mutex to protect the idle tasks list std::mutex idle_mutex; diff --git a/src/threading/scheduler/queue/MPSCQueue.hpp b/src/threading/scheduler/queue/MPSCQueue.hpp new file mode 100644 index 00000000..84a8cf19 --- /dev/null +++ b/src/threading/scheduler/queue/MPSCQueue.hpp @@ -0,0 +1,233 @@ +/* + * MIT License + * + * Copyright (c) 2024 NUClear Contributors + * + * This file is part of the NUClear codebase. + * See https://github.com/Fastcode/NUClear for further info. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE + * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR + * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#ifndef NUCLEAR_THREADING_SCHEDULER_QUEUE_MPSC_QUEUE_HPP +#define NUCLEAR_THREADING_SCHEDULER_QUEUE_MPSC_QUEUE_HPP + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "Queue.hpp" + +namespace NUClear { +namespace threading { + namespace scheduler { + namespace queue { + + /** + * Lock-free multi-producer single-consumer unbounded FIFO queue. + * + * The producer side is identical to the MPMC TaskQueue (block-based, atomic + * fetch_add to claim a slot). The consumer side is simpler because there is + * by contract only ever one consumer thread: the per-block read counter is a + * plain integer, no CAS is needed to claim a slot, and the consumer can delete + * fully-drained blocks immediately (subject to letting concurrent producers + * finish touching them, handled via a graveyard like the MPMC variant). + * + * Use this in pools that are declared with `concurrency = 1` (e.g. MainThread, + * the TraceController pool, or any user pool with a single worker thread). + */ + template + class MPSCQueue : public Queue { + static_assert(std::is_move_constructible::value, "MPSCQueue requires move constructible T"); + + private: + static constexpr std::size_t BLOCK_SIZE = 64; + + struct Slot { + std::atomic committed{false}; + /// Raw aligned storage for the T payload. Left value-initialised (zeroed) so the + /// constructor fully covers all members; placement-new overwrites it on enqueue. + alignas(T) std::array storage{}; + }; + + struct Block { + std::array slots{}; + /// Producer claim counter, fetched by every enqueuer (atomic, MP-safe). + std::atomic write{0}; + /// Consumer read counter, only touched by the single consumer (non-atomic). + std::size_t read{0}; + std::atomic next{nullptr}; + Block* graveyard_next{nullptr}; + }; + + static T* slot_ptr(Slot& slot) { + return reinterpret_cast(slot.storage.data()); + } + + static Block* allocate_block() { + return new Block(); + } + + // Producers can still be operating on a block after the consumer advances head past + // it (e.g. a producer that loaded tail_block before it advanced is in + // link_next_block). To avoid use-after-free we never delete blocks while the queue + // is live; they are kept on a graveyard list and freed in the destructor. In steady + // state the graveyard length is bounded by the peak number of in-flight blocks. + void retire_block(Block* block) { + Block* head_graveyard = graveyard.load(std::memory_order_acquire); + while (true) { + block->graveyard_next = head_graveyard; + if (graveyard.compare_exchange_weak(head_graveyard, + block, + std::memory_order_release, + std::memory_order_relaxed)) { + return; + } + } + } + + bool link_next_block(Block* block) { + Block* expected = nullptr; + if (block->next.compare_exchange_strong(expected, + allocate_block(), + std::memory_order_acq_rel)) { + return true; + } + return expected != nullptr; + } + + void advance_tail(Block* expected, Block* next) { + Block* tail_ptr = tail_block.load(std::memory_order_acquire); + while (tail_ptr == expected) { + if (tail_block.compare_exchange_weak(tail_ptr, + next, + std::memory_order_release, + std::memory_order_relaxed)) { + return; + } + } + } + + /// Consumer-owned head pointer. Non-atomic because only the consumer reads/writes it. + Block* head_block; + /// Producer-shared tail pointer. Atomic because any number of producers chase it. + std::atomic tail_block; + /// Linked list of retired blocks that are kept alive until the queue is destroyed. + std::atomic graveyard; + + public: + MPSCQueue() { + auto* initial = new Block(); + head_block = initial; + tail_block.store(initial, std::memory_order_relaxed); + graveyard.store(nullptr, std::memory_order_relaxed); + } + + MPSCQueue(const MPSCQueue&) = delete; + MPSCQueue& operator=(const MPSCQueue&) = delete; + MPSCQueue(MPSCQueue&&) = delete; + MPSCQueue& operator=(MPSCQueue&&) = delete; + + ~MPSCQueue() override { + Block* current = head_block; + while (current != nullptr) { + Block* next = current->next.load(std::memory_order_relaxed); + delete current; + current = next; + } + + Block* dead = graveyard.load(std::memory_order_relaxed); + while (dead != nullptr) { + Block* next = dead->graveyard_next; + delete dead; + dead = next; + } + } + + void enqueue(const T& item) { + T copy(item); + enqueue(std::move(copy)); + } + + void enqueue(T&& item) override { + while (true) { + Block* block = tail_block.load(std::memory_order_acquire); + const std::size_t index = block->write.fetch_add(1, std::memory_order_relaxed); + + if (index < BLOCK_SIZE) { + Slot& slot = block->slots[index]; + new (slot.storage.data()) T(std::move(item)); + slot.committed.store(true, std::memory_order_release); + return; + } + + // Block full. Link the next one (or help an in-flight linker) and advance tail. + link_next_block(block); + + Block* next = block->next.load(std::memory_order_acquire); + advance_tail(block, next); + } + } + + bool try_dequeue(T& out) override { + while (true) { + const std::size_t write_observed = head_block->write.load(std::memory_order_acquire); + const std::size_t published = std::min(write_observed, static_cast(BLOCK_SIZE)); + + if (head_block->read < published) { + Slot& slot = head_block->slots[head_block->read]; + // Producer's claim happens-before its commit, but commit may not be visible + // yet if we raced it. Spin briefly until the data is published. + while (!slot.committed.load(std::memory_order_acquire)) { + std::this_thread::yield(); + } + + out = std::move(*slot_ptr(slot)); + slot_ptr(slot)->~T(); + ++head_block->read; + return true; + } + + // Block drained from this consumer's perspective. Try to move to the next. + Block* next = head_block->next.load(std::memory_order_acquire); + if (next == nullptr) { + // If a producer has already overflowed past BLOCK_SIZE we know they're + // mid-way through linking the next block; wait briefly for it to appear. + if (write_observed > BLOCK_SIZE) { + std::this_thread::yield(); + continue; + } + return false; + } + + // We're the sole consumer so advancing head_block is a plain store. The old + // block goes to the graveyard so any producer that still holds a pointer to + // it (e.g. one mid-way through link_next_block) doesn't touch freed memory. + Block* old = head_block; + head_block = next; + retire_block(old); + } + } + }; + + } // namespace queue + } // namespace scheduler +} // namespace threading +} // namespace NUClear + +#endif // NUCLEAR_THREADING_SCHEDULER_QUEUE_MPSC_QUEUE_HPP diff --git a/src/threading/scheduler/queue/Priority.hpp b/src/threading/scheduler/queue/Priority.hpp new file mode 100644 index 00000000..0d58b135 --- /dev/null +++ b/src/threading/scheduler/queue/Priority.hpp @@ -0,0 +1,66 @@ +/* + * MIT License + * + * Copyright (c) 2024 NUClear Contributors + * + * This file is part of the NUClear codebase. + * See https://github.com/Fastcode/NUClear for further info. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE + * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR + * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#ifndef NUCLEAR_THREADING_SCHEDULER_QUEUE_PRIORITY_HPP +#define NUCLEAR_THREADING_SCHEDULER_QUEUE_PRIORITY_HPP + +#include +#include + +namespace NUClear { +namespace threading { + namespace scheduler { + namespace queue { + + /// Number of priority buckets (REALTIME, HIGH, NORMAL, LOW, IDLE). + static constexpr std::size_t PRIORITY_BUCKETS = 5; + + /** + * Map a reaction task priority value to a bucket index. + * + * Higher runtime priority maps to a lower index so buckets can be scanned from 0 upward. + * + * @param priority the task priority + * + * @return bucket index in [0, PRIORITY_BUCKETS) + */ + inline std::size_t priority_index(const int& priority) { + if (priority >= 1000) { + return 0; + } + if (priority >= 750) { + return 1; + } + if (priority >= 500) { + return 2; + } + if (priority >= 250) { + return 3; + } + return 4; + } + + } // namespace queue + } // namespace scheduler +} // namespace threading +} // namespace NUClear + +#endif // NUCLEAR_THREADING_SCHEDULER_QUEUE_PRIORITY_HPP diff --git a/src/threading/scheduler/queue/Queue.hpp b/src/threading/scheduler/queue/Queue.hpp new file mode 100644 index 00000000..7966e2ab --- /dev/null +++ b/src/threading/scheduler/queue/Queue.hpp @@ -0,0 +1,61 @@ +/* + * MIT License + * + * Copyright (c) 2024 NUClear Contributors + * + * This file is part of the NUClear codebase. + * See https://github.com/Fastcode/NUClear for further info. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE + * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR + * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#ifndef NUCLEAR_THREADING_SCHEDULER_QUEUE_QUEUE_HPP +#define NUCLEAR_THREADING_SCHEDULER_QUEUE_QUEUE_HPP + +namespace NUClear { +namespace threading { + namespace scheduler { + namespace queue { + + /** + * Abstract interface used by Pool so a single bucket array can hold either an + * MPMC TaskQueue (for multi-consumer pools) or an MPSCQueue (for single-consumer + * pools such as MainThread or Trace). + * + * The per-call indirection cost is negligible compared to the atomic ops inside + * the concrete enqueue/dequeue implementations, and the simpler MPSC queue is a + * meaningful win for pools that are by construction single-consumer. + */ + template + class Queue { + public: + Queue() = default; + Queue(const Queue&) = delete; + Queue(Queue&&) = delete; + Queue& operator=(const Queue&) = delete; + Queue& operator=(Queue&&) = delete; + virtual ~Queue() = default; + + /// Push an item into the queue. Must be safe to call from any thread. + virtual void enqueue(T&& item) = 0; + + /// Try to pop one item; returns true if `out` was populated. + virtual bool try_dequeue(T& out) = 0; + }; + + } // namespace queue + } // namespace scheduler +} // namespace threading +} // namespace NUClear + +#endif // NUCLEAR_THREADING_SCHEDULER_QUEUE_QUEUE_HPP diff --git a/src/threading/scheduler/queue/Semaphore.hpp b/src/threading/scheduler/queue/Semaphore.hpp new file mode 100644 index 00000000..f6ecc7b9 --- /dev/null +++ b/src/threading/scheduler/queue/Semaphore.hpp @@ -0,0 +1,93 @@ +/* + * MIT License + * + * Copyright (c) 2024 NUClear Contributors + * + * This file is part of the NUClear codebase. + * See https://github.com/Fastcode/NUClear for further info. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE + * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR + * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#ifndef NUCLEAR_THREADING_SCHEDULER_QUEUE_SEMAPHORE_HPP +#define NUCLEAR_THREADING_SCHEDULER_QUEUE_SEMAPHORE_HPP + +#include +#include +#include +#include + +namespace NUClear { +namespace threading { + namespace scheduler { + namespace queue { + + /** + * Counting semaphore with an atomic fast path and mutex/condition_variable slow path. + * + * A negative count indicates the number of threads blocked in wait(). + */ + class Semaphore { + public: + Semaphore() = default; + ~Semaphore() = default; + + Semaphore(const Semaphore&) = delete; + Semaphore& operator=(const Semaphore&) = delete; + Semaphore(Semaphore&&) = delete; + Semaphore& operator=(Semaphore&&) = delete; + + void signal(int n = 1) { + const int previous = count.fetch_add(n, std::memory_order_release); + if (previous < 0) { + const std::lock_guard lock(mutex); + const int waiters = std::min(n, -previous); + for (int i = 0; i < waiters; ++i) { + cv.notify_one(); + } + } + } + + void wait() { + if (count.fetch_sub(1, std::memory_order_acq_rel) > 0) { + return; + } + + std::unique_lock lock(mutex); + while (count.load(std::memory_order_acquire) < 0) { + cv.wait(lock); + } + } + + bool try_wait() { + int expected = count.load(std::memory_order_acquire); + while (expected > 0) { + if (count.compare_exchange_weak(expected, expected - 1, std::memory_order_acq_rel)) { + return true; + } + } + return false; + } + + private: + std::atomic count{0}; + std::mutex mutex; + std::condition_variable cv; + }; + + } // namespace queue + } // namespace scheduler +} // namespace threading +} // namespace NUClear + +#endif // NUCLEAR_THREADING_SCHEDULER_QUEUE_SEMAPHORE_HPP diff --git a/src/threading/scheduler/queue/TaskQueue.hpp b/src/threading/scheduler/queue/TaskQueue.hpp new file mode 100644 index 00000000..2d5ffdda --- /dev/null +++ b/src/threading/scheduler/queue/TaskQueue.hpp @@ -0,0 +1,268 @@ +/* + * MIT License + * + * Copyright (c) 2024 NUClear Contributors + * + * This file is part of the NUClear codebase. + * See https://github.com/Fastcode/NUClear for further info. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE + * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR + * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#ifndef NUCLEAR_THREADING_SCHEDULER_QUEUE_TASK_QUEUE_HPP +#define NUCLEAR_THREADING_SCHEDULER_QUEUE_TASK_QUEUE_HPP + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "Queue.hpp" + +namespace NUClear { +namespace threading { + namespace scheduler { + namespace queue { + + /** + * Lock-free multi-producer multi-consumer unbounded FIFO queue. + * + * Storage is organised in fixed-size blocks linked in a list. Fully drained blocks are + * retired to a graveyard and deleted when the queue is destroyed. Per-producer FIFO is + * preserved; cross-producer ordering is not guaranteed. + */ + template + class TaskQueue : public Queue { + static_assert(std::is_move_constructible::value, "TaskQueue requires move constructible T"); + + private: + static constexpr std::size_t BLOCK_SIZE = 64; + + struct Block; + + struct Slot { + std::atomic committed{false}; + /// Raw aligned storage for the T payload. Left value-initialised (zeroed) so the + /// constructor fully covers all members; placement-new overwrites it on enqueue. + alignas(T) std::array storage{}; + }; + + struct Block { + std::array slots{}; + std::atomic write{0}; + std::atomic read{0}; + std::atomic consumed{0}; + std::atomic next{nullptr}; + Block* graveyard_next{nullptr}; + }; + + static T* slot_ptr(Slot& slot) { + return reinterpret_cast(slot.storage.data()); + } + + static void destroy_slot(Slot& slot) { + slot_ptr(slot)->~T(); + slot.committed.store(false, std::memory_order_relaxed); + } + + static Block* allocate_block() { + return new Block(); + } + + // Retired blocks are kept alive on the graveyard so consumers that still hold + // a stale pointer cannot observe freed memory. + void retire_block(Block* block) { + Block* head_graveyard = graveyard.load(std::memory_order_acquire); + while (true) { + block->graveyard_next = head_graveyard; + if (graveyard.compare_exchange_weak(head_graveyard, + block, + std::memory_order_release, + std::memory_order_relaxed)) { + return; + } + } + } + + bool link_next_block(Block* block) { + Block* expected = nullptr; + if (block->next.compare_exchange_strong(expected, allocate_block(), std::memory_order_acq_rel)) { + return true; + } + return expected != nullptr; + } + + void advance_tail(Block* expected, Block* next) { + Block* tail_ptr = tail.load(std::memory_order_acquire); + while (tail_ptr == expected) { + if (tail.compare_exchange_weak(tail_ptr, next, std::memory_order_release, std::memory_order_relaxed)) { + return; + } + } + } + + void try_reclaim_block(Block* block) { + if (block->consumed.load(std::memory_order_acquire) != BLOCK_SIZE) { + return; + } + + Block* head_ptr = head.load(std::memory_order_acquire); + if (head_ptr != block) { + return; + } + + // Never strand head at nullptr; only advance if a successor block exists. + Block* next = block->next.load(std::memory_order_acquire); + if (next == nullptr) { + return; + } + if (head.compare_exchange_strong(head_ptr, next, std::memory_order_release, std::memory_order_relaxed)) { + retire_block(block); + } + } + + std::atomic head; + std::atomic tail; + std::atomic graveyard; + + public: + TaskQueue() { + auto* initial = new Block(); + head.store(initial, std::memory_order_relaxed); + tail.store(initial, std::memory_order_relaxed); + graveyard.store(nullptr, std::memory_order_relaxed); + } + + TaskQueue(const TaskQueue&) = delete; + TaskQueue& operator=(const TaskQueue&) = delete; + TaskQueue(TaskQueue&&) = delete; + TaskQueue& operator=(TaskQueue&&) = delete; + + ~TaskQueue() override { + Block* current = head.load(std::memory_order_relaxed); + while (current != nullptr) { + Block* next = current->next.load(std::memory_order_relaxed); + delete current; + current = next; + } + + Block* dead = graveyard.load(std::memory_order_relaxed); + while (dead != nullptr) { + Block* next = dead->graveyard_next; + delete dead; + dead = next; + } + } + + void enqueue(const T& item) { + T copy(item); + enqueue(std::move(copy)); + } + + void enqueue(T&& item) override { + while (true) { + Block* block = tail.load(std::memory_order_acquire); + const std::size_t index = block->write.fetch_add(1, std::memory_order_relaxed); + + if (index < BLOCK_SIZE) { + Slot& slot = block->slots[index]; + new (slot.storage.data()) T(std::move(item)); + slot.committed.store(true, std::memory_order_release); + return; + } + + if (!link_next_block(block)) { + // Another thread linked next; help advance tail. + } + + Block* next = block->next.load(std::memory_order_acquire); + advance_tail(block, next); + } + } + + bool try_dequeue(T& out) override { + while (true) { + Block* block = head.load(std::memory_order_acquire); + + const std::size_t published = + std::min(block->write.load(std::memory_order_acquire), + static_cast(BLOCK_SIZE)); + std::size_t read_index = block->read.load(std::memory_order_relaxed); + + if (read_index >= published) { + if (block->consumed.load(std::memory_order_acquire) < published) { + std::this_thread::yield(); + continue; + } + + Block* next = block->next.load(std::memory_order_acquire); + if (next == nullptr) { + // Producer may still be writing the first slot of an empty-looking block. + if (published == 0 && block->write.load(std::memory_order_acquire) > 0) { + std::this_thread::yield(); + continue; + } + return false; + } + + head.compare_exchange_strong(block, next, std::memory_order_release, std::memory_order_relaxed); + continue; + } + + if (!block->read.compare_exchange_weak(read_index, + read_index + 1, + std::memory_order_acq_rel, + std::memory_order_relaxed)) { + continue; + } + + Slot& slot = block->slots[read_index]; + while (!slot.committed.load(std::memory_order_acquire)) { + std::this_thread::yield(); + } + + out = std::move(*slot_ptr(slot)); + destroy_slot(slot); + + if (block->consumed.fetch_add(1, std::memory_order_acq_rel) + 1 == BLOCK_SIZE) { + try_reclaim_block(block); + } + + return true; + } + } + + bool empty() const { + Block* block = head.load(std::memory_order_acquire); + while (block != nullptr) { + const std::size_t published = std::min(block->write.load(std::memory_order_acquire), + static_cast(BLOCK_SIZE)); + if (block->read.load(std::memory_order_relaxed) < published) { + return false; + } + block = block->next.load(std::memory_order_acquire); + } + return true; + } + }; + + } // namespace queue + } // namespace scheduler +} // namespace threading +} // namespace NUClear + +#endif // NUCLEAR_THREADING_SCHEDULER_QUEUE_TASK_QUEUE_HPP diff --git a/tests/tests/Benchmark.cpp b/tests/tests/Benchmark.cpp new file mode 100644 index 00000000..dd26ac7c --- /dev/null +++ b/tests/tests/Benchmark.cpp @@ -0,0 +1,180 @@ +/* + * MIT License + * + * Copyright (c) 2015 NUClear Contributors + * + * This file is part of the NUClear codebase. + * See https://github.com/Fastcode/NUClear for further info. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE + * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR + * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "nuclear" + +namespace { + + /// Total number of ping-pong hops a single chain performs before it terminates. + constexpr int CHAIN_LENGTH = 10000; + + /// Sync mode for the benchmark reactor. + enum class SyncMode : uint8_t { + NONE, ///< No Sync at all + SINGLE, ///< All reactions share a single Sync group + TWO_GROUPS ///< Reactions split between two competing Sync groups + }; + + template + class BenchmarkReactor : public NUClear::Reactor { + public: + struct SyncA {}; + struct SyncB {}; + + struct MessageA { + explicit MessageA(const int& count = 0) : count(count) {} + int count{}; + }; + struct MessageB { + explicit MessageB(const int& count = 0) : count(count) {} + int count{}; + }; + + BenchmarkReactor(std::unique_ptr environment, int fanout) + : NUClear::Reactor(std::move(environment)), fanout(fanout) { + + switch (mode) { + case SyncMode::NONE: { + on>().then([this](const MessageA& m) { on_a(m); }); + on>().then([this](const MessageB& m) { on_b(m); }); + } break; + case SyncMode::SINGLE: { + on, Sync>().then([this](const MessageA& m) { on_a(m); }); + on, Sync>().then([this](const MessageB& m) { on_b(m); }); + } break; + case SyncMode::TWO_GROUPS: { + // Each chain ping-pongs between two competing Sync groups + on, Sync>().then([this](const MessageA& m) { on_a(m); }); + on, Sync>().then([this](const MessageB& m) { on_b(m); }); + } break; + } + + on().then([this] { + for (int i = 0; i < this->fanout; ++i) { + emit(std::make_unique()); + } + }); + } + + std::atomic finished_count{0}; + int fanout{}; + + private: + void on_a(const MessageA& m) { + if (m.count < CHAIN_LENGTH) { + emit(std::make_unique(m.count + 1)); + } + else { + if (finished_count.fetch_add(1, std::memory_order_relaxed) + 1 == fanout) { + powerplant.shutdown(); + } + } + } + + void on_b(const MessageB& m) { + if (m.count < CHAIN_LENGTH) { + emit(std::make_unique(m.count + 1)); + } + } + }; + + template + std::int64_t run_benchmark(int pool_concurrency, int fanout) { + NUClear::Configuration config; + config.default_pool_concurrency = pool_concurrency; + + NUClear::PowerPlant plant(config); + plant.install>(fanout); + + const auto start = std::chrono::high_resolution_clock::now(); + plant.start(); + const auto end = std::chrono::high_resolution_clock::now(); + + return std::chrono::duration_cast(end - start).count(); + } + + std::string mode_name(SyncMode m) { + switch (m) { + case SyncMode::NONE: return "no-sync "; + case SyncMode::SINGLE: return "single-sync "; + case SyncMode::TWO_GROUPS: return "two-syncs "; + } + return "?"; + } + + void run_matrix(SyncMode mode) { + const int hw = int(std::thread::hardware_concurrency()); + const int hw_half = std::max(1, hw / 2); + + const std::array concurrencies{{1, hw_half, hw, hw * 2}}; + const std::array fanouts{{1, hw, hw * 4}}; + + std::ostringstream out; + out << "\n=== Benchmark: " << mode_name(mode) << " (chain=" << CHAIN_LENGTH << ") ===\n"; + out << std::setw(12) << "threads" << std::setw(12) << "fanout" << std::setw(12) << "µs" << "\n"; + out << " ----------------------------------\n"; + + std::int64_t total = 0; + for (const int concurrency : concurrencies) { + for (const int fanout : fanouts) { + std::int64_t us = 0; + switch (mode) { + case SyncMode::NONE: us = run_benchmark(concurrency, fanout); break; + case SyncMode::SINGLE: us = run_benchmark(concurrency, fanout); break; + case SyncMode::TWO_GROUPS: us = run_benchmark(concurrency, fanout); break; + } + out << std::setw(12) << concurrency << std::setw(12) << fanout << std::setw(12) << us << "\n"; + total += us; + } + } + out << " total: " << total << "µs\n"; + + std::cout << out.str() << std::endl; + } + +} // namespace + +TEST_CASE("Benchmark emit ping-pong without sync", "[benchmark]") { + run_matrix(SyncMode::NONE); +} + +TEST_CASE("Benchmark emit ping-pong with a single sync", "[benchmark]") { + run_matrix(SyncMode::SINGLE); +} + +TEST_CASE("Benchmark emit ping-pong with two competing syncs", "[benchmark]") { + run_matrix(SyncMode::TWO_GROUPS); +} diff --git a/tests/tests/threading/Group.cpp b/tests/tests/threading/Group.cpp index 69bed9aa..2c088bb4 100644 --- a/tests/tests/threading/Group.cpp +++ b/tests/tests/threading/Group.cpp @@ -28,6 +28,9 @@ #include #include "id.hpp" +// Group's WaitEntry holds a std::unique_ptr, so a complete type is needed at the +// point where TaskQueue is instantiated (which happens via Group's constructor). +#include "threading/ReactionTask.hpp" // NOLINT(misc-include-cleaner) #include "threading/scheduler/Lock.hpp" #include "util/GroupDescriptor.hpp" diff --git a/tests/tests/threading/MPSCQueue.cpp b/tests/tests/threading/MPSCQueue.cpp new file mode 100644 index 00000000..e00fdf0f --- /dev/null +++ b/tests/tests/threading/MPSCQueue.cpp @@ -0,0 +1,158 @@ +/* + * MIT License + * + * Copyright (c) 2024 NUClear Contributors + * + * This file is part of the NUClear codebase. + * See https://github.com/Fastcode/NUClear for further info. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE + * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR + * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#include "threading/scheduler/queue/MPSCQueue.hpp" + +#include +#include +#include +#include +#include +#include + +namespace NUClear { +namespace threading { + namespace scheduler { + namespace queue { + + SCENARIO("An MPSCQueue used by a single producer and single consumer preserves FIFO order", + "[threading][queue][MPSCQueue]") { + GIVEN("An empty MPSCQueue") { + MPSCQueue queue; + + WHEN("Two values are enqueued in order") { + queue.enqueue(1); + queue.enqueue(2); + + THEN("They are dequeued in the same order and the queue is then empty") { + int value = 0; + CHECK(queue.try_dequeue(value)); + CHECK(value == 1); + CHECK(queue.try_dequeue(value)); + CHECK(value == 2); + CHECK_FALSE(queue.try_dequeue(value)); + } + } + } + } + + SCENARIO("An MPSCQueue can store move-only payloads", "[threading][queue][MPSCQueue]") { + GIVEN("An MPSCQueue of std::unique_ptr") { + MPSCQueue> queue; + + WHEN("A unique_ptr holding 42 is enqueued") { + queue.enqueue(std::make_unique(42)); + + THEN("The same value can be dequeued without copying") { + std::unique_ptr value; + CHECK(queue.try_dequeue(value)); + REQUIRE(value != nullptr); + CHECK(*value == 42); + } + } + } + } + + SCENARIO("An MPSCQueue handles many enqueues from one thread followed by many dequeues", + "[threading][queue][MPSCQueue]") { + GIVEN("An MPSCQueue with 5000 sequentially enqueued integers") { + MPSCQueue queue; + for (int i = 0; i < 5000; ++i) { + queue.enqueue(i); + } + + WHEN("They are all dequeued in turn") { + bool sequence_holds = true; + for (int i = 0; i < 5000; ++i) { + int value = -1; + if (!queue.try_dequeue(value) || value != i) { + sequence_holds = false; + break; + } + } + + THEN("Each dequeue returns the next integer in order and the queue is empty") { + CHECK(sequence_holds); + int discard = 0; + CHECK_FALSE(queue.try_dequeue(discard)); + } + } + } + } + + // Stress test for the MPSC contract: many producers race to enqueue while a single consumer + // drains. We tag each item with (producer_id, sequence_no) so we can assert per-producer FIFO + // is preserved even though cross-producer ordering is intentionally undefined. + SCENARIO("An MPSCQueue used by many producers and one consumer preserves per-producer FIFO", + "[threading][queue][MPSCQueue]") { + GIVEN("Eight producer threads each enqueueing 2000 (producer_id, sequence) pairs") { + constexpr int items_per_producer = 2000; + constexpr int producers = 8; + + MPSCQueue> queue; + std::atomic produced{0}; + + WHEN("A single consumer drains every item that the producers emit") { + std::vector producer_threads; + producer_threads.reserve(producers); + for (int p = 0; p < producers; ++p) { + producer_threads.emplace_back([&, p]() { + for (int i = 0; i < items_per_producer; ++i) { + queue.enqueue({p, i}); + produced.fetch_add(1, std::memory_order_relaxed); + } + }); + } + + std::vector per_producer_last(producers, -1); + bool per_producer_fifo_ok = true; + int consumed = 0; + while (consumed < producers * items_per_producer) { + std::pair value{}; + if (queue.try_dequeue(value)) { + if (value.second != per_producer_last[value.first] + 1) { + per_producer_fifo_ok = false; + } + per_producer_last[value.first] = value.second; + ++consumed; + } + else { + std::this_thread::yield(); + } + } + + for (auto& thread : producer_threads) { + thread.join(); + } + + THEN("Every item appears exactly once and per-producer order is preserved") { + CHECK(produced.load() == producers * items_per_producer); + CHECK(consumed == producers * items_per_producer); + CHECK(per_producer_fifo_ok); + } + } + } + } + + } // namespace queue + } // namespace scheduler +} // namespace threading +} // namespace NUClear diff --git a/tests/tests/threading/Semaphore.cpp b/tests/tests/threading/Semaphore.cpp new file mode 100644 index 00000000..b12daf58 --- /dev/null +++ b/tests/tests/threading/Semaphore.cpp @@ -0,0 +1,124 @@ +/* + * MIT License + * + * Copyright (c) 2024 NUClear Contributors + * + * This file is part of the NUClear codebase. + * See https://github.com/Fastcode/NUClear for further info. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE + * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR + * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#include "threading/scheduler/queue/Semaphore.hpp" + +#include +#include +#include +#include + +namespace NUClear { +namespace threading { + namespace scheduler { + namespace queue { + + SCENARIO("A signal on a semaphore unblocks a thread that is waiting on it", + "[threading][queue][Semaphore]") { + GIVEN("A fresh semaphore with a thread blocked on wait()") { + Semaphore sem; + std::atomic done{false}; + std::thread waiter([&]() { + sem.wait(); + done.store(true, std::memory_order_release); + }); + // Give the waiter a moment to actually park on the semaphore before we observe it. + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + WHEN("No signal has been sent yet") { + THEN("The waiting thread is still blocked") { + CHECK_FALSE(done.load(std::memory_order_acquire)); + } + } + + WHEN("A signal is sent") { + sem.signal(); + waiter.join(); + + THEN("The waiting thread runs to completion") { + CHECK(done.load(std::memory_order_acquire)); + } + } + + // Whichever WHEN branch ran, make sure the waiter thread is released before this + // scope ends so we never leak a joinable std::thread into destruction. + if (waiter.joinable()) { + sem.signal(); + waiter.join(); + } + } + } + + SCENARIO("try_wait only succeeds when the semaphore has been signalled", + "[threading][queue][Semaphore]") { + GIVEN("A fresh semaphore") { + Semaphore sem; + + WHEN("try_wait is called before any signal") { + THEN("It returns false") { + CHECK_FALSE(sem.try_wait()); + } + } + + WHEN("A signal is sent and try_wait is called twice") { + sem.signal(); + const bool first = sem.try_wait(); + const bool second = sem.try_wait(); + + THEN("The first try_wait consumes the signal and the second returns false") { + CHECK(first); + CHECK_FALSE(second); + } + } + } + } + + SCENARIO("Signals and waits across two threads are conserved one-for-one", + "[threading][queue][Semaphore]") { + GIVEN("A semaphore with a consumer thread issuing many waits") { + constexpr int iterations = 1000; + Semaphore sem; + std::atomic completed{0}; + + std::thread consumer([&]() { + for (int i = 0; i < iterations; ++i) { + sem.wait(); + completed.fetch_add(1, std::memory_order_relaxed); + } + }); + + WHEN("The same number of signals are emitted from the producer") { + for (int i = 0; i < iterations; ++i) { + sem.signal(); + } + consumer.join(); + + THEN("Every signal is matched by exactly one wait completion") { + CHECK(completed.load() == iterations); + } + } + } + } + + } // namespace queue + } // namespace scheduler +} // namespace threading +} // namespace NUClear diff --git a/tests/tests/threading/TaskQueue.cpp b/tests/tests/threading/TaskQueue.cpp new file mode 100644 index 00000000..40164cd3 --- /dev/null +++ b/tests/tests/threading/TaskQueue.cpp @@ -0,0 +1,155 @@ +/* + * MIT License + * + * Copyright (c) 2024 NUClear Contributors + * + * This file is part of the NUClear codebase. + * See https://github.com/Fastcode/NUClear for further info. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE + * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR + * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +#include "threading/scheduler/queue/TaskQueue.hpp" + +#include +#include +#include +#include +#include +#include + +namespace NUClear { +namespace threading { + namespace scheduler { + namespace queue { + + SCENARIO("A TaskQueue used by a single producer and a single consumer preserves FIFO order", + "[threading][queue][TaskQueue]") { + GIVEN("An empty TaskQueue") { + TaskQueue queue; + + WHEN("Two values are enqueued in order") { + queue.enqueue(1); + queue.enqueue(2); + + THEN("They are dequeued in the same order and the queue is then empty") { + int value = 0; + CHECK(queue.try_dequeue(value)); + CHECK(value == 1); + CHECK(queue.try_dequeue(value)); + CHECK(value == 2); + CHECK_FALSE(queue.try_dequeue(value)); + CHECK(queue.empty()); + } + } + } + } + + SCENARIO("A TaskQueue can store move-only payloads", "[threading][queue][TaskQueue]") { + GIVEN("A TaskQueue of std::unique_ptr") { + TaskQueue> queue; + + WHEN("A unique_ptr holding 42 is enqueued") { + queue.enqueue(std::make_unique(42)); + + THEN("The same value can be dequeued without copying") { + std::unique_ptr value; + CHECK(queue.try_dequeue(value)); + REQUIRE(value != nullptr); + CHECK(*value == 42); + } + } + } + } + + SCENARIO("A TaskQueue handles many enqueues from one thread followed by many dequeues", + "[threading][queue][TaskQueue]") { + GIVEN("A TaskQueue with 5000 sequentially enqueued integers") { + TaskQueue queue; + for (int i = 0; i < 5000; ++i) { + queue.enqueue(i); + } + + WHEN("They are all dequeued in turn") { + bool sequence_holds = true; + for (int i = 0; i < 5000; ++i) { + int value = -1; + if (!queue.try_dequeue(value) || value != i) { + sequence_holds = false; + break; + } + } + + THEN("Each dequeue returns the next integer in order and the queue is empty") { + CHECK(sequence_holds); + CHECK(queue.empty()); + } + } + } + } + + // Stress test: with multiple producers writing concurrently we cannot assert + // total ordering across producers, but every item must come out exactly once. + SCENARIO("A TaskQueue used by many producers and many consumers conserves every item", + "[threading][queue][TaskQueue]") { + GIVEN("Four producer threads each enqueueing 500 items and four consumer threads draining") { + constexpr int items_per_producer = 500; + constexpr int producers = 4; + constexpr int consumers = 4; + + TaskQueue queue; + std::atomic produced{0}; + std::atomic consumed{0}; + + WHEN("All producers and consumers run to completion") { + std::vector threads; + threads.reserve(static_cast(producers) + static_cast(consumers)); + for (int p = 0; p < producers; ++p) { + threads.emplace_back([&, p]() { + for (int i = 0; i < items_per_producer; ++i) { + queue.enqueue(p * items_per_producer + i); + produced.fetch_add(1, std::memory_order_relaxed); + } + }); + } + for (int c = 0; c < consumers; ++c) { + threads.emplace_back([&]() { + int value = 0; + while (consumed.load(std::memory_order_acquire) < producers * items_per_producer) { + if (queue.try_dequeue(value)) { + consumed.fetch_add(1, std::memory_order_relaxed); + } + else { + std::this_thread::yield(); + } + } + }); + } + + for (auto& thread : threads) { + thread.join(); + } + + THEN("Total produced equals total consumed and the queue ends empty") { + CHECK(produced.load() == producers * items_per_producer); + CHECK(consumed.load() == producers * items_per_producer); + CHECK(queue.empty()); + } + } + } + } + + } // namespace queue + } // namespace scheduler +} // namespace threading +} // namespace NUClear diff --git a/tests/tests/util/serialise/xxhash.cpp b/tests/tests/util/serialise/xxhash.cpp index 83a98f80..b03e4991 100644 --- a/tests/tests/util/serialise/xxhash.cpp +++ b/tests/tests/util/serialise/xxhash.cpp @@ -26,6 +26,7 @@ #include #include #include +#include #include #include