From 13373e31144f4e59023b3ad9155f69f26bbb765b Mon Sep 17 00:00:00 2001 From: Bartosz Burda Date: Thu, 14 May 2026 13:00:43 +0200 Subject: [PATCH 1/5] feat(gateway,#380): expose entity context on /faults/stream SSE events Each `GET /api/v1/faults/stream` event payload now carries an optional `x-medkit` SOVD payload-extension object with `entity_type` and `entity_id` fields when the gateway can resolve the fault's first reporting source back to a SOVD entity. Consumers can hit `/{entity_type}/{entity_id}/bulk-data/rosbags/{fault_code}` directly instead of enumerating apps + components and HEAD-probing each one. Nested under `x-medkit` per the SOVD payload-extension convention (matches `x-medkit.aggregation_level`, `x-medkit.phase`, etc.). Flat `x-medkit-*` names are reserved for endpoint paths (`/x-medkit-graph`) and error codes, not payload fields. Resolution chain (mirrors gateway_node's node_resolver lambda for the manifest path): - Manifest / hybrid: cache's node-to-app linking index, both with and without the leading slash on the FQN. - Runtime fallback: FQN's last segment, validated against `cache.has_app()` so we never point consumers at a 404. - Runtime collision fallback: `_` form (slashes in the namespace replaced with `_`), matching the disambiguation rule in ros2_runtime_introspection.cpp for nodes that share a name across namespaces. - No match: `x-medkit` object omitted entirely; an `RCLCPP_DEBUG` trace records the unresolved source so operators can diagnose consumers stuck on the discovery fallback path. Backward-compatible for existing SOVD consumers. Entity resolution is snapshotted in `on_fault_event` (before the queue lock is acquired) and stored as `std::optional` on the buffered event. A discovery refresh between enqueue and stream-out cannot retroactively flip the entity reported to consumers, and the format path stays lock-free with respect to the entity cache. --- docs/api/rest.rst | 7 +- src/ros2_medkit_gateway/CHANGELOG.rst | 1 + src/ros2_medkit_gateway/README.md | 7 +- .../http/handlers/sse_fault_handler.hpp | 32 ++- .../src/http/handlers/sse_fault_handler.cpp | 100 +++++++-- .../test/test_sse_fault_handler.cpp | 197 ++++++++++++++++++ 6 files changed, 324 insertions(+), 20 deletions(-) diff --git a/docs/api/rest.rst b/docs/api/rest.rst index ee21d7e4..cab8e8d1 100644 --- a/docs/api/rest.rst +++ b/docs/api/rest.rst @@ -2372,7 +2372,12 @@ Other extensions beyond SOVD: - Vendor extension fields using ``x-medkit`` prefix (per SOVD extension mechanism) - ``DELETE /faults`` - Clear all faults globally -- ``GET /faults/stream`` - SSE real-time fault notifications +- ``GET /faults/stream`` - SSE real-time fault notifications. Each event payload carries an + optional ``x-medkit`` SOVD payload-extension object with ``entity_type`` and ``entity_id`` + fields when the gateway can resolve the fault's first reporting source back to an entity, + so consumers can hit ``/{entity_type}/{entity_id}/bulk-data/rosbags/{fault_code}`` directly + without enumerating entities. Resolution is snapshotted at event arrival; the entire + ``x-medkit`` object is omitted when no entity can be resolved. - ``/health`` - Health check with discovery pipeline diagnostics - ``/version-info`` - Gateway version information - ``/docs`` - OpenAPI capability description diff --git a/src/ros2_medkit_gateway/CHANGELOG.rst b/src/ros2_medkit_gateway/CHANGELOG.rst index 8744997e..63861402 100644 --- a/src/ros2_medkit_gateway/CHANGELOG.rst +++ b/src/ros2_medkit_gateway/CHANGELOG.rst @@ -11,6 +11,7 @@ Unreleased **Features:** +* ``GET /api/v1/faults/stream`` event payloads now carry an optional ``x-medkit`` SOVD payload-extension object with ``entity_type`` and ``entity_id`` fields. When the gateway can resolve the fault's first reporting source back to a SOVD entity (via the manifest-mode linking index, or a runtime-mode last-segment match against an existing App), consumers can hit ``/{entity_type}/{entity_id}/bulk-data/rosbags/{fault_code}`` directly instead of HEAD-probing every entity. Resolution is snapshotted at event arrival, so a discovery refresh between enqueue and stream-out cannot retroactively change the entity reported to consumers. The ``x-medkit`` object is omitted entirely when no entity can be resolved, so existing SSE consumers ignore the addition (`#380 `_) * Plugin API version bumped to v7. Adds ``PluginContext::notify_entities_changed(EntityChangeScope)`` lifecycle hook for plugins that mutate the entity surface at runtime; default no-op keeps v6 source code compiling unchanged against v7 headers. Binary compatibility is not provided: the plugin loader uses a strict equality check on ``plugin_api_version()``, so out-of-tree plugins must be recompiled (`#376 `_) * New ``discovery.manifest.fragments_dir`` parameter: gateway scans the directory for ``*.yaml`` / ``*.yml`` fragment files on every manifest load / reload and merges apps, components, and functions on top of the base manifest. Fragments are forbidden from declaring top-level ``areas``, ``metadata``, ``discovery``, ``scripts``, ``capabilities``, or ``lock_overrides`` - those stay in the base manifest. Presence of any forbidden key (including empty-valued ones like ``areas: []``) is reported as a ``FRAGMENT_FORBIDDEN_FIELD`` validation error that fails the load / reload. Unknown top-level keys (typos such as ``app:`` vs ``apps:``) are ignored with a warning log. Files merged in alphabetical order for deterministic duplicate-id errors (`#376 `_) * Fragment files are size-capped at 1 MiB (``ManifestParser::kMaxFragmentBytes``) before being read into memory, and any symlink resolving outside the canonical ``fragments_dir`` is skipped with a warning, so misconfigurations or symlink-based escapes cannot hand arbitrary bytes to the YAML parser (`#376 `_) diff --git a/src/ros2_medkit_gateway/README.md b/src/ros2_medkit_gateway/README.md index ff478cab..db7203a1 100644 --- a/src/ros2_medkit_gateway/README.md +++ b/src/ros2_medkit_gateway/README.md @@ -764,6 +764,7 @@ Real-time fault event stream using Server-Sent Events (SSE). Clients receive ins - **Automatic reconnection**: Supports `Last-Event-ID` header for seamless reconnection - **Keepalive**: Sends `:keepalive` comment every 30 seconds to prevent timeouts - **Event buffer**: Buffers up to 100 recent events for reconnecting clients +- **Entity context (SOVD payload extension)**: When the gateway can resolve the fault's first reporting source back to an entity, the payload carries an `x-medkit` object with `entity_type` and `entity_id` fields so consumers can hit `/{entity_type}/{entity_id}/bulk-data/rosbags/{fault_code}` directly without enumerating entities **Event Types:** - `fault_confirmed` - Fault transitioned to CONFIRMED status @@ -781,13 +782,15 @@ curl -N http://localhost:8080/api/v1/faults/stream id: 1 event: fault_confirmed -data: {"event_type":"fault_confirmed","fault":{"fault_code":"MOTOR_OVERHEAT",...},"timestamp":1735830000.123} +data: {"event_type":"fault_confirmed","fault":{"fault_code":"MOTOR_OVERHEAT",...},"timestamp":1735830000.123,"x-medkit":{"entity_type":"apps","entity_id":"motor_controller"}} id: 2 event: fault_cleared -data: {"event_type":"fault_cleared","fault":{"fault_code":"MOTOR_OVERHEAT",...},"timestamp":1735830060.456} +data: {"event_type":"fault_cleared","fault":{"fault_code":"MOTOR_OVERHEAT",...},"timestamp":1735830060.456,"x-medkit":{"entity_type":"apps","entity_id":"motor_controller"}} ``` +**SOVD payload extension `x-medkit.entity_*`** (non-standard, SOVD-compatible): Resolution is best-effort and snapshotted at event arrival. Manifest and hybrid discovery use the linking result; runtime-only discovery falls back to the FQN's last segment and only emits the fields when an App with that ID exists in the cache. When no entity can be resolved (no reporting sources, orphan source, etc.), the entire `x-medkit` object is omitted and the consumer must fall back to discovery. + **Response (503 Service Unavailable - Client Limit Reached):** ```json { diff --git a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/handlers/sse_fault_handler.hpp b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/handlers/sse_fault_handler.hpp index 072bb7e6..ccf95100 100644 --- a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/handlers/sse_fault_handler.hpp +++ b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/handlers/sse_fault_handler.hpp @@ -20,8 +20,11 @@ #include #include #include +#include #include +#include + #include "rclcpp/rclcpp.hpp" #include "ros2_medkit_gateway/core/http/sse_client_tracker.hpp" #include "ros2_medkit_gateway/http/handlers/handler_context.hpp" @@ -111,8 +114,33 @@ class SSEFaultHandler { /// Callback for fault events from ROS 2 topic void on_fault_event(const ros2_medkit_msgs::msg::FaultEvent::ConstSharedPtr & msg); + /// Resolved owning entity for a fault. Populates the ``x-medkit`` SOVD + /// payload-extension object on outgoing SSE events. + struct EntityContext { + std::string type; + std::string id; + }; + + /// Buffered queue entry. ``entity`` is resolved at enqueue time so a + /// discovery refresh between enqueue and stream-out cannot retroactively + /// flip the entity reported to consumers. + struct QueuedEvent { + uint64_t id; + ros2_medkit_msgs::msg::FaultEvent event; + std::optional entity; + }; + /// Format a fault event as SSE message - static std::string format_sse_event(const ros2_medkit_msgs::msg::FaultEvent & event, uint64_t event_id); + static std::string format_sse_event(const QueuedEvent & queued); + + /// Resolve the owning entity for a fault, snapshotting the cache. Manifest + /// / hybrid mode uses the cache's node-to-app index; runtime mode falls + /// back to the FQN's last segment, and for collision-disambiguated runtime + /// apps also to ``_`` per + /// ros2_runtime_introspection.cpp's renaming rule. Only emits a value when + /// an App with the resolved id actually exists in the cache - returns + /// ``std::nullopt`` otherwise so the consumer falls back to discovery. + std::optional resolve_entity_context(const ros2_medkit_msgs::msg::Fault & fault) const; HandlerContext & ctx_; std::shared_ptr client_tracker_; @@ -123,7 +151,7 @@ class SSEFaultHandler { /// Event queue for broadcasting to clients mutable std::mutex queue_mutex_; std::condition_variable queue_cv_; - std::deque> event_queue_; + std::deque event_queue_; /// Monotonically increasing event ID for Last-Event-ID support std::atomic next_event_id_{1}; diff --git a/src/ros2_medkit_gateway/src/http/handlers/sse_fault_handler.cpp b/src/ros2_medkit_gateway/src/http/handlers/sse_fault_handler.cpp index ec724bef..e4a8df61 100644 --- a/src/ros2_medkit_gateway/src/http/handlers/sse_fault_handler.cpp +++ b/src/ros2_medkit_gateway/src/http/handlers/sse_fault_handler.cpp @@ -86,12 +86,17 @@ void SSEFaultHandler::request_shutdown() { void SSEFaultHandler::on_fault_event(const ros2_medkit_msgs::msg::FaultEvent::ConstSharedPtr & msg) { uint64_t event_id = next_event_id_.fetch_add(1); + // Snapshot entity context before acquiring the queue lock so cache state + // is pinned to the fault arrival timestamp and the formatting path stays + // lock-free with respect to the cache. + auto entity = resolve_entity_context(msg->fault); + std::size_t dropped_this_call = 0; { std::lock_guard lock(queue_mutex_); - // Add event to queue - event_queue_.emplace_back(event_id, *msg); + // Add event to queue with resolved entity context + event_queue_.push_back(QueuedEvent{event_id, *msg, std::move(entity)}); // Trim old events if buffer is full while (event_queue_.size() > kMaxBufferedEvents) { @@ -156,13 +161,13 @@ void SSEFaultHandler::handle_stream(const httplib::Request & req, httplib::Respo // First, send any buffered events the client missed (for reconnection) { std::lock_guard lock(queue_mutex_); - for (const auto & [id, event] : event_queue_) { - if (id > last_event_id) { - std::string sse_msg = format_sse_event(event, id); + for (const auto & queued : event_queue_) { + if (queued.id > last_event_id) { + std::string sse_msg = format_sse_event(queued); if (!sink.write(sse_msg.data(), sse_msg.size())) { return false; // Client disconnected } - last_event_id = id; + last_event_id = queued.id; } } } @@ -198,15 +203,15 @@ void SSEFaultHandler::handle_stream(const httplib::Request & req, httplib::Respo // Check for new events bool found_new = false; - for (const auto & [id, event] : event_queue_) { - if (id > last_event_id) { - std::string sse_msg = format_sse_event(event, id); + for (const auto & queued : event_queue_) { + if (queued.id > last_event_id) { + std::string sse_msg = format_sse_event(queued); lock.unlock(); if (!sink.write(sse_msg.data(), sse_msg.size())) { return false; // Client disconnected } lock.lock(); - last_event_id = id; + last_event_id = queued.id; found_new = true; } } @@ -230,24 +235,89 @@ size_t SSEFaultHandler::connected_clients() const { return client_tracker_->connected_clients(); } -std::string SSEFaultHandler::format_sse_event(const ros2_medkit_msgs::msg::FaultEvent & event, uint64_t event_id) { - const auto sanitized_event_type = sanitize_sse_event_type(event.event_type); +std::string SSEFaultHandler::format_sse_event(const QueuedEvent & queued) { + const auto sanitized_event_type = sanitize_sse_event_type(queued.event.event_type); nlohmann::json json_event; json_event["event_type"] = sanitized_event_type; - json_event["fault"] = ros2::conversions::fault_to_json(event.fault); + json_event["fault"] = ros2::conversions::fault_to_json(queued.event.fault); // Convert timestamp to seconds with nanosecond precision - double timestamp_sec = static_cast(event.timestamp.sec) + static_cast(event.timestamp.nanosec) * 1e-9; + double timestamp_sec = + static_cast(queued.event.timestamp.sec) + static_cast(queued.event.timestamp.nanosec) * 1e-9; json_event["timestamp"] = timestamp_sec; + // SOVD payload extension: nest ``entity_type`` / ``entity_id`` under the + // ``x-medkit`` response-extension object so global-stream consumers can + // hit ``/{entity_type}/{entity_id}/bulk-data/rosbags/{fault_code}`` + // directly instead of HEAD-probing every entity. Flat ``x-medkit-*`` + // names are reserved for endpoint paths (``/x-medkit-graph``) and error + // codes, not payload fields. + if (queued.entity) { + json_event["x-medkit"] = {{"entity_type", queued.entity->type}, {"entity_id", queued.entity->id}}; + } + std::ostringstream sse; - sse << "id: " << event_id << "\n"; + sse << "id: " << queued.id << "\n"; sse << "event: " << sanitized_event_type << "\n"; sse << "data: " << json_event.dump() << "\n\n"; return sse.str(); } +std::optional +SSEFaultHandler::resolve_entity_context(const ros2_medkit_msgs::msg::Fault & fault) const { + if (fault.reporting_sources.empty()) { + return std::nullopt; + } + const auto & raw_fqn = fault.reporting_sources.front(); + if (raw_fqn.empty()) { + return std::nullopt; + } + + const auto & cache = ctx_.node()->get_thread_safe_cache(); + + // Manifest / hybrid mode: the linking step populated node_to_app with the + // ROS FQN -> manifest app id mapping. Try both FQN forms (with and without + // the leading '/'), mirroring gateway_node's node_resolver lambda. + std::string entity_id = cache.resolve_node_to_app(raw_fqn); + if (entity_id.empty() && raw_fqn.front() == '/') { + entity_id = cache.resolve_node_to_app(raw_fqn.substr(1)); + } + + // Runtime fallback: synthetic apps are created with id = ROS node name + // (the FQN's last segment) when there is no namespace collision, or + // ``_`` (slashes in the namespace replaced with '_') when + // multiple nodes share the same name. See ros2_runtime_introspection.cpp. + // Only accept a candidate that actually exists as an App in the cache so + // we never point consumers at a 404. + if (entity_id.empty()) { + auto last_slash = raw_fqn.rfind('/'); + auto name = (last_slash != std::string::npos) ? raw_fqn.substr(last_slash + 1) : raw_fqn; + if (!name.empty() && cache.has_app(name)) { + entity_id = std::move(name); + } else if (last_slash != std::string::npos && last_slash > 0) { + // Try the collision-disambiguated form: ns prefix (sans leading '/'), + // slashes replaced with '_', then '_' + name. + auto ns_prefix = raw_fqn.substr(1, last_slash - 1); + std::replace(ns_prefix.begin(), ns_prefix.end(), '/', '_'); + auto namespaced = ns_prefix + "_" + name; + if (cache.has_app(namespaced)) { + entity_id = std::move(namespaced); + } + } + } + + if (entity_id.empty()) { + RCLCPP_DEBUG(HandlerContext::logger(), + "SSE fault event: no entity match for reporting source '%s' (fault_code='%s'); " + "omitting x-medkit extension", + raw_fqn.c_str(), fault.fault_code.c_str()); + return std::nullopt; + } + + return EntityContext{"apps", std::move(entity_id)}; +} + } // namespace handlers } // namespace ros2_medkit_gateway diff --git a/src/ros2_medkit_gateway/test/test_sse_fault_handler.cpp b/src/ros2_medkit_gateway/test/test_sse_fault_handler.cpp index c840651b..2d0602bf 100644 --- a/src/ros2_medkit_gateway/test/test_sse_fault_handler.cpp +++ b/src/ros2_medkit_gateway/test/test_sse_fault_handler.cpp @@ -28,7 +28,9 @@ #include #include "ros2_medkit_gateway/core/config.hpp" +#include "ros2_medkit_gateway/core/discovery/models/app.hpp" #include "ros2_medkit_gateway/core/http/sse_client_tracker.hpp" +#include "ros2_medkit_gateway/core/models/thread_safe_entity_cache.hpp" #include "ros2_medkit_gateway/fault_manager_paths.hpp" #include "ros2_medkit_gateway/gateway_node.hpp" #include "ros2_medkit_gateway/http/handlers/handler_context.hpp" @@ -37,10 +39,12 @@ using json = nlohmann::json; using namespace std::chrono_literals; +using ros2_medkit_gateway::App; using ros2_medkit_gateway::AuthConfig; using ros2_medkit_gateway::CorsConfig; using ros2_medkit_gateway::GatewayNode; using ros2_medkit_gateway::SSEClientTracker; +using ros2_medkit_gateway::ThreadSafeEntityCache; using ros2_medkit_gateway::TlsConfig; using ros2_medkit_gateway::handlers::HandlerContext; using ros2_medkit_gateway::handlers::SSEFaultHandler; @@ -415,6 +419,199 @@ TEST_F(SSEFaultHandlerTest, NonPositiveKeepaliveOverrideLogsWarning) { EXPECT_NE(logs.find("Non-positive SSE keepalive override"), std::string::npos); } +TEST_F(SSEFaultHandlerTest, StreamOmitsXMedkitWhenNoMatchingApp) { + // Empty cache: reporting source ("/apps/temp_sensor") has no manifest mapping + // and no App with id "temp_sensor" exists in runtime cache, so the + // ``x-medkit`` payload extension is not emitted (consumer falls back to + // discovery). + enqueue_event(make_fault_event(FaultEvent::EVENT_CONFIRMED, "NO_OWNER", 10)); + + auto req = make_stream_request("127.0.0.1"); + httplib::Response res; + handler_->handle_stream(req, res); + + auto output = read_stream_once(res, 1); + auto payload = parse_sse_payload(output); + + EXPECT_FALSE(payload.contains("x-medkit")); + + release_stream(res); +} + +TEST_F(SSEFaultHandlerTest, StreamEmitsXMedkitForRuntimeApp) { + // Runtime_only fallback: synthetic App with id matching the reporting + // source's last segment exists in the cache, so the ``x-medkit`` payload + // extension points consumers straight at + // /apps/temp_sensor/bulk-data/rosbags/. + App app; + app.id = "temp_sensor"; + app.name = "temp_sensor"; + app.source = "heuristic"; + app.bound_fqn = "/apps/temp_sensor"; + auto & cache = const_cast(node_->get_thread_safe_cache()); + cache.update_apps({app}); + + enqueue_event(make_fault_event(FaultEvent::EVENT_CONFIRMED, "WITH_OWNER", 20)); + + auto req = make_stream_request("127.0.0.1"); + httplib::Response res; + handler_->handle_stream(req, res); + + auto output = read_stream_once(res, 1); + auto payload = parse_sse_payload(output); + + ASSERT_TRUE(payload.contains("x-medkit")) << payload.dump(); + EXPECT_EQ(payload["x-medkit"]["entity_type"], "apps"); + EXPECT_EQ(payload["x-medkit"]["entity_id"], "temp_sensor"); + + release_stream(res); +} + +TEST_F(SSEFaultHandlerTest, StreamEmitsXMedkitForManifestApp) { + // Manifest/hybrid mode: linking populates node_to_app with the ROS FQN -> + // manifest app id mapping. The fault's reporting source uses the ROS FQN + // form (with leading slash), but the manifest app id can be arbitrary (here + // "diagnostic-bridge" with a hyphen, to model the FQN/id naming mismatch + // called out in #380). + App app; + app.id = "diagnostic-bridge"; + app.name = "diagnostic_bridge"; + app.source = "manifest"; + app.bound_fqn = "/bridge/diagnostic_bridge"; + std::unordered_map node_to_app{{"/bridge/diagnostic_bridge", "diagnostic-bridge"}}; + auto & cache = const_cast(node_->get_thread_safe_cache()); + cache.update_all({}, {}, {app}, {}, node_to_app); + + auto event = make_fault_event(FaultEvent::EVENT_CONFIRMED, "BRIDGE_FAULT", 30); + event.fault.reporting_sources = {"/bridge/diagnostic_bridge"}; + enqueue_event(event); + + auto req = make_stream_request("127.0.0.1"); + httplib::Response res; + handler_->handle_stream(req, res); + + auto output = read_stream_once(res, 1); + auto payload = parse_sse_payload(output); + + ASSERT_TRUE(payload.contains("x-medkit")) << payload.dump(); + EXPECT_EQ(payload["x-medkit"]["entity_type"], "apps"); + EXPECT_EQ(payload["x-medkit"]["entity_id"], "diagnostic-bridge"); + + release_stream(res); +} + +TEST_F(SSEFaultHandlerTest, StreamSnapshotsEntityContextAtEnqueue) { + // The core design claim of #380: entity resolution is pinned at enqueue + // time so a discovery refresh between enqueue and stream replay cannot + // flip the entity reported to consumers. Enqueue with a matching App in + // the cache, then mutate the cache to remove it, then replay - the + // x-medkit object must still reflect the snapshot. + App app; + app.id = "temp_sensor"; + app.name = "temp_sensor"; + app.source = "heuristic"; + app.bound_fqn = "/apps/temp_sensor"; + auto & cache = const_cast(node_->get_thread_safe_cache()); + cache.update_apps({app}); + + enqueue_event(make_fault_event(FaultEvent::EVENT_CONFIRMED, "SNAPSHOTTED", 50)); + + // Discovery refresh: remove the App. Without snapshot-at-enqueue, a + // replay would now see an empty cache and omit x-medkit. + cache.update_apps({}); + + auto req = make_stream_request("127.0.0.1"); + httplib::Response res; + handler_->handle_stream(req, res); + + auto output = read_stream_once(res, 1); + auto payload = parse_sse_payload(output); + + ASSERT_TRUE(payload.contains("x-medkit")) << payload.dump(); + EXPECT_EQ(payload["x-medkit"]["entity_type"], "apps"); + EXPECT_EQ(payload["x-medkit"]["entity_id"], "temp_sensor"); + + release_stream(res); +} + +TEST_F(SSEFaultHandlerTest, StreamResolvesManifestFqnWithoutLeadingSlash) { + // gateway_node's node_resolver tries both FQN forms; the linking layer may + // store keys without the leading '/' (e.g. when the manifest declares + // namespace-only FQNs). The handler must try the slash-stripped form so + // /bridge/diagnostic_bridge resolves against a node_to_app key of + // "bridge/diagnostic_bridge". + App app; + app.id = "diagnostic-bridge"; + app.name = "diagnostic_bridge"; + app.source = "manifest"; + app.bound_fqn = "/bridge/diagnostic_bridge"; + std::unordered_map node_to_app{{"bridge/diagnostic_bridge", "diagnostic-bridge"}}; + auto & cache = const_cast(node_->get_thread_safe_cache()); + cache.update_all({}, {}, {app}, {}, node_to_app); + + auto event = make_fault_event(FaultEvent::EVENT_CONFIRMED, "SLASH_STRIP", 60); + event.fault.reporting_sources = {"/bridge/diagnostic_bridge"}; + enqueue_event(event); + + auto req = make_stream_request("127.0.0.1"); + httplib::Response res; + handler_->handle_stream(req, res); + + auto output = read_stream_once(res, 1); + auto payload = parse_sse_payload(output); + + ASSERT_TRUE(payload.contains("x-medkit")) << payload.dump(); + EXPECT_EQ(payload["x-medkit"]["entity_id"], "diagnostic-bridge"); + + release_stream(res); +} + +TEST_F(SSEFaultHandlerTest, StreamResolvesRuntimeCollisionRenamedApp) { + // Runtime discovery renames collision-prone nodes to _ + // (see ros2_runtime_introspection.cpp). The fallback must also try this + // form so faults from those apps still get a usable x-medkit context. + App app; + app.id = "bridge_diagnostic_bridge"; + app.name = "diagnostic_bridge"; + app.source = "heuristic"; + app.bound_fqn = "/bridge/diagnostic_bridge"; + auto & cache = const_cast(node_->get_thread_safe_cache()); + cache.update_apps({app}); + + auto event = make_fault_event(FaultEvent::EVENT_CONFIRMED, "COLLISION", 70); + event.fault.reporting_sources = {"/bridge/diagnostic_bridge"}; + enqueue_event(event); + + auto req = make_stream_request("127.0.0.1"); + httplib::Response res; + handler_->handle_stream(req, res); + + auto output = read_stream_once(res, 1); + auto payload = parse_sse_payload(output); + + ASSERT_TRUE(payload.contains("x-medkit")) << payload.dump(); + EXPECT_EQ(payload["x-medkit"]["entity_id"], "bridge_diagnostic_bridge"); + + release_stream(res); +} + +TEST_F(SSEFaultHandlerTest, StreamOmitsXMedkitWhenReportingSourcesEmpty) { + auto event = make_fault_event(FaultEvent::EVENT_CONFIRMED, "ORPHAN", 40); + event.fault.reporting_sources.clear(); + enqueue_event(event); + + auto req = make_stream_request("127.0.0.1"); + httplib::Response res; + handler_->handle_stream(req, res); + + auto output = read_stream_once(res, 1); + auto payload = parse_sse_payload(output); + + EXPECT_FALSE(payload.contains("x-medkit")); + + release_stream(res); +} + TEST_F(SSEFaultHandlerTest, DisconnectReleasesTrackedClientSlot) { auto req = make_stream_request("127.0.0.1"); httplib::Response res; From f25c9dfa3ea0594190f709dc3fbab45e39f53e66 Mon Sep 17 00:00:00 2001 From: Bartosz Burda Date: Tue, 26 May 2026 12:18:14 +0200 Subject: [PATCH 2/5] test,docs(gateway): document SSE x-medkit entity_type and isolate tests Address feedback on #400: - Comment that reporting_sources.front() picks the lexicographically-first reporter (set ordering), not a defined owner - any co-reporter's rosbag is still fetchable, but consumers should not assume ownership. - Comment that entity_type is hardcoded "apps" because apps are the leaf reporters; components own faults transitively. Manifest-only components without bound nodes have no match and fall back to discovery by design. - Add StreamPrefersManifestMatchOverRuntimeFallback test that pins the ordering contract: when both a manifest node_to_app mapping and a collision-matching runtime app exist for the same FQN, manifest wins. - Set per-test fault_manager.namespace so each TEST_F gets a unique events topic (/test/fault_manager/events). Eliminates the shared /fault_manager/events that previously forced serial execution. --- .../src/http/handlers/sse_fault_handler.cpp | 10 ++++ .../test/test_sse_fault_handler.cpp | 50 ++++++++++++++++++- 2 files changed, 58 insertions(+), 2 deletions(-) diff --git a/src/ros2_medkit_gateway/src/http/handlers/sse_fault_handler.cpp b/src/ros2_medkit_gateway/src/http/handlers/sse_fault_handler.cpp index e4a8df61..7c82d8f3 100644 --- a/src/ros2_medkit_gateway/src/http/handlers/sse_fault_handler.cpp +++ b/src/ros2_medkit_gateway/src/http/handlers/sse_fault_handler.cpp @@ -270,6 +270,10 @@ SSEFaultHandler::resolve_entity_context(const ros2_medkit_msgs::msg::Fault & fau if (fault.reporting_sources.empty()) { return std::nullopt; } + // reporting_sources is a set; debounced faults can carry several co-reporters + // (e.g. node_a and node_b raising the same fault_code). .front() picks the + // lexicographically-first FQN, not a defined owner - any co-reporter's + // rosbag is fetchable, so this remains a valid hint, just not authoritative. const auto & raw_fqn = fault.reporting_sources.front(); if (raw_fqn.empty()) { return std::nullopt; @@ -316,6 +320,12 @@ SSEFaultHandler::resolve_entity_context(const ros2_medkit_msgs::msg::Fault & fau return std::nullopt; } + // entity_type is hardcoded "apps" because apps are the leaf reporters in + // SOVD - reporting_sources always carries ROS node FQNs which map to apps. + // Components own faults transitively via their hosted apps; consumers can + // walk up the hierarchy via /apps/ -> belongs_to if they need the + // owning component. Manifest-only components without a bound node have no + // FQN match here and fall back to plain discovery - by design. return EntityContext{"apps", std::move(entity_id)}; } diff --git a/src/ros2_medkit_gateway/test/test_sse_fault_handler.cpp b/src/ros2_medkit_gateway/test/test_sse_fault_handler.cpp index 2d0602bf..85b3b3e9 100644 --- a/src/ros2_medkit_gateway/test/test_sse_fault_handler.cpp +++ b/src/ros2_medkit_gateway/test/test_sse_fault_handler.cpp @@ -170,8 +170,15 @@ class SSEFaultHandlerTest : public ::testing::Test { int server_port = reserve_local_port(); ASSERT_NE(server_port, 0); + // Per-test fault_manager namespace gives every TEST_F a unique events + // topic (/test/fault_manager/events). Without this, all tests share + // /fault_manager/events, so a stale publisher/subscription in the same + // DDS domain could leak messages across tests when the suite is run + // alongside other gateway test binaries. + const int test_id = test_counter_++; auto options = rclcpp::NodeOptions{}.automatically_declare_parameters_from_overrides(false).parameter_overrides({ {"server.port", server_port}, + {"fault_manager.namespace", "test" + std::to_string(test_id)}, }); node_ = std::make_shared(options); @@ -184,8 +191,7 @@ class SSEFaultHandlerTest : public ::testing::Test { tracker_ = std::make_shared(4); handler_ = std::make_unique(*ctx_, tracker_); - publisher_node_ = - std::make_shared("test_sse_fault_handler_publisher_" + std::to_string(test_counter_++)); + publisher_node_ = std::make_shared("test_sse_fault_handler_publisher_" + std::to_string(test_id)); fault_events_topic_ = ros2_medkit_gateway::build_fault_manager_events_topic(node_.get()); publisher_ = publisher_node_->create_publisher(fault_events_topic_, rclcpp::QoS(100).reliable()); @@ -500,6 +506,46 @@ TEST_F(SSEFaultHandlerTest, StreamEmitsXMedkitForManifestApp) { release_stream(res); } +TEST_F(SSEFaultHandlerTest, StreamPrefersManifestMatchOverRuntimeFallback) { + // Manifest match must win over the runtime heuristic when both could apply + // to the same FQN. Seed both: (1) a manifest node_to_app entry mapping + // "/bridge/diagnostic_bridge" -> "manifest-bridge", and (2) a runtime-style + // App whose id matches the FQN's last segment ("diagnostic_bridge"). The + // resolver tries manifest first - if anyone ever flips the ordering, the + // x-medkit emits the runtime id and this test catches it. + App manifest_app; + manifest_app.id = "manifest-bridge"; + manifest_app.name = "diagnostic_bridge"; + manifest_app.source = "manifest"; + manifest_app.bound_fqn = "/bridge/diagnostic_bridge"; + + App runtime_app; + runtime_app.id = "diagnostic_bridge"; + runtime_app.name = "diagnostic_bridge"; + runtime_app.source = "heuristic"; + runtime_app.bound_fqn = "/bridge/diagnostic_bridge"; + + std::unordered_map node_to_app{{"/bridge/diagnostic_bridge", "manifest-bridge"}}; + auto & cache = const_cast(node_->get_thread_safe_cache()); + cache.update_all({}, {}, {manifest_app, runtime_app}, {}, node_to_app); + + auto event = make_fault_event(FaultEvent::EVENT_CONFIRMED, "PRECEDENCE", 35); + event.fault.reporting_sources = {"/bridge/diagnostic_bridge"}; + enqueue_event(event); + + auto req = make_stream_request("127.0.0.1"); + httplib::Response res; + handler_->handle_stream(req, res); + + auto output = read_stream_once(res, 1); + auto payload = parse_sse_payload(output); + + ASSERT_TRUE(payload.contains("x-medkit")) << payload.dump(); + EXPECT_EQ(payload["x-medkit"]["entity_id"], "manifest-bridge"); + + release_stream(res); +} + TEST_F(SSEFaultHandlerTest, StreamSnapshotsEntityContextAtEnqueue) { // The core design claim of #380: entity resolution is pinned at enqueue // time so a discovery refresh between enqueue and stream replay cannot From 3447220b44979547638081cf30884cbc8cc04a78 Mon Sep 17 00:00:00 2001 From: Bartosz Burda Date: Tue, 26 May 2026 12:56:47 +0200 Subject: [PATCH 3/5] test(integration): widen TSan shutdown windows for multi-gateway tests The four multi-gateway aggregation tests bypassed create_gateway_node() to build their own launch_ros.actions.Node instances and inherited the launch default sigterm_timeout=5/sigkill_timeout=5. Under TSan the gateway teardown sequence (mdns stop, REST server stop, transport shutdown, plugin shutdown) routinely exceeds 5s, so launch escalates SIGINT to SIGTERM and then to SIGKILL, leaving the process with exit -9 which is not in ALLOWED_EXIT_CODES. test_leaf_collision_aggregation tripped on this today; the other three are latent. Apply the same 30s/15s windows as create_gateway_node() so all four test families survive sanitizer slowdowns. --- .../test/features/test_cross_ecu_fanout.test.py | 5 +++++ .../test/features/test_daisy_chain_aggregation.test.py | 6 ++++++ .../test/features/test_leaf_collision_aggregation.test.py | 6 ++++++ .../test/features/test_peer_aggregation.test.py | 5 +++++ 4 files changed, 22 insertions(+) diff --git a/src/ros2_medkit_integration_tests/test/features/test_cross_ecu_fanout.test.py b/src/ros2_medkit_integration_tests/test/features/test_cross_ecu_fanout.test.py index 7d062136..2b809566 100644 --- a/src/ros2_medkit_integration_tests/test/features/test_cross_ecu_fanout.test.py +++ b/src/ros2_medkit_integration_tests/test/features/test_cross_ecu_fanout.test.py @@ -198,6 +198,11 @@ def generate_test_description(): 'discovery.manifest_strict_validation': False, }], additional_env=peer_domain_env, + # Match create_gateway_node()'s TSan/ASan-safe shutdown windows. + # Default 5s SIGINT->SIGTERM escalation is insufficient under + # sanitizers and causes SIGKILL with exit -9. + sigterm_timeout='30', + sigkill_timeout='15', ) primary_fault_mgr = launch_ros.actions.Node( diff --git a/src/ros2_medkit_integration_tests/test/features/test_daisy_chain_aggregation.test.py b/src/ros2_medkit_integration_tests/test/features/test_daisy_chain_aggregation.test.py index 5fb6445b..8c5c950b 100644 --- a/src/ros2_medkit_integration_tests/test/features/test_daisy_chain_aggregation.test.py +++ b/src/ros2_medkit_integration_tests/test/features/test_daisy_chain_aggregation.test.py @@ -148,6 +148,12 @@ def _gateway(port, name, manifest_path, domain, aggregation_peers=None): output='screen', parameters=[params], additional_env={'ROS_DOMAIN_ID': str(domain)}, + # Match create_gateway_node()'s TSan/ASan-safe shutdown windows. + # Under sanitizers the gateway teardown sequence (mdns stop, REST + # server stop, transport shutdown, plugin shutdown) routinely + # exceeds launch's 5s default, causing SIGKILL with exit -9. + sigterm_timeout='30', + sigkill_timeout='15', ) diff --git a/src/ros2_medkit_integration_tests/test/features/test_leaf_collision_aggregation.test.py b/src/ros2_medkit_integration_tests/test/features/test_leaf_collision_aggregation.test.py index 6ed2389e..2b9e9288 100644 --- a/src/ros2_medkit_integration_tests/test/features/test_leaf_collision_aggregation.test.py +++ b/src/ros2_medkit_integration_tests/test/features/test_leaf_collision_aggregation.test.py @@ -138,6 +138,12 @@ def _gateway(port, name, manifest_path, domain, aggregation_peers=None): output='screen', parameters=[params], additional_env={'ROS_DOMAIN_ID': str(domain)}, + # Match create_gateway_node()'s TSan/ASan-safe shutdown windows. + # Under sanitizers the gateway teardown sequence (mdns stop, REST + # server stop, transport shutdown, plugin shutdown) routinely + # exceeds launch's 5s default, causing SIGKILL with exit -9. + sigterm_timeout='30', + sigkill_timeout='15', ) diff --git a/src/ros2_medkit_integration_tests/test/features/test_peer_aggregation.test.py b/src/ros2_medkit_integration_tests/test/features/test_peer_aggregation.test.py index 2ff6fde8..6254bdf8 100644 --- a/src/ros2_medkit_integration_tests/test/features/test_peer_aggregation.test.py +++ b/src/ros2_medkit_integration_tests/test/features/test_peer_aggregation.test.py @@ -104,6 +104,11 @@ def generate_test_description(): 'server.port': PEER_PORT, }], additional_env=peer_domain_env, + # Match create_gateway_node()'s TSan/ASan-safe shutdown windows. + # Default 5s SIGINT->SIGTERM escalation is insufficient under + # sanitizers and causes SIGKILL with exit -9. + sigterm_timeout='30', + sigkill_timeout='15', ) # Demo nodes: each set runs in its gateway's DDS domain. From ee6622b384bc9772f2dba225e5732674c95bcd80 Mon Sep 17 00:00:00 2001 From: Bartosz Burda Date: Tue, 26 May 2026 15:17:48 +0200 Subject: [PATCH 4/5] ci: re-trigger workflows after GH Actions outage From 301412817c63ccc3af6d6fa808095852f43b8fd7 Mon Sep 17 00:00:00 2001 From: Bartosz Burda Date: Tue, 26 May 2026 15:50:01 +0200 Subject: [PATCH 5/5] fix(gateway): tighten ~GatewayNode graph-event teardown for TSan TSan flagged a Write/Read race on the rclcpp::Event control block between the per-context graph listener thread (calling NodeGraph::notify_graph_change) and our automatic member destruction in ~GatewayNode. Reproduced today in test_gateway_node where many GatewayNode instances are created and destroyed in sequence while the per-process GraphListener keeps running. Two-pronged fix: - Explicitly cancel and reset graph_check_timer_, backstop_timer_ and graph_event_ at the end of the ~GatewayNode body, before any other member runs its destructor. This drains the timer (its [this] lambda reads graph_event_) and drops our shared_ptr while we still control the ordering, instead of relying on declaration-order destruction. - Add a TSan suppression for the residual library-side window inside rclcpp::node_interfaces::NodeGraph::notify_graph_change, matching the existing approach for other rclcpp-internal races (signal handler, logging, Context shutdown). We do not own that code path; the fix above closes our half of the race. --- src/ros2_medkit_gateway/src/gateway_node.cpp | 21 +++++++++++++++++++- tsan_suppressions.txt | 10 ++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/src/ros2_medkit_gateway/src/gateway_node.cpp b/src/ros2_medkit_gateway/src/gateway_node.cpp index f114c50d..0635066c 100644 --- a/src/ros2_medkit_gateway/src/gateway_node.cpp +++ b/src/ros2_medkit_gateway/src/gateway_node.cpp @@ -1537,7 +1537,26 @@ GatewayNode::~GatewayNode() { if (config_mgr_) { config_mgr_->shutdown(); } - // 9. Normal member destruction (managers safe - all transports stopped) + // 9. Cancel the graph timers and drop our graph_event_ reference before + // any other member destruction begins. rclcpp's graph listener thread + // holds shared_ptrs to all registered graph events (graph_events_ in + // NodeGraph) and may concurrently iterate them via notify_graph_change() + // while we're tearing down. TSan flagged a Write/Read race on the + // Event's control block between that thread and our shared_ptr release + // during automatic member destruction (see TestGatewayNode TSan run). + // Doing the reset here gives the timer a chance to drain before its + // [this]-captured graph_event_ disappears, and shrinks the window where + // the graph listener can still see a live shared_ptr from us. + if (graph_check_timer_) { + graph_check_timer_->cancel(); + graph_check_timer_.reset(); + } + if (backstop_timer_) { + backstop_timer_->cancel(); + backstop_timer_.reset(); + } + graph_event_.reset(); + // 10. Normal member destruction (managers safe - all transports stopped) } const ThreadSafeEntityCache & GatewayNode::get_thread_safe_cache() const { diff --git a/tsan_suppressions.txt b/tsan_suppressions.txt index 6f699c32..09fdfad1 100644 --- a/tsan_suppressions.txt +++ b/tsan_suppressions.txt @@ -41,6 +41,16 @@ called_from_lib:libcpp-httplib.so # test teardown when executor threads race with node destruction. mutex:__gthread_mutex_unlock +# rclcpp: NodeGraph::notify_graph_change() iterates the per-context vector +# of graph events (held as Event::WeakPtr) from the graph listener thread +# while a node is being destroyed in another thread, racing on the Event's +# shared_ptr control block. Triggered by TestGatewayNode where many nodes +# are created/destroyed in sequence while the per-process GraphListener +# keeps running. We already reset graph_event_ before automatic member +# destruction (see ~GatewayNode); this suppression covers the remaining +# library-side window we cannot tighten without changing rclcpp. +race:rclcpp::node_interfaces::NodeGraph::notify_graph_change + # rclcpp: lock-order-inversion in Context::add_on_shutdown_callback # during test TearDownTestSuite when rclcpp::shutdown() races with