Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

# Build & CMake files
build/
build-*/
CMakeCache.txt
CMakeFiles
Makefile
Expand Down
65 changes: 62 additions & 3 deletions src/dsl/word/Watchdog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#ifndef NUCLEAR_DSL_WORD_WATCHDOG_HPP
#define NUCLEAR_DSL_WORD_WATCHDOG_HPP

#include <mutex>
#include <stdexcept>

#include "../../threading/Reaction.hpp"
Expand Down Expand Up @@ -52,12 +53,25 @@ namespace dsl {
using MapType = std::remove_cv_t<RuntimeType>;
using WatchdogStore = util::TypeMap<WatchdogGroup, MapType, std::map<MapType, NUClear::clock::time_point>>;

/**
* Mutex protecting structural and value updates to the underlying map for this
* (WatchdogGroup, RuntimeType) pair. Watchdog timers are read by the chrono controller
* thread (via @ref get) while being written by user threads that emit a service event
* (via @ref service), and the underlying std::map is also mutated by init/unbind, so one
* std::mutex, shared by all of those operations, serialises them.
*/
static std::mutex& mutex() {
    // Function-local static: the same mutex object is handed back on every call.
    static std::mutex store_guard;  // NOLINT(cppcoreguidelines-avoid-non-const-global-variables)
    return store_guard;
}

/**
* Ensures the data store is initialised correctly.
*
* @param data The runtime argument for the current watchdog in the WatchdogGroup/RuntimeType group
*/
static void init(const RuntimeType& data) {
const std::lock_guard<std::mutex> lock(mutex());
if (WatchdogStore::get() == nullptr) {
WatchdogStore::set(std::make_shared<std::map<MapType, NUClear::clock::time_point>>());
}
Expand All @@ -67,11 +81,15 @@ namespace dsl {
}

/**
* Gets the current service time for the WatchdogGroup/RuntimeType/data watchdog
* Gets the current service time for the WatchdogGroup/RuntimeType/data watchdog.
*
* Returned by value so the caller never holds a reference into the (mutex-protected)
* map. The time_point is small and trivially copyable so the copy is essentially free.
*
* @param data The runtime argument for the current watchdog in the WatchdogGroup/RuntimeType group
*/
static const NUClear::clock::time_point& get(const RuntimeType& data) {
static NUClear::clock::time_point get(const RuntimeType& data) {
const std::lock_guard<std::mutex> lock(mutex());
if (WatchdogStore::get() == nullptr || WatchdogStore::get()->count(data) == 0) {
throw std::domain_error("Store for <" + util::demangle(typeid(WatchdogGroup).name()) + ", "
+ util::demangle(typeid(MapType).name())
Expand All @@ -80,12 +98,29 @@ namespace dsl {
return WatchdogStore::get()->at(data);
}

/**
 * Atomically updates the service time for the WatchdogGroup/RuntimeType/data watchdog.
 *
 * Called by @ref emit::WatchdogServicer::service to keep the write under the same
 * mutex that @ref get uses for reads.
 *
 * @param data The runtime argument for the current watchdog in the WatchdogGroup/RuntimeType group
 * @param when The new service time to record for that watchdog
 *
 * @throws std::domain_error if the store has not been created or holds no entry for data
 */
static void service(const RuntimeType& data, const NUClear::clock::time_point& when) {
    const std::lock_guard<std::mutex> lock(mutex());

    // Fetch the store once and search it once: the located entry is reused for the write,
    // so the critical section does a single lookup instead of count() followed by at().
    const auto store = WatchdogStore::get();
    if (store != nullptr) {
        const auto entry = store->find(data);
        if (entry != store->end()) {
            entry->second = when;
            return;
        }
    }

    // Either the store was never created or this particular watchdog was never registered.
    throw std::domain_error("Store for <" + util::demangle(typeid(WatchdogGroup).name()) + ", "
                            + util::demangle(typeid(MapType).name())
                            + "> has not been created yet or no watchdog has been set up");
}

/**
* Cleans up any allocated storage for the WatchdogGroup/RuntimeType/data watchdog
*
* @param data The runtime argument for the current watchdog in the WatchdogGroup/RuntimeType group
*/
static void unbind(const RuntimeType& data) {
const std::lock_guard<std::mutex> lock(mutex());
if (WatchdogStore::get() != nullptr) {
WatchdogStore::get()->erase(data);
}
Expand All @@ -105,30 +140,54 @@ namespace dsl {
struct WatchdogDataStore<WatchdogGroup, void> {
using WatchdogStore = util::TypeMap<WatchdogGroup, void, NUClear::clock::time_point>;

/// See the documentation on the runtime-arg specialisation.
static std::mutex& mutex() {
    // Created on the first call; every later call returns a reference to that same object.
    static std::mutex service_time_guard;  // NOLINT(cppcoreguidelines-avoid-non-const-global-variables)
    return service_time_guard;
}

/**
* Ensures the data store is initialised correctly.
*/
static void init() {
    // Hold the store mutex across the whole check-then-create sequence.
    const std::lock_guard<std::mutex> guard(mutex());

    // A service time is already stored: leave it untouched.
    if (WatchdogStore::get() != nullptr) {
        return;
    }

    // First use: seed the store with the current clock time.
    WatchdogStore::set(std::make_shared<NUClear::clock::time_point>(NUClear::clock::now()));
}

/**
* Gets the current service time for the WatchdogGroup watchdog.
*
* Returned by value so the caller never reads from the time_point while it is being
* mutated by @ref service on another thread.
*/
static const NUClear::clock::time_point& get() {
static NUClear::clock::time_point get() {
    const std::lock_guard<std::mutex> guard(mutex());

    // Normal case first: hand back a copy of the stored service time.
    if (WatchdogStore::get() != nullptr) {
        return *WatchdogStore::get();
    }

    // No store exists for this group, so there is nothing to read.
    throw std::domain_error("Store for <" + util::demangle(typeid(WatchdogGroup).name())
                            + "> is trying to field a service call for an unknown data type");
}

/**
* Atomically updates the service time for the WatchdogGroup watchdog.
*/
static void service(const NUClear::clock::time_point& when) {
    const std::lock_guard<std::mutex> guard(mutex());

    // Refuse to service a watchdog whose store was never created.
    const bool store_missing = (WatchdogStore::get() == nullptr);
    if (store_missing) {
        throw std::domain_error("Store for <" + util::demangle(typeid(WatchdogGroup).name())
                                + "> has not been created yet or no watchdog has been set up");
    }

    // Overwrite the stored service time in place while the mutex is still held.
    *WatchdogStore::get() = when;
}

/**
* Cleans up any allocated storage for the WatchdogGroup watchdog.
*/
static void unbind() {
const std::lock_guard<std::mutex> lock(mutex());
if (WatchdogStore::get() != nullptr) {
WatchdogStore::get().reset();
}
Expand Down
33 changes: 10 additions & 23 deletions src/dsl/word/emit/Watchdog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,8 @@
#ifndef NUCLEAR_DSL_WORD_EMIT_WATCHDOG_HPP
#define NUCLEAR_DSL_WORD_EMIT_WATCHDOG_HPP

#include <stdexcept>

#include "../../../PowerPlant.hpp"
#include "../../../util/TypeMap.hpp"
#include "../../../util/demangle.hpp"
#include "../Watchdog.hpp"

namespace NUClear {
namespace dsl {
Expand All @@ -47,8 +44,6 @@ namespace dsl {
template <typename WatchdogGroup, typename RuntimeType = void>
struct WatchdogServicer {
using MapType = std::remove_cv_t<RuntimeType>;
using WatchdogStore =
util::TypeMap<WatchdogGroup, MapType, std::map<MapType, NUClear::clock::time_point>>;

/**
* Construct a new Watchdog Servicer object
Expand All @@ -63,18 +58,14 @@ namespace dsl {
explicit WatchdogServicer(const RuntimeType& data) : data(data) {}

/**
* Services the watchdog
* Services the watchdog.
*
* The watchdog timer that is specified by the WatchdogGroup/RuntimeType/data combination will have its
* service time updated to whatever is stored in when.
* Delegates to @ref word::WatchdogDataStore::service so the write happens under the
* same mutex that guards reads in the chrono controller; otherwise the time_point
* would be torn-read / torn-written across threads.
*/
void service() {
if (WatchdogStore::get() == nullptr || WatchdogStore::get()->count(data) == 0) {
throw std::domain_error("Store for <" + util::demangle(typeid(WatchdogGroup).name()) + ", "
+ util::demangle(typeid(RuntimeType).name())
+ "> has not been created yet or no watchdog has been set up");
}
WatchdogStore::get()->at(data) = when;
word::WatchdogDataStore<WatchdogGroup, RuntimeType>::service(data, when);
}

private:
Expand All @@ -94,19 +85,15 @@ namespace dsl {
*/
template <typename WatchdogGroup>
struct WatchdogServicer<WatchdogGroup, void> {
using WatchdogStore = util::TypeMap<WatchdogGroup, void, NUClear::clock::time_point>;

/**
* Services the watchdog
* Services the watchdog.
*
* The watchdog timer for WatchdogGroup will have its service time updated to whatever is stored in when
* Delegates to @ref word::WatchdogDataStore::service so the write happens under the
* same mutex that guards reads in the chrono controller.
*/
void service() {
if (WatchdogStore::get() == nullptr) {
throw std::domain_error("Store for <" + util::demangle(typeid(WatchdogGroup).name())
+ "> has not been created yet or no watchdog has been set up");
}
WatchdogStore::set(std::make_shared<NUClear::clock::time_point>(when));
word::WatchdogDataStore<WatchdogGroup, void>::service(when);
}

private:
Expand Down
16 changes: 14 additions & 2 deletions src/extension/IOController_Posix.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,20 @@ namespace extension {
tasks.erase(task);
}
else {
// Make sure poll isn't currently waiting for an event to happen
bump();
// We are about to mutate `watches[].events`, which the poll thread reads
// from inside ::poll(). Write to the notify pipe to kick poll out, then
// hold notifier.mutex for the duration of the mutation so the poll thread
// cannot re-enter ::poll() against a half-updated entry. This is the same
// wake-then-lock pattern bump() uses, but we keep the lock held until the
// watches update (and the follow-up fire_event, which can also touch
// watches[].events) is finished.
uint8_t val = 1;
if (::write(notifier.send, &val, sizeof(val)) < 0) {
throw std::system_error(network_errno,
std::system_category(),
"There was an error while writing to the notification pipe");
}
const std::lock_guard<std::mutex> notifier_lock(notifier.mutex);

// Unmask the events that were just processed
auto it = std::lower_bound(watches.begin(),
Expand Down
15 changes: 13 additions & 2 deletions src/threading/Reaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,19 @@ namespace threading {
/// The callback generator function (creates databound callbacks)
TaskGenerator generator;

/// Cached data for this reaction added by the scheduler
std::shared_ptr<void> scheduler_data;
/// Cached scheduler-private pointer for this reaction.
///
/// The scheduler uses this as a fast-path cache for the resolved pool that this reaction's
/// tasks should run on. It is a raw, non-owning `void*` rather than `std::shared_ptr<void>`
/// to avoid the per-submit cost of `std::atomic_load`/`atomic_store` on a `shared_ptr`,
/// which on libstdc++ falls back to a small global pool of mutexes (selected by pointer
/// hash) and can become a contention point on hot submission paths.
///
/// Ownership of whatever this points at lives entirely with the scheduler; scheduler-side
/// resources outlive reactions because PowerPlant tears reactors down before the
/// scheduler. The cache is set-once: the first submit resolves the pool and CASes it in,
/// subsequent submits just load it.
std::atomic<void*> scheduler_data{nullptr};
friend class scheduler::Scheduler; /// Let the scheduler mess with reaction objects
};

Expand Down
Loading
Loading