
core: move hand-rolled thread-safety to the asio executor model#2903

Open
julianoes wants to merge 9 commits into main from message-dispatch-strand

julianoes commented Jun 15, 2026

Collaborator

Replace core's hand-rolled thread-safety (a mutex plus deferred side-tables and try_lock-or-defer) with the executor model already used elsewhere: the shared state is only ever touched on the io_context thread. Reads run there directly (inline when already on the io thread); mutations are posted onto the io_context with asio::post(), so they are serialized and safe to call from inside a callback.

Converted:

  • MavlinkParameterSubscription — subscription list confined to the io_context.
  • MavlinkMessageHandler — handler table confined to the io_context.
  • CallbackList — subscription list confined to the io_context; empty() reads an atomic size. The dedicated mutexes that only guarded a subscription list are removed.

Two things the strand model has to get right:

  • Lifetime. Owners post onto the io_context during their own destruction (e.g. ~ServerComponentImpl), so the io_context must outlive them. _io_context/_io_work_guard are now the first members of MavsdkImpl, so the io_context is destroyed last; otherwise those posts hit a freed io_context (use-after-free + a leaked op). A CallbackList also drains the io_context in its destructor, since plugins can be destroyed mid-run while posts are still queued.

  • No blocking on the io thread. subscribe/unsubscribe/clear and CallbackList::queue() post without waiting, so they can't deadlock when called from inside a callback or while holding a lock the io thread also needs (queue() is called that way from subscribe_on_new_system and subscribe_component_discovered).
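
The confinement idea above can be sketched without asio. This is a minimal illustration, not the PR's code: `MiniExecutor` is a hypothetical single-threaded stand-in for asio::io_context, and `SubscriptionList` and its members are invented names. It shows why posting a mutation keeps an unsubscribe-from-callback off the middle of an iteration.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical stand-in for asio::io_context: a FIFO of tasks drained by poll().
// Single-threaded on purpose, so the example is deterministic.
class MiniExecutor {
public:
    void post(std::function<void()> task) { _tasks.push_back(std::move(task)); }

    // Runs queued tasks in FIFO order, including tasks posted while polling.
    void poll()
    {
        while (!_tasks.empty()) {
            auto task = std::move(_tasks.front());
            _tasks.pop_front();
            task();
        }
    }

private:
    std::deque<std::function<void()>> _tasks;
};

// Hypothetical list whose entries are only touched from executor tasks.
class SubscriptionList {
public:
    using Callback = std::function<void(int)>;

    explicit SubscriptionList(MiniExecutor& executor) : _executor(executor) {}

    // The handle is returned immediately; the insertion itself is deferred.
    int subscribe(Callback callback)
    {
        const int handle = _next_handle++;
        _executor.post([this, handle, cb = std::move(callback)]() mutable {
            _entries.push_back(Entry{handle, std::move(cb)});
        });
        return handle;
    }

    // Deferred as well, so it never erases during a notify() iteration.
    void unsubscribe(int handle)
    {
        _executor.post([this, handle] {
            _entries.erase(
                std::remove_if(
                    _entries.begin(),
                    _entries.end(),
                    [handle](const Entry& e) { return e.handle == handle; }),
                _entries.end());
        });
    }

    // Assumed to be called on the executor thread: iterates with no lock.
    void notify(int value)
    {
        for (auto& entry : _entries) {
            entry.callback(value);
        }
    }

    std::size_t size() const { return _entries.size(); }

private:
    struct Entry {
        int handle;
        Callback callback;
    };

    MiniExecutor& _executor;
    std::vector<Entry> _entries;
    int _next_handle{1};
};

// Returns how often a callback that unsubscribes itself was invoked.
int demo_unsubscribe_from_callback()
{
    MiniExecutor executor;
    SubscriptionList list(executor);
    int calls = 0;
    int handle = 0;
    handle = list.subscribe([&](int) {
        ++calls;
        list.unsubscribe(handle); // only posts; the list is not modified here
    });
    executor.poll(); // applies the posted insertion
    assert(list.size() == 1);
    list.notify(42); // callback runs once and posts its own removal
    executor.poll(); // removal is applied after the iteration has finished
    list.notify(43); // nobody left to call
    return calls;
}
```

Calling `demo_unsubscribe_from_callback()` yields 1: the callback fires for the first notification only, and the erase never overlaps the loop in `notify()`.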

Replace MavlinkParameterSubscription's hand-rolled thread-safety (a mutex
plus three deferred side-tables, try_lock-or-defer, and fixed-order
application of subscribes-before-unsubscribes) with the executor model the
surrounding code already uses: the subscription list is only ever touched
on the io_context thread.

- find_and_call_subscriptions_value_changed() iterates the list directly
  with no lock. Its callers run it on the io thread: the received
  PARAM_VALUE/PARAM_EXT_VALUE handlers (client) and process_param_set
  (server). The one user-thread caller, provide_server_param(), now
  asio::post()s the notification onto the io_context.
- subscribe/unsubscribe asio::post() their mutation onto the io_context, so
  they are safe from any thread and from inside a callback (post runs after
  the current dispatch), and ordering is preserved by post FIFO.
- unsubscribe_all_params_changed_blocking() uses asio::dispatch() and waits
  on a future (with an io_context.stopped() fast-path): it runs inline when
  already on the io thread (no self-deadlock) and posts-and-waits otherwise.
  This lets ParamServerImpl::deinit() drop its 10ms sleep hack.
- For consistency, MavlinkParameterClient::cancel_all_param() also switches
  its post()+wait to dispatch()+wait, so it can't self-deadlock if ever
  called from the io_context thread.
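
The dispatch()+wait behaviour described in the list above can be sketched as follows. This is an illustration under assumptions: `WorkerExecutor` is a hypothetical one-thread stand-in for an io_context, and `run_and_wait()` is an invented name that mimics "run inline when already on the executor thread, otherwise post and wait on a future". The io_context.stopped() fast-path mentioned above is left out.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical stand-in for an io_context driven by a single worker thread.
class WorkerExecutor {
public:
    WorkerExecutor() : _thread([this] { run(); }) {}

    ~WorkerExecutor()
    {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _stopped = true;
        }
        _cv.notify_one();
        _thread.join(); // the worker drains the queue before exiting
    }

    bool running_in_this_thread() const
    {
        return std::this_thread::get_id() == _thread.get_id();
    }

    void post(std::function<void()> task)
    {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _tasks.push_back(std::move(task));
        }
        _cv.notify_one();
    }

    // Inline when already on the worker thread (no self-deadlock),
    // otherwise post the task and block until it has run.
    void run_and_wait(std::function<void()> task)
    {
        if (running_in_this_thread()) {
            task();
            return;
        }
        auto done = std::make_shared<std::promise<void>>();
        auto future = done->get_future();
        post([task = std::move(task), done] {
            task();
            done->set_value();
        });
        future.wait();
    }

private:
    void run()
    {
        std::unique_lock<std::mutex> lock(_mutex);
        for (;;) {
            _cv.wait(lock, [this] { return _stopped || !_tasks.empty(); });
            if (_tasks.empty()) {
                return; // stopped and drained
            }
            auto task = std::move(_tasks.front());
            _tasks.pop_front();
            lock.unlock();
            task();
            lock.lock();
        }
    }

    std::mutex _mutex;
    std::condition_variable _cv;
    std::deque<std::function<void()>> _tasks;
    bool _stopped{false};
    std::thread _thread; // declared last: starts once the members above exist
};

// Returns true when a nested call made from the worker thread completed.
bool demo_nested_call_does_not_deadlock()
{
    WorkerExecutor executor;
    bool inner_ran = false;
    executor.run_and_wait([&] {
        // Already on the worker thread: a plain post-and-wait here would
        // block the only thread able to run the posted task.
        executor.run_and_wait([&] { inner_ran = true; });
    });
    return inner_ran;
}
```

The nested call is the case the cancel_all_param() change guards against: with post()+wait the inner call would wait forever for a task that only the blocked thread could run.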

This removes the UB (try_lock on a mutex already held by the same thread in
the in-callback path) and the subscribe/unsubscribe reorder hazard, and is
a template for converting MavlinkMessageHandler the same way once the
in-flight message-handler PR lands.

Full unit (291) and system (85) suites pass.

Replace MavlinkMessageHandler's hand-rolled thread-safety (a mutex plus two
deferred side-tables and try_lock-or-defer, applying registers before
unregisters) with the executor model the rest of core now uses: the handler
table is only ever touched on the io_context thread.

- process_message() already runs on the io thread (it is driven by the
  connection receive path, which posts onto the io_context), so it iterates
  the table directly with no lock.
- register/unregister asio::post() their mutation onto the io_context, so they
  are safe from any thread and from inside a callback (the mutation runs after
  the current dispatch), with ordering preserved by post FIFO.

Teardown is split by where the owner is destroyed, which the caller always
knows by construction, so no run-time "am I on the io thread" detection is
needed (asio's running_in_this_thread() is unreliable during destructor
cleanup anyway):

- unregister_all_on_io_thread(): direct synchronous erase, for owners that
  live and die ON the io thread (mission-transfer WorkItems). They are only
  ever destroyed from do_work(), never mid process_message() iteration, so the
  direct erase is race-free and avoids a fire-after-free without a round-trip.
- unregister_all_blocking(): posts the removal and waits, for owners destroyed
  OFF the io thread (plugin destructors on the user thread). Plugins are
  documented to be destroyed before Mavsdk, so the io thread is alive and the
  wait completes.

Shutdown is covered by the existing ~MavsdkImpl top-level stop()+join(): the
io thread is gone before systems/core objects destruct, so their
unregister_all() posts are harmless no-ops and no callback can fire on freed
memory.

MavlinkMessageHandler now takes the io_context by reference; wire it through
MavsdkImpl and SystemImpl. The mission-transfer unit tests stay single
threaded (manual poll()): registrations are posted, so the test poll()s to
apply them before delivering a message, and WorkItems unregister directly on
the polling thread -- no background io thread needed.
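
The single-threaded test pattern described above (post the registration, poll() to apply it, then deliver a message) might look roughly like this. `MiniExecutor` and `HandlerTable` are hypothetical stand-ins written for this sketch; they are not the MAVSDK classes or their signatures.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <map>
#include <utility>

// Hypothetical stand-in for an io_context that is polled manually.
class MiniExecutor {
public:
    void post(std::function<void()> task) { _tasks.push_back(std::move(task)); }

    // Runs queued tasks in FIFO order, including tasks posted while polling.
    void poll()
    {
        while (!_tasks.empty()) {
            auto task = std::move(_tasks.front());
            _tasks.pop_front();
            task();
        }
    }

private:
    std::deque<std::function<void()>> _tasks;
};

// Hypothetical handler table: registration is posted, delivery is direct.
class HandlerTable {
public:
    using Handler = std::function<void(int payload)>;

    explicit HandlerTable(MiniExecutor& executor) : _executor(executor) {}

    void register_one(int msg_id, Handler handler)
    {
        _executor.post([this, msg_id, h = std::move(handler)]() mutable {
            _handlers.emplace(msg_id, std::move(h));
        });
    }

    // Assumed to run on the polling thread: iterates the table with no lock.
    void process_message(int msg_id, int payload)
    {
        auto range = _handlers.equal_range(msg_id);
        for (auto it = range.first; it != range.second; ++it) {
            it->second(payload);
        }
    }

private:
    MiniExecutor& _executor;
    std::multimap<int, Handler> _handlers;
};

struct Delivered {
    int before_poll;
    int after_poll;
};

// Counts handler invocations before and after the registration is applied.
Delivered demo_poll_before_delivering()
{
    MiniExecutor executor;
    HandlerTable table(executor);
    int calls = 0;
    table.register_one(7, [&](int) { ++calls; });

    table.process_message(7, 1); // registration is still queued: not delivered
    const int before = calls;

    executor.poll(); // applies the registration
    table.process_message(7, 2); // now the handler is called
    return Delivered{before, calls};
}
```

A test that forgets the `poll()` between registering and delivering sees zero calls, which is why the mission-transfer tests poll first.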

Full unit (291) and system (85) suites pass.

Both the message handler and the parameter subscriptions now asio::post()
their table mutations onto the io_context, including unregister/unsubscribe
calls that run during teardown (e.g. MavlinkFtpServer's destructor posts an
unregister_all() via ~ServerComponentImpl).

_io_context was declared near the end of MavsdkImpl, so as a member it was
destroyed before _server_components/_systems/_connections. Those owners then
post onto an already-freed io_context, which ASan/TSan/UBSan flagged as a
heap-use-after-free in asio::scheduler::post_immediate_completion, and which
also leaked the posted operation (caught by the fuzzer's LeakSanitizer).

Declare _io_context and _io_work_guard as the first data members so the
io_context is destroyed last. Teardown posts then land on a live (stopped)
io_context and are freed when it is destroyed. Verified clean under ASan +
LeakSanitizer (291 unit tests).
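
The fix relies on a C++ rule: non-static data members are destroyed in reverse order of declaration. A small sketch of that rule follows; `Tracer` and `Owner` are hypothetical types, and the member names only mirror the ones mentioned above for readability.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Records its name in a shared log when it is destroyed.
struct Tracer {
    Tracer(std::string name, std::vector<std::string>& log) :
        _name(std::move(name)),
        _log(log)
    {}
    ~Tracer() { _log.push_back(_name); }

    std::string _name;
    std::vector<std::string>& _log;
};

// Hypothetical owner: members are destroyed in reverse declaration order.
struct Owner {
    explicit Owner(std::vector<std::string>& log) :
        io_context("io_context", log),
        server_components("server_components", log)
    {}

    Tracer io_context; // declared first, so destroyed last
    Tracer server_components; // declared later, so destroyed first
};

// Returns the order in which the members of Owner were destroyed.
std::vector<std::string> demo_destruction_order()
{
    std::vector<std::string> log;
    {
        Owner owner(log);
    } // owner goes out of scope here
    return log;
}
```

The returned log lists `server_components` before `io_context`, so anything the later-declared member does in its destructor still sees a live first member.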

julianoes changed the title from "core: move hand-rolled callback queues to asio executor model" to "core: move hand-rolled thread-safety to the asio executor model" on Jun 23, 2026

Replace CallbackList's hand-rolled thread-safety (a mutex plus try_lock-or-defer
with _subscribe_later/_remove_later/_remove_all_later side-tables drained by
check_removals()/process_subscriptions()) with the executor model used elsewhere:
the list is only ever touched on the io_context thread.

- subscribe()/subscribe_conditional() post their insertion onto the io_context;
  the handle is created and returned synchronously (the handle factory is atomic).
- exec()/queue() iterate the list inline on the io thread (the hot path, driven by
  received-message handlers), or post-and-wait when called off it.
- unsubscribe()/clear() post their mutation rather than running it inline, so they
  never mutate the list mid exec() iteration (this keeps unsubscribe-from-callback
  safe); off the io thread they wait until applied, so no callback fires afterwards.
- empty() reads an atomic size, valid from any thread.

CallbackList now takes an io_context by reference. It is wired through every owner:
the core (MavsdkImpl/SystemImpl), all plugins, the nested per-plugin structs
(mission/mission_raw/shell now name those structs and pass it through), and the
mavlink_passthrough per-message map (try_emplace). ServerComponentImpl gains a public
io_context() accessor to match SystemImpl. callback_list.hpp only forward-declares
asio::io_context so the public-facing header does not pull asio in.

The unit test drives the list from a background io_context thread via a fixture.

CallbackList is now internally confined to the io_context, so the dedicated
mutexes that existed only to serialize access to a subscription list are no
longer needed. Remove them:

- telemetry: _subscription_mutex (guarded only the *_subscriptions lists; the
  cached telemetry values have their own per-field mutexes).
- winch / transponder: _subscription_mutex.
- shell: the Receive struct's mutex.
- mavlink_component_metadata: _notification_callbacks_mutex.

Multi-purpose mutexes that also guard other state (e.g. mission's
_mission_data.mutex, action_server's _callback_mutex, the various plugin _mutex)
are left untouched -- they still protect non-CallbackList members.

Two problems surfaced in the system tests with the first cut of the
io_context-confined CallbackList:

- unsubscribe()/clear() did a blocking round-trip onto the io thread when called
  off it. Callbacks commonly unsubscribe themselves from inside the callback
  (which runs on the user-callback thread) while the owning plugin holds a lock
  that the io thread also needs -- the round-trip then deadlocks. Camera's
  capture-info test hit exactly this. Make these mutations post without waiting:
  posting still keeps them off the middle of an exec() iteration, and not
  waiting removes the deadlock. This matches the old try_lock-or-defer, which
  never blocked either (and queue() already means an in-flight callback can fire
  after unsubscribe() returns regardless).

- because the posted mutations capture `this`, they must not outlive the list.
  Plugins (unlike the param/handler owners, which die only at full teardown) can
  be destroyed by the user mid-run, so the impl destructor now waits once for the
  io_context to flush anything still queued before the list is torn down.
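
The second point can be sketched as a destructor that flushes the executor before the members go away. Everything here is an assumption made for illustration: `WorkerExecutor`, `flush()` and `ConfinedList` are hypothetical, and the sketch assumes the object is destroyed off the executor thread.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for an io_context driven by a single worker thread.
class WorkerExecutor {
public:
    WorkerExecutor() : _thread([this] { run(); }) {}

    ~WorkerExecutor()
    {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _stopped = true;
        }
        _cv.notify_one();
        _thread.join();
    }

    void post(std::function<void()> task)
    {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _tasks.push_back(std::move(task));
        }
        _cv.notify_one();
    }

    // Blocks until everything posted before this call has run (FIFO marker).
    // Must not be called from the worker thread itself.
    void flush()
    {
        auto done = std::make_shared<std::promise<void>>();
        auto future = done->get_future();
        post([done] { done->set_value(); });
        future.wait();
    }

private:
    void run()
    {
        std::unique_lock<std::mutex> lock(_mutex);
        for (;;) {
            _cv.wait(lock, [this] { return _stopped || !_tasks.empty(); });
            if (_tasks.empty()) {
                return; // stopped and drained
            }
            auto task = std::move(_tasks.front());
            _tasks.pop_front();
            lock.unlock();
            task();
            lock.lock();
        }
    }

    std::mutex _mutex;
    std::condition_variable _cv;
    std::deque<std::function<void()>> _tasks;
    bool _stopped{false};
    std::thread _thread; // declared last: starts once the members above exist
};

// Hypothetical list whose posted tasks capture `this`.
class ConfinedList {
public:
    ConfinedList(WorkerExecutor& executor, std::atomic<int>& applied) :
        _executor(executor),
        _applied(applied)
    {}

    // Wait for queued tasks before _values is destroyed.
    ~ConfinedList() { _executor.flush(); }

    void add(int value)
    {
        _executor.post([this, value] {
            _values.push_back(value);
            ++_applied;
        });
    }

private:
    WorkerExecutor& _executor;
    std::atomic<int>& _applied;
    std::vector<int> _values; // only touched on the worker thread
};

// Returns how many posted mutations ran before the list was destroyed.
int demo_destroy_while_posts_are_queued()
{
    WorkerExecutor executor;
    std::atomic<int> applied{0};
    {
        ConfinedList list(executor, applied);
        for (int i = 0; i < 100; ++i) {
            list.add(i);
        }
    } // destructor flushes: every queued task ran while `list` was alive
    return applied.load();
}
```

Without the `flush()` in the destructor, tasks still in the queue would dereference a destroyed `ConfinedList`.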

The unit test gained a flush() helper: a deferred in-callback unsubscribe is only
observable via empty() after an io_context round-trip.

Verified with ASan: unit tests (x10), 85 system tests, and Camera.TakePhoto (x15).

The telemetry gRPC service test uses local CallbackLists as fakes to capture and
replay the service's subscription callbacks. CallbackList now requires an
io_context, so the fixture runs one on a background thread and the fakes are
constructed with it. (mavsdk_server isn't built under the sanitizer CI jobs,
which is why this only showed up in the RelWithDebInfo/server builds.)

…test

CallbackList now requires an io_context, but mavsdk_server is built with
-fno-exceptions and does not link asio, so pulling <asio/io_context.hpp> into a
server test does not compile. The telemetry service test was the only place in
mavsdk_server using CallbackList (as a fake to capture and replay a service's
subscription callback); replace it with a tiny local FakeCallbackList that stores
a std::function and returns a fake handle, matching how the other service tests
already store callbacks. No asio, no io_context needed.
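
A fake of that kind could be as small as the sketch below. The shape is assumed from the description (store a std::function, return a fake handle); the names `FakeHandle`, `FakeCallbackList` and `replay()` are hypothetical and are not taken from the actual test.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical handle type; the real test may use something different.
struct FakeHandle {
    int id;
};

// Hypothetical fake: remembers the last subscribed callback, no asio needed.
template<typename... Args> class FakeCallbackList {
public:
    using Callback = std::function<void(Args...)>;

    FakeHandle subscribe(Callback callback)
    {
        _callback = std::move(callback);
        return FakeHandle{++_last_id};
    }

    void unsubscribe(FakeHandle) { _callback = nullptr; }

    // Lets the test replay a value through the captured callback.
    void replay(Args... args) const
    {
        if (_callback) {
            _callback(args...);
        }
    }

    bool empty() const { return !_callback; }

private:
    Callback _callback;
    int _last_id{0};
};

// Returns the value the captured callback observed.
int demo_capture_and_replay()
{
    FakeCallbackList<int> fake;
    int seen = 0;
    fake.subscribe([&](int value) { seen = value; });
    fake.replay(5);
    return seen;
}
```

Because the fake only depends on `<functional>`, it compiles in a target that does not link asio.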

queue() went through the blocking read-on-io path: off the io thread it posted
the iteration and waited for it. But queue() is called while holding the
recursive _mutex (e.g. subscribe_on_new_system replaying existing systems from
first_autopilot, and subscribe_component_discovered under _components_mutex).
Meanwhile the io thread takes that same _mutex in process_libmav_message, so the
wait deadlocked the io thread against the caller. This was timing-dependent and
showed up as a hang in the Intercept system test under UBSan/macOS.

queue() only hands the callbacks to queue_func (which enqueues them to run
later), so it never needs to be synchronous. Post it without waiting when off the
io thread, like the mutations. exec() keeps the blocking path -- it invokes the
callbacks directly so the caller's arguments must outlive the call -- and its
only off-thread caller (raw-bytes notify) borrows a stack buffer.

Verified with ASan: Intercept x30, two full system-test runs (85 tests), and the
CallbackList unit tests.

julianoes left a comment

Collaborator Author


Reviewed what Claude did. I think it might work.
