diff --git a/atos/CMakeLists.txt b/atos/CMakeLists.txt index 2d83dba86..d9a311d25 100644 --- a/atos/CMakeLists.txt +++ b/atos/CMakeLists.txt @@ -66,6 +66,7 @@ set(WITH_BACK_TO_START ON CACHE BOOL "Enable BackToStart module") set(WITH_OPEN_SCENARIO_GATEWAY ON CACHE BOOL "Enable OpenScenario Gateway module") set(WITH_REST_BRIDGE ON CACHE BOOL "Enable RESTBridge module") set(WITH_MONR_RELAY ON CACHE BOOL "Enable MonrRelay module") +set(WITH_TRUCK_OBJECT_CONTROL OFF CACHE BOOL "Enable TruckObjectControl module") set(ENABLE_TESTS ON CACHE BOOL "Enable testing on build") @@ -106,6 +107,9 @@ endif() if(WITH_MONR_RELAY) list(APPEND ENABLED_MODULES MonrRelay) endif() +if(WITH_TRUCK_OBJECT_CONTROL) + list(APPEND ENABLED_MODULES TruckObjectControl) +endif() # Add corresponding subprojects diff --git a/atos/common/roschannels/monitorchannel.hpp b/atos/common/roschannels/monitorchannel.hpp index 216ab0989..e8918b134 100644 --- a/atos/common/roschannels/monitorchannel.hpp +++ b/atos/common/roschannels/monitorchannel.hpp @@ -8,6 +8,7 @@ #include "atos_interfaces/msg/monitor.hpp" #include "positioning.h" #include "roschannel.hpp" +#include #include #if ROS_FOXY #include "tf2_geometry_msgs/tf2_geometry_msgs.h" @@ -97,7 +98,10 @@ inline message_type fromISOMonr(const uint32_t id, if (indata.position.isHeadingValid) { tf2::Quaternion orientation; orientation.setRPY(0, 0, indata.position.heading_rad); - outdata.pose.pose.orientation = tf2::toMsg(orientation); + outdata.pose.pose.orientation.x = orientation.x(); + outdata.pose.pose.orientation.y = orientation.y(); + outdata.pose.pose.orientation.z = orientation.z(); + outdata.pose.pose.orientation.w = orientation.w(); } outdata.velocity.twist.linear.x = indata.speed.isLongitudinalValid ? indata.speed.longitudinal_m_s : 0; outdata.velocity.twist.linear.y = indata.speed.isLateralValid ? indata.speed.lateral_m_s : 0; @@ -128,20 +132,19 @@ inline message_type fromISOMonr(const uint32_t id, inline ObjectMonitorType toISOMonr(message_type& indata) { ObjectMonitorType outdata; - outdata.timestamp.tv_sec = indata.atos_header.header.stamp.sec; - outdata.timestamp.tv_usec = indata.atos_header.header.stamp.nanosec / 1000; - outdata.state = static_cast(indata.object_state.state); - outdata.position.isPositionValid = true; - outdata.position.isXcoordValid = true; - outdata.position.isYcoordValid = true; - outdata.position.isZcoordValid = true; - outdata.position.xCoord_m = indata.pose.pose.position.x; - outdata.position.yCoord_m = indata.pose.pose.position.y; - outdata.position.zCoord_m = indata.pose.pose.position.z; - outdata.position.isHeadingValid = true; - tf2::Quaternion quat_tf; + outdata.timestamp.tv_sec = indata.atos_header.header.stamp.sec; + outdata.timestamp.tv_usec = indata.atos_header.header.stamp.nanosec / 1000; + outdata.state = static_cast(indata.object_state.state); + outdata.position.isPositionValid = true; + outdata.position.isXcoordValid = true; + outdata.position.isYcoordValid = true; + outdata.position.isZcoordValid = true; + outdata.position.xCoord_m = indata.pose.pose.position.x; + outdata.position.yCoord_m = indata.pose.pose.position.y; + outdata.position.zCoord_m = indata.pose.pose.position.z; + outdata.position.isHeadingValid = true; geometry_msgs::msg::Quaternion quat_msg = indata.pose.pose.orientation; - tf2::fromMsg(quat_msg, quat_tf); + tf2::Quaternion quat_tf(quat_msg.x, quat_msg.y, quat_msg.z, quat_msg.w); double r{}, p{}, y{}; tf2::Matrix3x3 m(quat_tf); m.getRPY(r, p, y); diff --git a/atos/common/trajectory.cpp b/atos/common/trajectory.cpp index 6ab59d478..c13ea179e 100644 --- a/atos/common/trajectory.cpp +++ b/atos/common/trajectory.cpp @@ -11,6 +11,8 @@ #include #include #include +#include +#include #if ROS_FOXY #include "tf2_geometry_msgs/tf2_geometry_msgs.h" @@ -62,7 +64,10 @@ atos_interfaces::msg::CartesianTrajectory Trajectory::toCartesianTrajectory() { // Rotation tf2::Quaternion q; q.setRPY(0, 0, point.getHeading()); - tf2::convert(q, pointMsg.pose.orientation); + pointMsg.pose.orientation.x = q.x(); + pointMsg.pose.orientation.y = q.y(); + pointMsg.pose.orientation.z = q.z(); + pointMsg.pose.orientation.w = q.w(); // Velocity TODO convert longitudinal / lateral into xyz coordinate system pointMsg.twist.linear.x = point.getLongitudinalVelocity(); @@ -82,8 +87,8 @@ atos_interfaces::msg::CartesianTrajectory Trajectory::toCartesianTrajectory() { nav_msgs::msg::Path Trajectory::toPath() const { nav_msgs::msg::Path path; path.header.frame_id = "map"; - path.header.stamp = rclcpp::Time(0); auto rosTimeOffset = rclcpp::Time(std::chrono::system_clock::now().time_since_epoch().count()); + path.header.stamp = rosTimeOffset; for (const auto& point : this->points) { geometry_msgs::msg::PoseStamped pose; pose.header.stamp = rosTimeOffset + rclcpp::Duration(point.getTime()); @@ -92,7 +97,10 @@ nav_msgs::msg::Path Trajectory::toPath() const { pose.pose.position.z = point.getPosition().z(); tf2::Quaternion q; q.setRPY(0, 0, point.getHeading()); - tf2::convert(q, pose.pose.orientation); + pose.pose.orientation.x = q.x(); + pose.pose.orientation.y = q.y(); + pose.pose.orientation.z = q.z(); + pose.pose.orientation.w = q.w(); path.poses.push_back(pose); } // Force same coordinate frame as header diff --git a/atos/common/trajectory.hpp b/atos/common/trajectory.hpp index b366ce74e..ddae5a56f 100644 --- a/atos/common/trajectory.hpp +++ b/atos/common/trajectory.hpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include diff --git a/atos/common/util.c b/atos/common/util.c index 07362761f..761ce8f6e 100644 --- a/atos/common/util.c +++ b/atos/common/util.c @@ -251,7 +251,8 @@ static int recursiveMkdir(const char *dir, int mode) { *p = '/'; } res = mkdir(tmp, mode); - } + return res; +} /*! * \brief deleteFile Deletes the file given in the parameter ::path @@ -1069,7 +1070,7 @@ int UtilSetSyncPoint(ObjectPosition * OP, double x, double y, double z, double T } } - + return OP->SyncIndex; } float UtilCalculateTimeToSync(ObjectPosition * OP) { @@ -1221,6 +1222,7 @@ int UtilSortSpaceTimeAscending(ObjectPosition * OP) { } } } + return 0; } @@ -1235,7 +1237,7 @@ int UtilFindCurrentTrajectoryPosition(ObjectPosition * OP, int StartIndex, doubl //OP->BestFoundTrajectoryIndex = 0; if (debug) fprintf(stderr, "UtilFindCurrentTrajectoryPosition: StartIndex=%d, CurrentTime=%4.3f, MaxTrajDiff=%4.3f, MaxTimeDiff=%4.3f\n", - OP->OrigoDistance, OP->x, OP->y, OP->SyncIndex); + StartIndex, CurrentTime, MaxTrajDiff, MaxTimeDiff); Init = 1; while (i < (OP->TrajectoryPositionCount - 1) && i <= OP->SyncIndex) { @@ -1328,7 +1330,7 @@ int UtilFindCurrentTrajectoryPositionNew(ObjectPosition * OP, int StartIndex, do if (i <= -1) i = 0; OP->BestFoundTrajectoryIndex = 0; - fprintf("UtilFindCurrentTrajectoryPositionNew: TrajectoryPositionCount=%d, SyncIndex=%d, OrigoDistance=%4.3f, x=%4.3f, y=%4.3f, SyncIndex=%d\n", + fprintf(stderr, "UtilFindCurrentTrajectoryPositionNew: TrajectoryPositionCount=%d, SyncIndex=%d, OrigoDistance=%4.3f, x=%4.3f, y=%4.3f, SyncIndex=%d\n", OP->TrajectoryPositionCount, OP->SyncIndex, OP->OrigoDistance, OP->x, OP->y, OP->SyncIndex); Init = 1; @@ -1378,7 +1380,7 @@ int UtilFindCurrentTrajectoryPositionNew(ObjectPosition * OP, int StartIndex, do //SampledSpaceIndex[j] = i; //j++ ; if (debug == 2) - fprintf("Minimum: %d, %3.6f, %3.6f", i, AngleDiff, RDiff); + fprintf(stderr, "Minimum: %d, %3.6f, %3.6f", i, AngleDiff, RDiff); PrevAngleDiff = AngleDiff; } @@ -1754,7 +1756,7 @@ int UtilGetRowInFile(const char *path, const size_t pathLength, return -1; } if(rowIndex == i){ - *(rowBuffer+length) = NULL; + *(rowBuffer + length) = '\0'; return i; } } diff --git a/atos/modules/EsminiAdapter/src/esminiadapter.cpp b/atos/modules/EsminiAdapter/src/esminiadapter.cpp index cfa37f09d..e3479ed36 100644 --- a/atos/modules/EsminiAdapter/src/esminiadapter.cpp +++ b/atos/modules/EsminiAdapter/src/esminiadapter.cpp @@ -221,8 +221,12 @@ void EsminiAdapter::reportObjectPosition(const Monitor::message_type::SharedPtr auto speed = monr->velocity.twist.linear; // Reporting to Esmini - int timestamp = 0; // Not really used according to esmini documentation +#if ROS_HUMBLE + float timestamp = 0.0f; // Not really used according to esmini documentation SE_ReportObjectPos(esminiObjectId, timestamp, pos.x, pos.y, pos.z, yaw, pitch, roll); +#else + SE_ReportObjectPos(esminiObjectId, pos.x, pos.y, pos.z, yaw, pitch, roll); +#endif SE_ReportObjectSpeed(esminiObjectId, speed.x); } diff --git a/atos/modules/IntegrationTesting/src/integrationtesting.cpp b/atos/modules/IntegrationTesting/src/integrationtesting.cpp index 8e55c2e28..f6482c4e8 100644 --- a/atos/modules/IntegrationTesting/src/integrationtesting.cpp +++ b/atos/modules/IntegrationTesting/src/integrationtesting.cpp @@ -38,7 +38,10 @@ IntegrationTesting::~IntegrationTesting() {} */ int IntegrationTesting::getObjectControlState() { std::shared_ptr response; - this->callService(1000ms, getObjectControlStateClient, response); + if (!this->callService(1000ms, getObjectControlStateClient, response) || response == nullptr) { + RCLCPP_ERROR(get_logger(), "Failed to call service %s", getObjectControlStateClient->get_service_name()); + return -1; + } return int(response->state); } diff --git a/atos/modules/ObjectControl/src/objectcontrol.cpp b/atos/modules/ObjectControl/src/objectcontrol.cpp index 2bf6fea29..27cd4a550 100644 --- a/atos/modules/ObjectControl/src/objectcontrol.cpp +++ b/atos/modules/ObjectControl/src/objectcontrol.cpp @@ -60,7 +60,7 @@ ObjectControl::ObjectControl(std::shared_ptr( - ServiceNames::getObjectIds, rmw_qos_profile_services_default, id_client_cb_group_); + ServiceNames::getObjectIds, rmw_qos_profile_services_default, id_client_cb_group_); originClient = create_client( ServiceNames::getTestOrigin, rmw_qos_profile_services_default, origin_client_cb_group_); trajectoryClient = create_client( @@ -295,17 +295,17 @@ bool ObjectControl::loadScenario() { std::future trajLoadedFuture = trajLoaded.get_future(); std::future ipLoadedFuture = ipLoaded.get_future(); std::future originLoadedFuture = originLoaded.get_future(); - if (auto status = trajLoadedFuture.wait_for(5s); + if (auto status = trajLoadedFuture.wait_for(20s); status != std::future_status::ready || !trajLoadedFuture.get()) { RCLCPP_ERROR(get_logger(), "Trajectory loading failed for object ID %u", id); successful = false; } - if (auto status = ipLoadedFuture.wait_for(250ms); + if (auto status = ipLoadedFuture.wait_for(2s); status != std::future_status::ready || !ipLoadedFuture.get()) { RCLCPP_ERROR(get_logger(), "IP loading failed for object ID %u", id); successful = false; } - if (auto status = originLoadedFuture.wait_for(250ms); + if (auto status = originLoadedFuture.wait_for(2s); status != std::future_status::ready || !originLoadedFuture.get()) { RCLCPP_ERROR(get_logger(), "Origin loading failed for object ID %u", id); successful = false; @@ -326,7 +326,7 @@ bool ObjectControl::loadScenario() { auto future = idClient->async_send_request(request, idsCallback); // Wait for all objects to load std::future scenarioLoadedFuture = scenarioLoaded.get_future(); - if (auto status = scenarioLoadedFuture.wait_for(10s); status == std::future_status::ready) { + if (auto status = scenarioLoadedFuture.wait_for(30s); status == std::future_status::ready) { return scenarioLoadedFuture.get(); // Get the status value } else if (status == std::future_status::timeout) { RCLCPP_ERROR(get_logger(), "Scenario loading timed out"); diff --git a/atos/modules/TruckObjectControl/CMakeLists.txt b/atos/modules/TruckObjectControl/CMakeLists.txt new file mode 100644 index 000000000..1da34fbbc --- /dev/null +++ b/atos/modules/TruckObjectControl/CMakeLists.txt @@ -0,0 +1,73 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. + +cmake_minimum_required(VERSION 3.8) +set(CMAKE_CXX_STANDARD 17) +set(CMAKE_CXX_STANDARD_REQUIRED ON) +set(CMAKE_C_STANDARD 11) +set(CMAKE_C_STANDARD_REQUIRED ON) + +project(truck_object_control) + +find_package(rclcpp REQUIRED) +find_package(std_msgs REQUIRED) +find_package(ament_index_cpp REQUIRED) +find_package(OpenSSL REQUIRED) + +include(GNUInstallDirs) + +set(COREUTILS_LIBRARY ATOSCoreUtil) + +add_executable(${PROJECT_NAME} + ${CMAKE_CURRENT_SOURCE_DIR}/src/main.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/src/truckobjectcontrol.cpp +) + +target_include_directories(${PROJECT_NAME} PUBLIC + ${CMAKE_CURRENT_SOURCE_DIR}/inc + ${CMAKE_CURRENT_SOURCE_DIR}/../../common +) +target_link_libraries(${PROJECT_NAME} + ${COREUTILS_LIBRARY} + OpenSSL::SSL + OpenSSL::Crypto +) + +ament_target_dependencies(${PROJECT_NAME} + rclcpp + std_msgs + ament_index_cpp +) + +add_executable(atos_truck_simulator + ${CMAKE_CURRENT_SOURCE_DIR}/src/main_simulator.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/src/atostrucksimulator.cpp +) +target_include_directories(atos_truck_simulator PUBLIC + ${CMAKE_CURRENT_SOURCE_DIR}/inc + ${CMAKE_CURRENT_SOURCE_DIR}/../../common +) +target_link_libraries(atos_truck_simulator + ${COREUTILS_LIBRARY} +) +ament_target_dependencies(atos_truck_simulator + rclcpp + std_msgs + ament_index_cpp +) + +install(CODE "MESSAGE(STATUS \"Installing target ${PROJECT_NAME}\")") +install(TARGETS ${PROJECT_NAME} + RUNTIME DESTINATION "${CMAKE_INSTALL_LIBDIR}/atos" + LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} + ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR} + PUBLIC_HEADER DESTINATION ${CMAKE_INSTALL_INCLUDEDIR} +) +install(CODE "MESSAGE(STATUS \"Installing target atos_truck_simulator\")") +install(TARGETS atos_truck_simulator + RUNTIME DESTINATION "${CMAKE_INSTALL_LIBDIR}/atos" + LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} + ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR} + PUBLIC_HEADER DESTINATION ${CMAKE_INSTALL_INCLUDEDIR} +) diff --git a/atos/modules/TruckObjectControl/inc/atostrucksimulator.hpp b/atos/modules/TruckObjectControl/inc/atostrucksimulator.hpp new file mode 100644 index 000000000..d383981c7 --- /dev/null +++ b/atos/modules/TruckObjectControl/inc/atostrucksimulator.hpp @@ -0,0 +1,59 @@ +/* + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ +#pragma once + +#include +#include + +#include +#include + +class AtosTruckSimulator : public rclcpp::Node { +public: + AtosTruckSimulator(); + +private: + struct GeoPoint { + double lat = 0.0; + double lon = 0.0; + double distance_m = 0.0; + }; + + std::string uid_ = "L5S-TRUCK-SIM"; + std::string tcp_host_ = "127.0.0.1"; + int tcp_port_ = 8114; + std::string trajectory_geojson_path_ = ""; + std::string trajectory_path_name_ = ""; + int start_index_ = 0; + double initial_speed_kmh_ = 0.0; + double target_speed_kmh_ = 40.0; + double acceleration_mps2_ = 2.0; + double publish_hz_ = 5.0; + bool loop_path_ = true; + bool ignore_warning_speed_commands_ = false; + + double current_distance_m_ = 0.0; + double current_speed_mps_ = 0.0; + + int tcp_fd_ = -1; + std::string tcp_rx_buffer_; + std::vector trajectory_path_; + + rclcpp::TimerBase::SharedPtr simulation_timer_; + rclcpp::Subscription::SharedPtr speed_command_sub_; + + rclcpp::Time last_step_time_; + + bool loadTrajectoryPath(); + bool ensureTcpConnected(); + void closeTcp(); + void pollTcpCommands(); + void simulationStep(); + bool pointAtDistance(double distance_m, double& lat, double& lon, double& course_deg, int& path_index) const; + static std::string utcIso8601FromRosTime(const rclcpp::Time& time); + void applySpeedCommandPayload(const std::string& payload); + void onSpeedCommand(const std_msgs::msg::String::SharedPtr msg); +}; diff --git a/atos/modules/TruckObjectControl/inc/truckobjectcontrol.hpp b/atos/modules/TruckObjectControl/inc/truckobjectcontrol.hpp new file mode 100644 index 000000000..43f1622a5 --- /dev/null +++ b/atos/modules/TruckObjectControl/inc/truckobjectcontrol.hpp @@ -0,0 +1,145 @@ +/* + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ +#pragma once + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +class TruckObjectControl : public rclcpp::Node { +public: + TruckObjectControl(); + ~TruckObjectControl() override; + +private: + struct GeoPoint { + double lat = 0.0; + double lon = 0.0; + double distance_m = 0.0; + }; + + struct TruckState { + double distance_along_trajectory_m = 0.0; + double lat = 0.0; + double lon = 0.0; + double speed_mps = 0.0; + double course_deg = 0.0; + bool tcp_connected = false; + std::string path_name = ""; + int path_index = -1; + std::string last_control_command = ""; + std::string last_tcp_command = ""; + std::string last_tcp_warning = ""; + std::string last_cot_message = ""; + rclcpp::Time last_cot_stamp = rclcpp::Time(0, 0, RCL_ROS_TIME); + }; + + struct CotObservation { + std::string truck_id; + double distance_along_trajectory_m = 0.0; + double lat = 0.0; + double lon = 0.0; + double speed_mps = 0.0; + double course_deg = 0.0; + bool tcp_connected = false; + std::string path_name = ""; + int path_index = -1; + }; + + struct TcpClientSession { + int fd = -1; + std::string peer_name = ""; + SSL* ssl = nullptr; + std::mutex io_mutex; + std::mutex queue_mutex; + std::condition_variable queue_cv; + std::thread sender_thread; + std::string queued_uid = ""; + std::string queued_command = ""; + bool has_queued_command = false; + bool stop_sender = false; + }; + + rclcpp::Subscription::SharedPtr cot_sub_; + rclcpp::Publisher::SharedPtr speed_command_pub_; + rclcpp::Publisher::SharedPtr truck_state_pub_; + rclcpp::TimerBase::SharedPtr evaluation_timer_; + + std::mutex state_mutex_; + std::unordered_map trucks_; + std::vector trajectory_path_; + std::unordered_map> trajectory_cache_; + std::mutex trajectory_cache_mutex_; + + double warning_distance_m_ = 400.0; + double stop_distance_m_ = 200.0; + double warning_speed_kmh_ = 30.0; + double stop_speed_kmh_ = 0.0; + double cot_timeout_seconds_ = 2.0; + int cot_tcp_port_ = 8114; + std::string cot_tcp_bind_address_ = "0.0.0.0"; + bool cot_tls_require_client_cert_ = false; + std::string cot_tls_cert_path_ = ""; + std::string cot_tls_key_path_ = ""; + std::string cot_tls_ca_path_ = ""; + bool cot_tls_enabled_ = false; + std::string trajectory_geojson_path_ = ""; + std::string default_path_name_ = ""; + + std::atomic tcp_running_{false}; + int tcp_server_fd_ = -1; + std::thread tcp_accept_thread_; + std::vector tcp_client_threads_; + std::mutex tcp_threads_mutex_; + std::set tcp_client_fds_; + std::mutex tcp_command_mutex_; + std::mutex tcp_sessions_mutex_; + std::unordered_map> tcp_sessions_; + std::unordered_map uid_to_client_fd_; + std::unordered_map uid_to_ssl_; + SSL_CTX* ssl_ctx_ = nullptr; + uint64_t tcp_command_seq_ = 0; + + void onCotMessage(const std_msgs::msg::String::SharedPtr msg); + void evaluateAndPublishSpeedCommand(); + void publishTruckState(const std::string& truck_id, const TruckState& state); + + bool parseCotPlaceholder(const std::string& payload, CotObservation& out) const; + bool parseCotXml(const std::string& payload, CotObservation& out); + bool isCotFresh(const TruckState& state, const rclcpp::Time& now) const; + bool loadTrajectoryPath(); + bool loadTrajectoryPathFromFile(const std::string& path, std::vector& out) const; + std::string resolveTrajectoryPathByName(const std::string& path_name) const; + const std::vector* getTrajectoryForPath(const std::string& path_name); + double projectDistanceAlongTrajectory(double lat, + double lon, + const std::vector* trajectory = nullptr, + int* projected_path_index = nullptr) const; + + void startTcpServer(); + void stopTcpServer(); + void acceptTcpClients(); + void handleTcpClient(int client_fd, const std::string& peer_name); + void clientSenderLoop(const std::shared_ptr& session); + void disconnectClientSession(const std::shared_ptr& session, const std::string& reason); + void updateTruckTcpStatus(const std::string& target_id, + const std::string& command, + const std::string& warning, + bool mark_disconnected = false); + void sendSpeedCommandToTcpClient(const std::string& target_id, const std::string& command); + bool initializeTlsContext(); +}; diff --git a/atos/modules/TruckObjectControl/src/atostrucksimulator.cpp b/atos/modules/TruckObjectControl/src/atostrucksimulator.cpp new file mode 100644 index 000000000..9e93647b7 --- /dev/null +++ b/atos/modules/TruckObjectControl/src/atostrucksimulator.cpp @@ -0,0 +1,455 @@ +/* + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ +#include "atostrucksimulator.hpp" + +#include + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using json = nlohmann::json; + +namespace { +constexpr double kEpsilon = 1e-9; +constexpr const char* kGeoJsonName = "RuralRoad_center_of_driving_lane_ccw.geojson"; + +double degToRad(double value) { + return value * M_PI / 180.0; +} + +double radToDeg(double value) { + return value * 180.0 / M_PI; +} + +double geodesicDistanceMeters(double lat1, double lon1, double lat2, double lon2) { + const double earth_radius_m = 6378137.0; + const double dlat = degToRad(lat2 - lat1); + const double dlon = degToRad(lon2 - lon1); + const double a = std::sin(dlat / 2.0) * std::sin(dlat / 2.0) + + std::cos(degToRad(lat1)) * std::cos(degToRad(lat2)) * std::sin(dlon / 2.0) * std::sin(dlon / 2.0); + const double c = 2.0 * std::atan2(std::sqrt(a), std::sqrt(1.0 - a)); + return earth_radius_m * c; +} + +std::string resolveTrajectoryPath(const std::string& configured_path) { + namespace fs = std::filesystem; + if (!configured_path.empty() && fs::exists(configured_path)) { + return configured_path; + } + + std::vector candidates; + candidates.emplace_back(fs::current_path() / "conf" / "conf" / kGeoJsonName); + candidates.emplace_back(fs::current_path() / ".." / "conf" / "conf" / kGeoJsonName); + + if (const char* home = std::getenv("HOME")) { + candidates.emplace_back(fs::path(home) / "atos_ws" / "src" / "atos" / "conf" / "conf" / kGeoJsonName); + candidates.emplace_back(fs::path(home) / "Documents" / "repos" / "ATOS" / "conf" / "conf" / kGeoJsonName); + } + + try { + const auto prefix = fs::path(ament_index_cpp::get_package_prefix("atos")); + candidates.emplace_back(prefix / "etc" / "conf" / kGeoJsonName); + } catch (...) {} + + for (const auto& candidate : candidates) { + if (fs::exists(candidate)) { + return candidate.string(); + } + } + return configured_path; +} +} // namespace + +AtosTruckSimulator::AtosTruckSimulator() : + Node("atos_truck_simulator") { + declare_parameter("uid", uid_); + declare_parameter("tcp_host", tcp_host_); + declare_parameter("tcp_port", tcp_port_); + declare_parameter("trajectory_geojson_path", trajectory_geojson_path_); + declare_parameter("start_index", start_index_); + declare_parameter("initial_speed_kmh", initial_speed_kmh_); + declare_parameter("target_speed_kmh", target_speed_kmh_); + declare_parameter("acceleration_mps2", acceleration_mps2_); + declare_parameter("publish_hz", publish_hz_); + declare_parameter("loop_path", loop_path_); + declare_parameter("ignore_warning_speed_commands", ignore_warning_speed_commands_); + + uid_ = get_parameter("uid").as_string(); + tcp_host_ = get_parameter("tcp_host").as_string(); + tcp_port_ = get_parameter("tcp_port").as_int(); + trajectory_geojson_path_ = get_parameter("trajectory_geojson_path").as_string(); + trajectory_geojson_path_ = resolveTrajectoryPath(trajectory_geojson_path_); + trajectory_path_name_ = std::filesystem::path(trajectory_geojson_path_).filename().string(); + start_index_ = get_parameter("start_index").as_int(); + initial_speed_kmh_ = get_parameter("initial_speed_kmh").as_double(); + target_speed_kmh_ = get_parameter("target_speed_kmh").as_double(); + acceleration_mps2_ = std::max(0.01, get_parameter("acceleration_mps2").as_double()); + publish_hz_ = std::max(1.0, get_parameter("publish_hz").as_double()); + loop_path_ = get_parameter("loop_path").as_bool(); + ignore_warning_speed_commands_ = get_parameter("ignore_warning_speed_commands").as_bool(); + + if (!loadTrajectoryPath()) { + RCLCPP_ERROR(get_logger(), "Failed to load trajectory at '%s'", trajectory_geojson_path_.c_str()); + return; + } + + if (start_index_ < 0) { + start_index_ = 0; + } + if (static_cast(start_index_) >= trajectory_path_.size()) { + start_index_ = static_cast(trajectory_path_.size() - 1); + } + + current_distance_m_ = trajectory_path_[static_cast(start_index_)].distance_m; + current_speed_mps_ = std::max(0.0, initial_speed_kmh_ / 3.6); + last_step_time_ = now(); + + speed_command_sub_ = create_subscription( + "truck_objects/speed_command", 20, std::bind(&AtosTruckSimulator::onSpeedCommand, this, std::placeholders::_1)); + + const auto period_ms = static_cast(1000.0 / publish_hz_); + simulation_timer_ = + create_wall_timer(std::chrono::milliseconds(period_ms), std::bind(&AtosTruckSimulator::simulationStep, this)); + + RCLCPP_INFO(get_logger(), + "AtosTruckSimulator started (uid=%s, path=%s, start_index=%d, target=%.1f km/h, accel=%.2f m/s^2, " + "tcp=%s:%d, ignore_warning=%s)", + uid_.c_str(), + trajectory_path_name_.c_str(), + start_index_, + target_speed_kmh_, + acceleration_mps2_, + tcp_host_.c_str(), + tcp_port_, + ignore_warning_speed_commands_ ? "true" : "false"); +} + +bool AtosTruckSimulator::loadTrajectoryPath() { + trajectory_path_.clear(); + + std::ifstream input(trajectory_geojson_path_); + if (!input.is_open()) { + return false; + } + + json root; + try { + input >> root; + } catch (...) { + return false; + } + + if (!root.contains("features") || !root["features"].is_array()) { + return false; + } + + json line_feature; + for (const auto& feature : root["features"]) { + if (!feature.contains("geometry")) { + continue; + } + const auto& geometry = feature["geometry"]; + if (!geometry.contains("type") || geometry["type"] != "LineString") { + continue; + } + if (feature.contains("id") && feature["id"] == "PathSection") { + line_feature = feature; + break; + } + if (line_feature.is_null()) { + line_feature = feature; + } + } + + if (line_feature.is_null()) { + return false; + } + + const auto& coords = line_feature["geometry"]["coordinates"]; + if (!coords.is_array() || coords.empty()) { + return false; + } + + double cumulative = 0.0; + GeoPoint prev; + bool has_prev = false; + + for (const auto& coord : coords) { + if (!coord.is_array() || coord.size() < 2) { + continue; + } + GeoPoint p; + p.lon = coord[0].get(); + p.lat = coord[1].get(); + p.distance_m = cumulative; + + if (has_prev) { + cumulative += geodesicDistanceMeters(prev.lat, prev.lon, p.lat, p.lon); + p.distance_m = cumulative; + } + + trajectory_path_.push_back(p); + prev = p; + has_prev = true; + } + + return trajectory_path_.size() >= 2; +} + +bool AtosTruckSimulator::ensureTcpConnected() { + if (tcp_fd_ >= 0) { + return true; + } + + tcp_fd_ = ::socket(AF_INET, SOCK_STREAM, 0); + if (tcp_fd_ < 0) { + return false; + } + + sockaddr_in addr{}; + addr.sin_family = AF_INET; + addr.sin_port = htons(static_cast(tcp_port_)); + if (::inet_pton(AF_INET, tcp_host_.c_str(), &addr.sin_addr) != 1) { + closeTcp(); + return false; + } + + if (::connect(tcp_fd_, reinterpret_cast(&addr), sizeof(addr)) < 0) { + closeTcp(); + return false; + } + + const int flags = fcntl(tcp_fd_, F_GETFL, 0); + if (flags >= 0) { + (void)fcntl(tcp_fd_, F_SETFL, flags | O_NONBLOCK); + } + + return true; +} + +void AtosTruckSimulator::closeTcp() { + if (tcp_fd_ >= 0) { + ::shutdown(tcp_fd_, SHUT_RDWR); + ::close(tcp_fd_); + tcp_fd_ = -1; + tcp_rx_buffer_.clear(); + } +} + +std::string AtosTruckSimulator::utcIso8601FromRosTime(const rclcpp::Time& time) { + const auto seconds = static_cast(time.seconds()); + std::tm tm_utc{}; + gmtime_r(&seconds, &tm_utc); + std::ostringstream out; + out << std::put_time(&tm_utc, "%Y-%m-%dT%H:%M:%SZ"); + return out.str(); +} + +bool AtosTruckSimulator::pointAtDistance(double distance_m, + double& lat, + double& lon, + double& course_deg, + int& path_index) const { + if (trajectory_path_.size() < 2) { + return false; + } + + const double max_distance = trajectory_path_.back().distance_m; + double d = distance_m; + if (loop_path_ && max_distance > kEpsilon) { + d = std::fmod(distance_m, max_distance); + if (d < 0.0) { + d += max_distance; + } + } else { + d = std::clamp(d, 0.0, max_distance); + } + + size_t segment_index = 1; + while (segment_index < trajectory_path_.size() && trajectory_path_[segment_index].distance_m < d) { + ++segment_index; + } + if (segment_index >= trajectory_path_.size()) { + segment_index = trajectory_path_.size() - 1; + } + if (segment_index == 0) { + segment_index = 1; + } + + const auto& a = trajectory_path_[segment_index - 1]; + const auto& b = trajectory_path_[segment_index]; + const double segment_length = std::max(kEpsilon, b.distance_m - a.distance_m); + const double t = std::clamp((d - a.distance_m) / segment_length, 0.0, 1.0); + + lat = a.lat + (b.lat - a.lat) * t; + lon = a.lon + (b.lon - a.lon) * t; + + const double y = std::sin(degToRad(b.lon - a.lon)) * std::cos(degToRad(b.lat)); + const double x = std::cos(degToRad(a.lat)) * std::sin(degToRad(b.lat)) - + std::sin(degToRad(a.lat)) * std::cos(degToRad(b.lat)) * std::cos(degToRad(b.lon - a.lon)); + course_deg = std::fmod(radToDeg(std::atan2(y, x)) + 360.0, 360.0); + + path_index = static_cast(t < 0.5 ? (segment_index - 1) : segment_index); + return true; +} + +void AtosTruckSimulator::simulationStep() { + const auto now_time = now(); + const double dt = std::max(0.001, (now_time - last_step_time_).seconds()); + last_step_time_ = now_time; + + const double target_speed_mps = std::max(0.0, target_speed_kmh_ / 3.6); + const double max_delta = acceleration_mps2_ * dt; + const double delta = std::clamp(target_speed_mps - current_speed_mps_, -max_delta, max_delta); + current_speed_mps_ += delta; + + current_distance_m_ += current_speed_mps_ * dt; + if (!loop_path_) { + current_distance_m_ = std::clamp(current_distance_m_, 0.0, trajectory_path_.back().distance_m); + } + + double lat = 0.0; + double lon = 0.0; + double course_deg = 0.0; + int path_index = 0; + if (!pointAtDistance(current_distance_m_, lat, lon, course_deg, path_index)) { + return; + } + + if (!ensureTcpConnected()) { + RCLCPP_WARN_THROTTLE(get_logger(), + *get_clock(), + 2000, + "Unable to connect to TruckObjectControl TCP %s:%d", + tcp_host_.c_str(), + tcp_port_); + return; + } + pollTcpCommands(); + + const std::string time_str = utcIso8601FromRosTime(now_time); + const std::string stale_str = utcIso8601FromRosTime(now_time + rclcpp::Duration::from_seconds(10.0)); + + std::ostringstream cot; + cot << "<__group name=\"Dark Green\" role=\"Test Vehicle\"/>"; + + const std::string payload = cot.str(); + const ssize_t sent = ::send(tcp_fd_, payload.data(), payload.size(), MSG_NOSIGNAL); + if (sent < 0) { + RCLCPP_WARN(get_logger(), "TCP send failed, reconnecting..."); + closeTcp(); + return; + } + + RCLCPP_INFO_THROTTLE(get_logger(), + *get_clock(), + 1000, + "Sim uid=%s path=%s idx=%d distance=%.1f m speed=%.1f km/h target=%.1f km/h lat=%.7f lon=%.7f", + uid_.c_str(), + trajectory_path_name_.c_str(), + path_index, + current_distance_m_, + current_speed_mps_ * 3.6, + target_speed_kmh_, + lat, + lon); +} + +void AtosTruckSimulator::onSpeedCommand(const std_msgs::msg::String::SharedPtr msg) { + applySpeedCommandPayload(msg->data); +} + +void AtosTruckSimulator::applySpeedCommandPayload(const std::string& payload) { + auto parseField = [&](const std::string& key) -> std::optional { + const auto pos = payload.find(key); + if (pos == std::string::npos) { + return std::nullopt; + } + const auto value_start = pos + key.size(); + const auto value_end = payload.find(';', value_start); + const auto value = payload.substr(value_start, value_end - value_start); + try { + return std::stod(value); + } catch (...) { + return std::nullopt; + } + }; + + double commanded_speed_kmh = 0.0; + if (const auto speed_mps = parseField("target_speed_mps="); speed_mps.has_value()) { + commanded_speed_kmh = std::max(0.0, *speed_mps) * 3.6; + } else if (const auto speed_kmh = parseField("target_speed_kmh="); speed_kmh.has_value()) { + commanded_speed_kmh = std::max(0.0, *speed_kmh); + } else { + return; + } + + if (ignore_warning_speed_commands_ && commanded_speed_kmh > 0.0) { + return; + } + target_speed_kmh_ = commanded_speed_kmh; +} + +void AtosTruckSimulator::pollTcpCommands() { + if (tcp_fd_ < 0) { + return; + } + + char rx[1024]; + while (true) { + const ssize_t n = ::recv(tcp_fd_, rx, sizeof(rx), 0); + if (n == 0) { + closeTcp(); + return; + } + if (n < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { + break; + } + closeTcp(); + return; + } + + tcp_rx_buffer_.append(rx, static_cast(n)); + while (true) { + const auto nl = tcp_rx_buffer_.find('\n'); + if (nl == std::string::npos) { + if (tcp_rx_buffer_.size() > 4096) { + tcp_rx_buffer_.clear(); + } + break; + } + const std::string line = tcp_rx_buffer_.substr(0, nl); + tcp_rx_buffer_.erase(0, nl + 1); + if (!line.empty()) { + applySpeedCommandPayload(line); + } + } + } +} diff --git a/atos/modules/TruckObjectControl/src/main.cpp b/atos/modules/TruckObjectControl/src/main.cpp new file mode 100644 index 000000000..9b7e0c19b --- /dev/null +++ b/atos/modules/TruckObjectControl/src/main.cpp @@ -0,0 +1,16 @@ +/* + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ +#include "truckobjectcontrol.hpp" +#include + +int main(int argc, char** argv) { + std::signal(SIGPIPE, SIG_IGN); + rclcpp::init(argc, argv); + auto node = std::make_shared(); + rclcpp::spin(node); + rclcpp::shutdown(); + return 0; +} diff --git a/atos/modules/TruckObjectControl/src/main_simulator.cpp b/atos/modules/TruckObjectControl/src/main_simulator.cpp new file mode 100644 index 000000000..7e2026fb7 --- /dev/null +++ b/atos/modules/TruckObjectControl/src/main_simulator.cpp @@ -0,0 +1,14 @@ +/* + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ +#include "atostrucksimulator.hpp" + +int main(int argc, char** argv) { + rclcpp::init(argc, argv); + auto node = std::make_shared(); + rclcpp::spin(node); + rclcpp::shutdown(); + return 0; +} diff --git a/atos/modules/TruckObjectControl/src/truckobjectcontrol.cpp b/atos/modules/TruckObjectControl/src/truckobjectcontrol.cpp new file mode 100644 index 000000000..dec242a12 --- /dev/null +++ b/atos/modules/TruckObjectControl/src/truckobjectcontrol.cpp @@ -0,0 +1,1496 @@ +/* + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ +#include "truckobjectcontrol.hpp" + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using std::placeholders::_1; +using json = nlohmann::json; + +namespace { +constexpr size_t kReceiveBufferSize = 4096; +constexpr int kAcceptPollSleepMs = 100; +constexpr int kTlsHandshakeRetrySleepMs = 10; +constexpr int kTlsReadPollTimeoutMs = 1000; +constexpr int kTlsIoRetrySleepMs = 10; +constexpr int kTlsWriteMaxTransientRetries = 100; +constexpr const char* kDefaultGeoJsonName = "RuralRoad_center_of_driving_lane_ccw.geojson"; + +double degToRad(double value) { + return value * M_PI / 180.0; +} + +double geodesicDistanceMeters(double lat1, double lon1, double lat2, double lon2) { + const double earth_radius_m = 6378137.0; + const double dlat = degToRad(lat2 - lat1); + const double dlon = degToRad(lon2 - lon1); + const double a = std::sin(dlat / 2.0) * std::sin(dlat / 2.0) + + std::cos(degToRad(lat1)) * std::cos(degToRad(lat2)) * std::sin(dlon / 2.0) * std::sin(dlon / 2.0); + const double c = 2.0 * std::atan2(std::sqrt(a), std::sqrt(1.0 - a)); + return earth_radius_m * c; +} + +struct LocalXY { + double x = 0.0; + double y = 0.0; +}; + +LocalXY toLocalXY(double ref_lat_deg, double ref_lon_deg, double lat_deg, double lon_deg) { + constexpr double earth_radius_m = 6378137.0; + const double ref_lat_rad = degToRad(ref_lat_deg); + const double dlat = degToRad(lat_deg - ref_lat_deg); + const double dlon = degToRad(lon_deg - ref_lon_deg); + LocalXY out; + out.x = earth_radius_m * dlon * std::cos(ref_lat_rad); + out.y = earth_radius_m * dlat; + return out; +} + +std::string describeTlsError(const int ssl_error) { + char ssl_error_text[256] = {0}; + const unsigned long queued_error = ERR_get_error(); + if (queued_error != 0) { + ERR_error_string_n(queued_error, ssl_error_text, sizeof(ssl_error_text)); + } + + std::ostringstream out; + out << "ssl_error=" << ssl_error; + if (ssl_error_text[0] != '\0') { + out << ", openssl='" << ssl_error_text << "'"; + } + if (errno != 0) { + out << ", errno=" << errno << " (" << std::strerror(errno) << ")"; + } + return out.str(); +} + +std::string resolveDefaultTrajectoryPath(const std::string& configured_path) { + namespace fs = std::filesystem; + if (!configured_path.empty() && fs::exists(configured_path)) { + return configured_path; + } + + std::vector candidates; + candidates.emplace_back(fs::current_path() / "conf" / "conf" / kDefaultGeoJsonName); + candidates.emplace_back(fs::current_path() / ".." / "conf" / "conf" / kDefaultGeoJsonName); + + if (const char* home = std::getenv("HOME")) { + candidates.emplace_back(fs::path(home) / "atos_ws" / "src" / "atos" / "conf" / "conf" / kDefaultGeoJsonName); + candidates.emplace_back(fs::path(home) / "Documents" / "repos" / "ATOS" / "conf" / "conf" / + kDefaultGeoJsonName); + } + + try { + const auto prefix = fs::path(ament_index_cpp::get_package_prefix("atos")); + candidates.emplace_back(prefix / "etc" / "conf" / kDefaultGeoJsonName); + } catch (...) {} + + for (const auto& candidate : candidates) { + if (fs::exists(candidate)) { + return candidate.string(); + } + } + return configured_path; +} +} // namespace + +TruckObjectControl::TruckObjectControl() : + Node("truck_object_control") { + declare_parameter("warning_distance_m", warning_distance_m_); + declare_parameter("stop_distance_m", stop_distance_m_); + declare_parameter("warning_speed_kmh", warning_speed_kmh_); + declare_parameter("stop_speed_kmh", stop_speed_kmh_); + declare_parameter("cot_timeout_seconds", cot_timeout_seconds_); + declare_parameter("cot_tcp_port", cot_tcp_port_); + declare_parameter("cot_tcp_bind_address", cot_tcp_bind_address_); + declare_parameter("cot_tls_require_client_cert", cot_tls_require_client_cert_); + declare_parameter("cot_tls_cert_path", cot_tls_cert_path_); + declare_parameter("cot_tls_key_path", cot_tls_key_path_); + declare_parameter("cot_tls_ca_path", cot_tls_ca_path_); + declare_parameter("trajectory_geojson_path", trajectory_geojson_path_); + + warning_distance_m_ = get_parameter("warning_distance_m").as_double(); + stop_distance_m_ = get_parameter("stop_distance_m").as_double(); + warning_speed_kmh_ = get_parameter("warning_speed_kmh").as_double(); + stop_speed_kmh_ = get_parameter("stop_speed_kmh").as_double(); + cot_timeout_seconds_ = get_parameter("cot_timeout_seconds").as_double(); + cot_tcp_port_ = get_parameter("cot_tcp_port").as_int(); + cot_tcp_bind_address_ = get_parameter("cot_tcp_bind_address").as_string(); + cot_tls_require_client_cert_ = get_parameter("cot_tls_require_client_cert").as_bool(); + cot_tls_cert_path_ = get_parameter("cot_tls_cert_path").as_string(); + cot_tls_key_path_ = get_parameter("cot_tls_key_path").as_string(); + cot_tls_ca_path_ = get_parameter("cot_tls_ca_path").as_string(); + cot_tls_enabled_ = (!cot_tls_cert_path_.empty() && !cot_tls_key_path_.empty()); + if (!cot_tls_enabled_ && (!cot_tls_cert_path_.empty() || !cot_tls_key_path_.empty())) { + RCLCPP_WARN(get_logger(), + "Incomplete TLS config for COT listener (cert or key missing). Falling back to plain TCP."); + } + trajectory_geojson_path_ = get_parameter("trajectory_geojson_path").as_string(); + trajectory_geojson_path_ = resolveDefaultTrajectoryPath(trajectory_geojson_path_); + default_path_name_ = std::filesystem::path(trajectory_geojson_path_).filename().string(); + + cot_sub_ = create_subscription( + "truck_objects/cot", 50, std::bind(&TruckObjectControl::onCotMessage, this, _1)); + + speed_command_pub_ = create_publisher("truck_objects/speed_command", 20); + truck_state_pub_ = create_publisher("truck_objects/state", 50); + + evaluation_timer_ = create_wall_timer(std::chrono::milliseconds(200), + std::bind(&TruckObjectControl::evaluateAndPublishSpeedCommand, this)); + + if (!loadTrajectoryPath()) { + RCLCPP_WARN(get_logger(), + "Failed to load trajectory path from '%s'. Distance along trajectory will stay 0.", + trajectory_geojson_path_.c_str()); + } else { + RCLCPP_INFO(get_logger(), + "Loaded trajectory path with %zu points from %s", + trajectory_path_.size(), + trajectory_geojson_path_.c_str()); + } + + startTcpServer(); + + RCLCPP_INFO( + get_logger(), + "TruckObjectControl started. Listening for COT XML on %s://%s:%d and placeholder topic 'truck_objects/cot'.", + cot_tls_enabled_ ? "tls" : "tcp", + cot_tcp_bind_address_.c_str(), + cot_tcp_port_); +} + +TruckObjectControl::~TruckObjectControl() { + stopTcpServer(); + if (ssl_ctx_ != nullptr) { + SSL_CTX_free(ssl_ctx_); + ssl_ctx_ = nullptr; + } +} + +void TruckObjectControl::publishTruckState(const std::string& truck_id, const TruckState& state) { + json payload; + payload["uid"] = truck_id; + payload["distance_m"] = state.distance_along_trajectory_m; + payload["lat"] = state.lat; + payload["lon"] = state.lon; + payload["speed_mps"] = state.speed_mps; + payload["speed_kmh"] = state.speed_mps * 3.6; + payload["course_deg"] = state.course_deg; + payload["tcp_connected"] = state.tcp_connected; + payload["path_name"] = state.path_name; + payload["path_index"] = state.path_index; + payload["last_cot_message"] = state.last_cot_message; + payload["last_tcp_command"] = state.last_tcp_command; + payload["last_tcp_warning"] = state.last_tcp_warning; + payload["stamp_sec"] = now().seconds(); + + std_msgs::msg::String msg; + msg.data = payload.dump(); + truck_state_pub_->publish(msg); +} + +void TruckObjectControl::onCotMessage(const std_msgs::msg::String::SharedPtr msg) { + CotObservation observation; + if (!parseCotPlaceholder(msg->data, observation) && !parseCotXml(msg->data, observation)) { + RCLCPP_WARN_THROTTLE(get_logger(), *get_clock(), 5000, "Failed to parse COT payload from ROS topic."); + return; + } + + TruckState state; + { + std::lock_guard lock(state_mutex_); + auto& entry = trucks_[observation.truck_id]; + entry.distance_along_trajectory_m = observation.distance_along_trajectory_m; + entry.lat = observation.lat; + entry.lon = observation.lon; + entry.speed_mps = observation.speed_mps; + entry.course_deg = observation.course_deg; + entry.tcp_connected = observation.tcp_connected; + entry.path_name = observation.path_name; + entry.path_index = observation.path_index; + entry.last_cot_message = msg->data; + entry.last_tcp_warning = ""; + entry.last_cot_stamp = now(); + state = entry; + } + + publishTruckState(observation.truck_id, state); +} + +bool TruckObjectControl::parseCotPlaceholder(const std::string& payload, CotObservation& out) const { + std::unordered_map fields; + std::stringstream ss(payload); + std::string token; + + while (std::getline(ss, token, ';')) { + const auto sep = token.find('='); + if (sep == std::string::npos) { + continue; + } + const std::string key = token.substr(0, sep); + const std::string value = token.substr(sep + 1); + fields[key] = value; + } + + if (fields.find("id") == fields.end() || fields.find("distance_m") == fields.end() || + fields.find("tcp_connected") == fields.end()) { + return false; + } + + out.truck_id = fields.at("id"); + if (out.truck_id.empty()) { + return false; + } + + try { + out.distance_along_trajectory_m = std::stod(fields.at("distance_m")); + const auto& tcp = fields.at("tcp_connected"); + out.tcp_connected = (tcp == "1" || tcp == "true" || tcp == "TRUE"); + + if (fields.find("lat") != fields.end() && fields.find("lon") != fields.end()) { + out.lat = std::stod(fields.at("lat")); + out.lon = std::stod(fields.at("lon")); + } + if (fields.find("speed_mps") != fields.end()) { + out.speed_mps = std::stod(fields.at("speed_mps")); + } else if (fields.find("speed_kmh") != fields.end()) { + out.speed_mps = std::stod(fields.at("speed_kmh")) / 3.6; + } + if (fields.find("course_deg") != fields.end()) { + out.course_deg = std::stod(fields.at("course_deg")); + } + if (fields.find("path_name") != fields.end()) { + out.path_name = fields.at("path_name"); + } + if (fields.find("path_index") != fields.end()) { + out.path_index = std::stoi(fields.at("path_index")); + } + } catch (...) { + return false; + } + + return true; +} + +bool TruckObjectControl::parseCotXml(const std::string& payload, CotObservation& out) { + static const std::regex uid_re(R"re(uid="([^"]+)")re"); + static const std::regex point_re(R"re(]*\blat="([^"]+)"[^>]*\blon="([^"]+)")re"); + static const std::regex track_re(R"re(]*\bspeed="([^"]+)"[^>]*\bcourse="([^"]+)")re"); + static const std::regex path_name_re(R"re(]*\bpath_name="([^"]+)")re"); + + std::smatch m; + if (!std::regex_search(payload, m, uid_re) || m.size() < 2) { + return false; + } + out.truck_id = m[1].str(); + if (out.truck_id.empty()) { + return false; + } + + if (!std::regex_search(payload, m, point_re) || m.size() < 3) { + return false; + } + + try { + out.lat = std::stod(m[1].str()); + out.lon = std::stod(m[2].str()); + } catch (...) { + return false; + } + + out.speed_mps = 0.0; + out.course_deg = 0.0; + if (std::regex_search(payload, m, track_re) && m.size() >= 3) { + try { + // Incoming CoT track speed from PathFollower is in km/h; convert to m/s for ATOS internals. + out.speed_mps = std::stod(m[1].str()) / 3.6; + out.course_deg = std::stod(m[2].str()); + } catch (...) { + out.speed_mps = 0.0; + out.course_deg = 0.0; + } + } + + out.path_name.clear(); + out.path_index = -1; + if (std::regex_search(payload, m, path_name_re) && m.size() >= 2) { + out.path_name = m[1].str(); + } + + const std::vector* trajectory = nullptr; + if (!out.path_name.empty()) { + trajectory = getTrajectoryForPath(out.path_name); + } + if (!trajectory) { + trajectory = &trajectory_path_; + if (out.path_name.empty()) { + out.path_name = default_path_name_; + } + } + + int projected_path_index = -1; + out.distance_along_trajectory_m = + projectDistanceAlongTrajectory(out.lat, out.lon, trajectory, &projected_path_index); + out.path_index = projected_path_index; + + out.tcp_connected = true; + return true; +} + +bool TruckObjectControl::isCotFresh(const TruckState& state, const rclcpp::Time& now_time) const { + const auto age = (now_time - state.last_cot_stamp).seconds(); + return age >= 0.0 && age <= cot_timeout_seconds_; +} + +bool TruckObjectControl::loadTrajectoryPathFromFile(const std::string& path, std::vector& out) const { + out.clear(); + + std::ifstream input(path); + if (!input.is_open()) { + return false; + } + + json root; + try { + input >> root; + } catch (...) { + return false; + } + + if (!root.contains("features") || !root["features"].is_array()) { + return false; + } + + json line_feature; + for (const auto& feature : root["features"]) { + if (!feature.contains("geometry")) { + continue; + } + const auto& geometry = feature["geometry"]; + if (!geometry.contains("type") || geometry["type"] != "LineString") { + continue; + } + if (feature.contains("id") && feature["id"] == "PathSection") { + line_feature = feature; + break; + } + if (line_feature.is_null()) { + line_feature = feature; + } + } + + if (line_feature.is_null()) { + return false; + } + + const auto& coords = line_feature["geometry"]["coordinates"]; + if (!coords.is_array() || coords.empty()) { + return false; + } + + double cumulative = 0.0; + GeoPoint prev; + bool has_prev = false; + + for (const auto& coord : coords) { + if (!coord.is_array() || coord.size() < 2) { + continue; + } + GeoPoint p; + p.lon = coord[0].get(); + p.lat = coord[1].get(); + p.distance_m = cumulative; + + if (has_prev) { + cumulative += geodesicDistanceMeters(prev.lat, prev.lon, p.lat, p.lon); + p.distance_m = cumulative; + } + + out.push_back(p); + prev = p; + has_prev = true; + } + + return out.size() >= 2; +} + +bool TruckObjectControl::loadTrajectoryPath() { + if (!loadTrajectoryPathFromFile(trajectory_geojson_path_, trajectory_path_)) { + return false; + } + + std::lock_guard lock(trajectory_cache_mutex_); + trajectory_cache_[default_path_name_] = trajectory_path_; + return true; +} + +std::string TruckObjectControl::resolveTrajectoryPathByName(const std::string& path_name) const { + namespace fs = std::filesystem; + + if (path_name.empty()) { + return trajectory_geojson_path_; + } + + const fs::path raw(path_name); + if (raw.is_absolute() && fs::exists(raw)) { + return raw.string(); + } + + const fs::path base_name = raw.filename(); + if (base_name.empty()) { + return ""; + } + + const fs::path configured = fs::path(trajectory_geojson_path_).parent_path() / base_name; + if (fs::exists(configured)) { + return configured.string(); + } + + std::vector candidates; + candidates.emplace_back(fs::current_path() / "conf" / "conf" / base_name); + candidates.emplace_back(fs::current_path() / ".." / "conf" / "conf" / base_name); + + if (const char* home = std::getenv("HOME")) { + candidates.emplace_back(fs::path(home) / "atos_ws" / "src" / "atos" / "conf" / "conf" / base_name); + candidates.emplace_back(fs::path(home) / "Documents" / "repos" / "ATOS" / "conf" / "conf" / base_name); + } + + try { + const auto prefix = fs::path(ament_index_cpp::get_package_prefix("atos")); + candidates.emplace_back(prefix / "etc" / "conf" / base_name); + } catch (...) {} + + for (const auto& candidate : candidates) { + if (fs::exists(candidate)) { + return candidate.string(); + } + } + + return ""; +} + +const std::vector* TruckObjectControl::getTrajectoryForPath( + const std::string& path_name) { + const std::string key = std::filesystem::path(path_name).filename().string(); + if (key.empty()) { + return nullptr; + } + + { + std::lock_guard lock(trajectory_cache_mutex_); + const auto it = trajectory_cache_.find(key); + if (it != trajectory_cache_.end()) { + return &it->second; + } + } + + const std::string resolved_path = resolveTrajectoryPathByName(key); + if (resolved_path.empty()) { + return nullptr; + } + + std::vector loaded; + if (!loadTrajectoryPathFromFile(resolved_path, loaded)) { + return nullptr; + } + + std::lock_guard lock(trajectory_cache_mutex_); + auto [it, inserted] = trajectory_cache_.emplace(key, std::move(loaded)); + if (inserted) { + RCLCPP_INFO(get_logger(), + "Loaded trajectory '%s' with %zu points from %s", + key.c_str(), + it->second.size(), + resolved_path.c_str()); + } + return &it->second; +} + +double TruckObjectControl::projectDistanceAlongTrajectory(double lat, + double lon, + const std::vector* trajectory, + int* projected_path_index) const { + const std::vector* path = trajectory; + if (!path || path->empty()) { + path = &trajectory_path_; + } + if (!path || path->empty()) { + if (projected_path_index != nullptr) { + *projected_path_index = -1; + } + return 0.0; + } + + // Project onto each polyline segment to get a continuous along-track distance. + // This avoids quantizing multiple trucks to the same nearest vertex. + double best_distance_to_path_m = std::numeric_limits::infinity(); + double best_distance_along_m = 0.0; + int best_index = -1; + + for (size_t i = 0; i + 1 < path->size(); ++i) { + const auto& a = (*path)[i]; + const auto& b = (*path)[i + 1]; + const double segment_length_m = std::max(1e-6, b.distance_m - a.distance_m); + + const LocalXY a_xy{0.0, 0.0}; + const LocalXY b_xy = toLocalXY(a.lat, a.lon, b.lat, b.lon); + const LocalXY q_xy = toLocalXY(a.lat, a.lon, lat, lon); + + const double vx = b_xy.x - a_xy.x; + const double vy = b_xy.y - a_xy.y; + const double wx = q_xy.x - a_xy.x; + const double wy = q_xy.y - a_xy.y; + const double denom = vx * vx + vy * vy; + double t = 0.0; + if (denom > 1e-12) { + t = std::clamp((wx * vx + wy * vy) / denom, 0.0, 1.0); + } + + const double proj_lat = a.lat + (b.lat - a.lat) * t; + const double proj_lon = a.lon + (b.lon - a.lon) * t; + const double d = geodesicDistanceMeters(lat, lon, proj_lat, proj_lon); + + if (d < best_distance_to_path_m) { + best_distance_to_path_m = d; + best_distance_along_m = a.distance_m + t * segment_length_m; + best_index = static_cast(t < 0.5 ? i : (i + 1)); + } + } + + if (projected_path_index != nullptr) { + *projected_path_index = best_index; + } + return best_distance_along_m; +} + +bool TruckObjectControl::initializeTlsContext() { + if (ssl_ctx_ != nullptr) { + return true; + } + + if (cot_tls_cert_path_.empty() || cot_tls_key_path_.empty()) { + RCLCPP_ERROR(get_logger(), "TLS enabled but cot_tls_cert_path or cot_tls_key_path is empty."); + return false; + } + + SSL_load_error_strings(); + OpenSSL_add_ssl_algorithms(); + + ssl_ctx_ = SSL_CTX_new(TLS_server_method()); + if (ssl_ctx_ == nullptr) { + RCLCPP_ERROR(get_logger(), "Failed to create TLS server context."); + return false; + } + + SSL_CTX_set_min_proto_version(ssl_ctx_, TLS1_2_VERSION); + + if (SSL_CTX_use_certificate_file(ssl_ctx_, cot_tls_cert_path_.c_str(), SSL_FILETYPE_PEM) != 1) { + RCLCPP_ERROR(get_logger(), "Failed to load TLS certificate from '%s'.", cot_tls_cert_path_.c_str()); + SSL_CTX_free(ssl_ctx_); + ssl_ctx_ = nullptr; + return false; + } + + if (SSL_CTX_use_PrivateKey_file(ssl_ctx_, cot_tls_key_path_.c_str(), SSL_FILETYPE_PEM) != 1) { + RCLCPP_ERROR(get_logger(), "Failed to load TLS private key from '%s'.", cot_tls_key_path_.c_str()); + SSL_CTX_free(ssl_ctx_); + ssl_ctx_ = nullptr; + return false; + } + + if (SSL_CTX_check_private_key(ssl_ctx_) != 1) { + RCLCPP_ERROR(get_logger(), "TLS private key does not match certificate."); + SSL_CTX_free(ssl_ctx_); + ssl_ctx_ = nullptr; + return false; + } + + if (!cot_tls_ca_path_.empty()) { + if (SSL_CTX_load_verify_locations(ssl_ctx_, cot_tls_ca_path_.c_str(), nullptr) != 1) { + RCLCPP_ERROR(get_logger(), "Failed to load TLS CA file from '%s'.", cot_tls_ca_path_.c_str()); + SSL_CTX_free(ssl_ctx_); + ssl_ctx_ = nullptr; + return false; + } + } + + if (cot_tls_require_client_cert_) { + SSL_CTX_set_verify(ssl_ctx_, SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT, nullptr); + } else { + SSL_CTX_set_verify(ssl_ctx_, SSL_VERIFY_NONE, nullptr); + } + + return true; +} + +void TruckObjectControl::startTcpServer() { + if (cot_tls_enabled_ && !initializeTlsContext()) { + RCLCPP_ERROR(get_logger(), "COT listener startup failed: TLS setup failed."); + return; + } + + tcp_server_fd_ = ::socket(AF_INET, SOCK_STREAM, 0); + if (tcp_server_fd_ < 0) { + RCLCPP_ERROR(get_logger(), "Failed to create TCP socket for COT listener."); + return; + } + + int enable = 1; + (void)setsockopt(tcp_server_fd_, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)); + (void)setsockopt(tcp_server_fd_, SOL_SOCKET, SO_KEEPALIVE, &enable, sizeof(enable)); + + sockaddr_in addr{}; + addr.sin_family = AF_INET; + addr.sin_port = htons(static_cast(cot_tcp_port_)); + if (::inet_pton(AF_INET, cot_tcp_bind_address_.c_str(), &addr.sin_addr) != 1) { + RCLCPP_ERROR(get_logger(), "Invalid bind address '%s' for COT TCP listener.", cot_tcp_bind_address_.c_str()); + ::close(tcp_server_fd_); + tcp_server_fd_ = -1; + return; + } + + if (::bind(tcp_server_fd_, reinterpret_cast(&addr), sizeof(addr)) < 0) { + RCLCPP_ERROR( + get_logger(), "Failed to bind COT TCP listener on %s:%d", cot_tcp_bind_address_.c_str(), cot_tcp_port_); + ::close(tcp_server_fd_); + tcp_server_fd_ = -1; + return; + } + + if (::listen(tcp_server_fd_, 8) < 0) { + RCLCPP_ERROR(get_logger(), "Failed to listen on COT TCP listener."); + ::close(tcp_server_fd_); + tcp_server_fd_ = -1; + return; + } + + tcp_running_.store(true); + tcp_accept_thread_ = std::thread(&TruckObjectControl::acceptTcpClients, this); +} + +void TruckObjectControl::stopTcpServer() { + tcp_running_.store(false); + + if (tcp_server_fd_ >= 0) { + ::shutdown(tcp_server_fd_, SHUT_RDWR); + ::close(tcp_server_fd_); + tcp_server_fd_ = -1; + } + + if (tcp_accept_thread_.joinable()) { + tcp_accept_thread_.join(); + } + + { + std::lock_guard lock(tcp_threads_mutex_); + for (const int fd : tcp_client_fds_) { + ::shutdown(fd, SHUT_RDWR); + ::close(fd); + } + tcp_client_fds_.clear(); + } + { + std::lock_guard lock(tcp_command_mutex_); + uid_to_client_fd_.clear(); + uid_to_ssl_.clear(); + } + { + std::lock_guard lock(tcp_sessions_mutex_); + for (auto& [fd, session] : tcp_sessions_) { + { + std::lock_guard qlock(session->queue_mutex); + session->stop_sender = true; + } + session->queue_cv.notify_all(); + } + } + + std::lock_guard lock(tcp_threads_mutex_); + for (auto& thread : tcp_client_threads_) { + if (thread.joinable()) { + thread.join(); + } + } + tcp_client_threads_.clear(); + { + std::lock_guard session_lock(tcp_sessions_mutex_); + for (auto& [fd, session] : tcp_sessions_) { + if (session->sender_thread.joinable()) { + session->sender_thread.join(); + } + } + tcp_sessions_.clear(); + } +} + +void TruckObjectControl::acceptTcpClients() { + while (tcp_running_.load()) { + sockaddr_in client_addr{}; + socklen_t client_len = sizeof(client_addr); + const int client_fd = ::accept(tcp_server_fd_, reinterpret_cast(&client_addr), &client_len); + if (client_fd < 0) { + if (!tcp_running_.load()) { + break; + } + RCLCPP_WARN_THROTTLE( + get_logger(), *get_clock(), 3000, "TCP accept failed (errno=%d). Listener will continue.", errno); + std::this_thread::sleep_for(std::chrono::milliseconds(kAcceptPollSleepMs)); + continue; + } + + int keepalive = 1; + (void)setsockopt(client_fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive, sizeof(keepalive)); + + int tcp_no_delay = 1; + if (::setsockopt(client_fd, IPPROTO_TCP, TCP_NODELAY, &tcp_no_delay, sizeof(tcp_no_delay)) < 0) { + RCLCPP_WARN(get_logger(), + "Failed to enable TCP_NODELAY for client socket (errno=%d). Commands may be delayed.", + errno); + } + + const int flags = ::fcntl(client_fd, F_GETFL, 0); + if (flags >= 0) { + (void)::fcntl(client_fd, F_SETFL, flags | O_NONBLOCK); + } + + timeval recv_timeout{}; + recv_timeout.tv_sec = 1; + recv_timeout.tv_usec = 0; + (void)setsockopt(client_fd, SOL_SOCKET, SO_RCVTIMEO, &recv_timeout, sizeof(recv_timeout)); + + char ip_buf[INET_ADDRSTRLEN] = {0}; + const char* peer_ip = ::inet_ntop(AF_INET, &client_addr.sin_addr, ip_buf, sizeof(ip_buf)); + std::ostringstream peer; + peer << (peer_ip ? peer_ip : "unknown") << ":" << ntohs(client_addr.sin_port); + + RCLCPP_INFO(get_logger(), "Accepted TruckObject TCP client %s", peer.str().c_str()); + + { + std::lock_guard lock(tcp_sessions_mutex_); + auto session = std::make_shared(); + session->fd = client_fd; + session->peer_name = peer.str(); + tcp_sessions_[client_fd] = session; + } + + std::lock_guard lock(tcp_threads_mutex_); + tcp_client_fds_.insert(client_fd); + tcp_client_threads_.emplace_back(&TruckObjectControl::handleTcpClient, this, client_fd, peer.str()); + } +} + +void TruckObjectControl::handleTcpClient(int client_fd, const std::string& peer_name) { + std::string buffer; + buffer.reserve(8 * 1024); + std::set seen_uids; + SSL* ssl = nullptr; + std::shared_ptr session; + + { + std::lock_guard lock(tcp_sessions_mutex_); + const auto it = tcp_sessions_.find(client_fd); + if (it != tcp_sessions_.end()) { + session = it->second; + } + } + if (!session) { + RCLCPP_WARN(get_logger(), "Missing TCP client session for %s fd=%d", peer_name.c_str(), client_fd); + ::shutdown(client_fd, SHUT_RDWR); + ::close(client_fd); + return; + } + + try { + if (cot_tls_enabled_) { + ssl = SSL_new(ssl_ctx_); + if (ssl == nullptr) { + RCLCPP_WARN(get_logger(), "Failed to create TLS session for %s.", peer_name.c_str()); + throw std::runtime_error("SSL_new failed"); + } + + if (SSL_set_fd(ssl, client_fd) != 1) { + RCLCPP_WARN(get_logger(), "Failed to bind TLS session to socket for %s.", peer_name.c_str()); + throw std::runtime_error("SSL_set_fd failed"); + } + + SSL_set_mode(ssl, SSL_MODE_ENABLE_PARTIAL_WRITE | SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER); + + while (tcp_running_.load()) { + const int accept_result = SSL_accept(ssl); + if (accept_result == 1) { + break; + } + + const int ssl_accept_error = SSL_get_error(ssl, accept_result); + if (ssl_accept_error == SSL_ERROR_WANT_READ || ssl_accept_error == SSL_ERROR_WANT_WRITE) { + std::this_thread::sleep_for(std::chrono::milliseconds(kTlsHandshakeRetrySleepMs)); + continue; + } + + char ssl_error[256] = {0}; + ERR_error_string_n(ERR_get_error(), ssl_error, sizeof(ssl_error)); + RCLCPP_WARN(get_logger(), + "TLS handshake failed for %s: %s (ssl_error=%d)", + peer_name.c_str(), + ssl_error[0] == '\0' ? "unknown error" : ssl_error, + ssl_accept_error); + throw std::runtime_error("SSL_accept failed"); + } + } + + session->ssl = ssl; + session->sender_thread = std::thread(&TruckObjectControl::clientSenderLoop, this, session); + + char receive_buffer[kReceiveBufferSize]; + while (tcp_running_.load()) { + ssize_t received = 0; + if (cot_tls_enabled_) { + pollfd pfd{}; + pfd.fd = client_fd; + pfd.events = POLLIN; + const int poll_result = ::poll(&pfd, 1, kTlsReadPollTimeoutMs); + if (poll_result == 0) { + continue; + } + if (poll_result < 0) { + if (errno == EINTR) { + continue; + } + RCLCPP_WARN(get_logger(), + "TLS receive poll error from %s (errno=%d: %s). Closing this connection only.", + peer_name.c_str(), + errno, + std::strerror(errno)); + break; + } + if ((pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) != 0) { + RCLCPP_INFO(get_logger(), + "TLS client %s socket closed or errored (revents=0x%x).", + peer_name.c_str(), + pfd.revents); + break; + } + + ERR_clear_error(); + errno = 0; + { + std::lock_guard lock(session->io_mutex); + received = SSL_read(ssl, receive_buffer, static_cast(sizeof(receive_buffer))); + } + if (received <= 0) { + const int ssl_read_error = SSL_get_error(ssl, static_cast(received)); + if (ssl_read_error == SSL_ERROR_WANT_READ || ssl_read_error == SSL_ERROR_WANT_WRITE) { + std::this_thread::sleep_for(std::chrono::milliseconds(kTlsIoRetrySleepMs)); + continue; + } + if (ssl_read_error == SSL_ERROR_ZERO_RETURN) { + break; + } + if (ssl_read_error == SSL_ERROR_SYSCALL && + (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)) { + continue; + } + RCLCPP_WARN(get_logger(), + "TLS receive error from %s (%s). Closing this connection only.", + peer_name.c_str(), + describeTlsError(ssl_read_error).c_str()); + break; + } + } else { + received = ::recv(client_fd, receive_buffer, sizeof(receive_buffer), 0); + if (received == 0) { + break; + } + if (received < 0) { + if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) { + continue; + } + RCLCPP_WARN(get_logger(), + "TCP receive error from %s (errno=%d). Closing this connection only.", + peer_name.c_str(), + errno); + break; + } + } + + buffer.append(receive_buffer, static_cast(received)); + + while (true) { + const size_t start_pos = buffer.find(" 32 * 1024) { + buffer.clear(); + } + break; + } + const size_t end_pos = buffer.find("", start_pos); + if (end_pos == std::string::npos) { + if (start_pos > 0) { + buffer.erase(0, start_pos); + } + break; + } + + const size_t event_end = end_pos + std::string("").size(); + const std::string xml = buffer.substr(start_pos, event_end - start_pos); + buffer.erase(0, event_end); + + CotObservation observation; + if (!parseCotXml(xml, observation)) { + RCLCPP_WARN_THROTTLE(get_logger(), + *get_clock(), + 5000, + "Failed to parse COT XML from TCP client %s", + peer_name.c_str()); + continue; + } + + TruckState state; + { + std::lock_guard lock(state_mutex_); + auto& entry = trucks_[observation.truck_id]; + entry.distance_along_trajectory_m = observation.distance_along_trajectory_m; + entry.lat = observation.lat; + entry.lon = observation.lon; + entry.speed_mps = observation.speed_mps; + entry.course_deg = observation.course_deg; + entry.tcp_connected = true; + entry.path_name = observation.path_name; + entry.path_index = observation.path_index; + entry.last_cot_message = xml; + entry.last_tcp_warning = ""; + entry.last_cot_stamp = now(); + state = entry; + } + { + std::lock_guard lock(tcp_command_mutex_); + const auto existing = uid_to_client_fd_.find(observation.truck_id); + if (existing != uid_to_client_fd_.end() && existing->second != client_fd) { + RCLCPP_WARN(get_logger(), + "UID '%s' already mapped to fd=%d; remapping to fd=%d from %s. " + "Ensure each sender uses a unique uid.", + observation.truck_id.c_str(), + existing->second, + client_fd, + peer_name.c_str()); + } + uid_to_client_fd_[observation.truck_id] = client_fd; + if (cot_tls_enabled_) { + uid_to_ssl_[observation.truck_id] = ssl; + } + } + + seen_uids.insert(observation.truck_id); + publishTruckState(observation.truck_id, state); + } + } + } catch (...) { + RCLCPP_WARN( + get_logger(), "Unexpected exception while handling client %s. Connection will be closed.", peer_name.c_str()); + } + + disconnectClientSession(session, "TCP client disconnected"); +} + +void TruckObjectControl::evaluateAndPublishSpeedCommand() { + struct ConnectedTruck { + std::string id; + std::string path_name; + double distance_m = 0.0; + double speed_mps = 0.0; + int path_index = -1; + std::string previous_command; + }; + std::vector connected; + + { + std::lock_guard lock(state_mutex_); + for (const auto& [id, state] : trucks_) { + if (!state.tcp_connected) { + RCLCPP_WARN_THROTTLE(get_logger(), + *get_clock(), + 3000, + "Skipping uid=%s from command eval: tcp_connected=false, path=%s, distance=%.1fm", + id.c_str(), + state.path_name.c_str(), + state.distance_along_trajectory_m); + continue; + } + connected.push_back(ConnectedTruck{id, + state.path_name, + state.distance_along_trajectory_m, + state.speed_mps, + state.path_index, + state.last_control_command}); + } + } + + if (connected.empty()) { + RCLCPP_WARN_THROTTLE( + get_logger(), *get_clock(), 3000, "No TCP-connected trucks available for command evaluation."); + return; + } + + std::unordered_map> by_path; + for (const auto& truck : connected) { + by_path[truck.path_name].push_back(truck); + } + + { + std::ostringstream summary; + summary << "Connected trucks=" << connected.size() << " ["; + for (size_t i = 0; i < connected.size(); ++i) { + const auto& t = connected[i]; + if (i > 0) { + summary << " | "; + } + summary << "uid=" << t.id << ",path=" << t.path_name << ",d=" << std::fixed << std::setprecision(1) + << t.distance_m; + } + summary << "]"; + RCLCPP_INFO_THROTTLE(get_logger(), *get_clock(), 3000, "%s", summary.str().c_str()); + } + + double min_gap_m = std::numeric_limits::infinity(); + bool any_stop = false; + bool any_slowdown = false; + size_t sent_commands = 0; + + for (auto& [path_name, group] : by_path) { + std::sort(group.begin(), group.end(), [](const ConnectedTruck& a, const ConnectedTruck& b) { + return a.distance_m < b.distance_m; + }); + + double path_length_m = 0.0; + const std::vector* trajectory = nullptr; + if (!path_name.empty()) { + trajectory = getTrajectoryForPath(path_name); + } + if ((!trajectory || trajectory->empty()) && !trajectory_path_.empty()) { + trajectory = &trajectory_path_; + } + if (trajectory && !trajectory->empty()) { + path_length_m = trajectory->back().distance_m; + } + + for (size_t i = 0; i < group.size(); ++i) { + const bool has_peers = group.size() > 1; + bool has_ahead = has_peers; + std::string ahead_uid = "none"; + double ahead_gap = -1.0; + int ahead_path_index = -1; + + if (has_peers) { + const size_t ahead_index = (i + 1) % group.size(); + ahead_uid = group[ahead_index].id; + ahead_path_index = group[ahead_index].path_index; + const int curr_path_index = group[i].path_index; + + // Prefer index-based circular gap when both indices are valid for this trajectory. + bool used_index_gap = false; + if (trajectory && !trajectory->empty() && curr_path_index >= 0 && ahead_path_index >= 0 && + static_cast(curr_path_index) < trajectory->size() && + static_cast(ahead_path_index) < trajectory->size() && path_length_m > 0.0) { + const double curr_d = (*trajectory)[static_cast(curr_path_index)].distance_m; + const double ahead_d = (*trajectory)[static_cast(ahead_path_index)].distance_m; + ahead_gap = ahead_d - curr_d; + if (ahead_gap <= 0.0) { + // Wraparound: distance to end + distance from beginning. + ahead_gap += path_length_m; + } + used_index_gap = true; + } + + if (!used_index_gap) { + ahead_gap = group[ahead_index].distance_m - group[i].distance_m; + if (ahead_gap <= 0.0 && path_length_m > 0.0) { + ahead_gap += path_length_m; + } + } + if (ahead_gap <= 0.0) { + has_ahead = false; + ahead_uid = "none"; + ahead_path_index = -1; + ahead_gap = -1.0; + } + } + + if (has_ahead) { + min_gap_m = std::min(min_gap_m, ahead_gap); + } + + const std::string previous_command = group[i].previous_command; + std::string control_command = previous_command.empty() ? "DRIVE" : previous_command; + + const bool in_warning_band = has_ahead && (ahead_gap <= 500.0 && ahead_gap > 200.0); + const bool in_stop_band = has_ahead && (ahead_gap <= 200.0); + if (in_warning_band) { + control_command = "SLOWDOWN"; + } + if (in_stop_band) { + control_command = "STOP"; + } + if (has_ahead && ahead_gap > 300.0 && previous_command == "STOP") { + control_command = "SLOWDOWN"; + } + if (has_ahead && ahead_gap > 700.0 && previous_command == "SLOWDOWN") { + control_command = "DRIVE"; + } + + std::string target_speed_mps_value = std::to_string(std::max(0.0, group[i].speed_mps)); + if (control_command == "STOP") { + target_speed_mps_value = std::to_string(stop_speed_kmh_ / 3.6); + any_stop = true; + } else if (control_command == "SLOWDOWN") { + target_speed_mps_value = std::to_string(warning_speed_kmh_ / 3.6); + any_slowdown = true; + } + + { + std::lock_guard lock(state_mutex_); + auto it = trucks_.find(group[i].id); + if (it != trucks_.end()) { + it->second.last_control_command = control_command; + } + } + + const std::string reason = + (control_command == "STOP") + ? "truck_ahead_below_stop_distance" + : (control_command == "SLOWDOWN" ? "truck_ahead_slowdown_state" : "truck_ahead_drive_state"); + uint64_t cmd_seq = 0; + { + std::lock_guard lock(state_mutex_); + cmd_seq = ++tcp_command_seq_; + } + const std::string tcp_command = + "command=" + control_command + ";target_speed_mps=" + target_speed_mps_value + + ";distance_to_truck_ahead_m=" + std::to_string(ahead_gap) + + ";truck_ahead_path_index=" + std::to_string(ahead_path_index) + ";truck_ahead_uid=" + ahead_uid + + ";reason=" + reason + ";min_gap_m=" + (std::isfinite(min_gap_m) ? std::to_string(min_gap_m) : "-1") + + ";connected_count=" + std::to_string(connected.size()) + ";path_name=" + path_name + + ";cmd_seq=" + std::to_string(cmd_seq); + + RCLCPP_INFO(get_logger(), "Prepared TCP command for uid=%s: %s", group[i].id.c_str(), tcp_command.c_str()); + + // Keep UI state in sync with latest generated command, even if TCP/TLS send fails. + TruckState state_copy; + bool has_state = false; + { + std::lock_guard lock(state_mutex_); + auto it = trucks_.find(group[i].id); + if (it != trucks_.end()) { + it->second.last_tcp_command = tcp_command; + state_copy = it->second; + has_state = true; + } + } + if (has_state) { + publishTruckState(group[i].id, state_copy); + } + + sendSpeedCommandToTcpClient(group[i].id, tcp_command); + sent_commands += 1; + } + } + + if (!std::isfinite(min_gap_m)) { + min_gap_m = -1.0; + } + + std::string fleet_target_speed_mps_value = "nochange"; + std::string fleet_reason = "no_limit"; + if (any_stop) { + fleet_target_speed_mps_value = std::to_string(stop_speed_kmh_ / 3.6); + fleet_reason = "min_gap_below_stop_distance"; + } else if (any_slowdown) { + fleet_target_speed_mps_value = std::to_string(warning_speed_kmh_ / 3.6); + fleet_reason = "min_gap_below_warning_distance"; + } + + std_msgs::msg::String command; + command.data = "target_speed_mps=" + fleet_target_speed_mps_value + ";scope=all_connected_with_valid_tcp" + + ";reason=" + fleet_reason + ";min_gap_m=" + std::to_string(min_gap_m) + + ";connected_count=" + std::to_string(connected.size()); + speed_command_pub_->publish(command); + + RCLCPP_WARN( + get_logger(), + "Published fleet speed command target_speed_mps=%s (reason=%s, min_gap=%.2f m, connected=%zu, tcp_cmds=%zu)", + fleet_target_speed_mps_value.c_str(), + fleet_reason.c_str(), + min_gap_m, + connected.size(), + sent_commands); +} + +void TruckObjectControl::updateTruckTcpStatus(const std::string& target_id, + const std::string& command, + const std::string& warning, + bool mark_disconnected) { + TruckState state_copy; + bool has_state = false; + { + std::lock_guard lock(state_mutex_); + const auto it = trucks_.find(target_id); + if (it != trucks_.end()) { + it->second.last_tcp_command = command; + it->second.last_tcp_warning = warning; + if (mark_disconnected) { + it->second.tcp_connected = false; + it->second.last_cot_stamp = now(); + } + state_copy = it->second; + has_state = true; + } + } + if (has_state) { + publishTruckState(target_id, state_copy); + } +} + +void TruckObjectControl::clientSenderLoop(const std::shared_ptr& session) { + while (tcp_running_.load()) { + std::string target_id; + std::string command; + { + std::unique_lock lock(session->queue_mutex); + session->queue_cv.wait(lock, [&]() { return session->stop_sender || session->has_queued_command; }); + if (session->stop_sender) { + break; + } + target_id = session->queued_uid; + command = session->queued_command; + session->has_queued_command = false; + } + + if (target_id.empty() || command.empty()) { + continue; + } + + const std::string payload = command + "\n"; + if (cot_tls_enabled_) { + if (session->ssl == nullptr) { + updateTruckTcpStatus(target_id, command, "TLS session missing for this truck", true); + ::shutdown(session->fd, SHUT_RDWR); + break; + } + + size_t total_sent = 0; + int transient_retries = 0; + bool send_failed = false; + std::string send_warning; + while (total_sent < payload.size() && tcp_running_.load()) { + { + std::lock_guard lock(session->queue_mutex); + if (session->stop_sender) { + send_failed = true; + send_warning = "TLS sender stopped"; + break; + } + } + + int sent = 0; + ERR_clear_error(); + errno = 0; + { + std::lock_guard lock(session->io_mutex); + sent = SSL_write( + session->ssl, payload.data() + total_sent, static_cast(payload.size() - total_sent)); + } + + if (sent > 0) { + total_sent += static_cast(sent); + transient_retries = 0; + continue; + } + + const int ssl_write_error = SSL_get_error(session->ssl, sent); + const bool transient = + ssl_write_error == SSL_ERROR_WANT_READ || ssl_write_error == SSL_ERROR_WANT_WRITE || + (ssl_write_error == SSL_ERROR_SYSCALL && (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)); + if (transient && transient_retries < kTlsWriteMaxTransientRetries) { + transient_retries += 1; + std::this_thread::sleep_for(std::chrono::milliseconds(kTlsIoRetrySleepMs)); + continue; + } + + send_failed = true; + send_warning = "TLS send failed (" + describeTlsError(ssl_write_error) + ")"; + break; + } + + if (!send_failed && total_sent < payload.size()) { + updateTruckTcpStatus(target_id, command, "TLS sender stopped before payload was fully sent", true); + ::shutdown(session->fd, SHUT_RDWR); + break; + } + + if (send_failed) { + updateTruckTcpStatus(target_id, command, send_warning, true); + ::shutdown(session->fd, SHUT_RDWR); + break; + } + + RCLCPP_INFO(get_logger(), + "Sent TLS speed command to uid=%s fd=%d: %s", + target_id.c_str(), + session->fd, + command.c_str()); + updateTruckTcpStatus(target_id, command, ""); + continue; + } + + const ssize_t sent = ::send(session->fd, payload.data(), payload.size(), MSG_NOSIGNAL | MSG_DONTWAIT); + if (sent < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { + updateTruckTcpStatus(target_id, command, "TCP backpressure drop"); + continue; + } + updateTruckTcpStatus(target_id, command, "TCP send failed (errno=" + std::to_string(errno) + ")", true); + ::shutdown(session->fd, SHUT_RDWR); + break; + } + + if (static_cast(sent) < payload.size()) { + updateTruckTcpStatus(target_id, command, "TCP partial send drop"); + continue; + } + + RCLCPP_INFO( + get_logger(), "Sent TCP speed command to uid=%s fd=%d: %s", target_id.c_str(), session->fd, command.c_str()); + updateTruckTcpStatus(target_id, command, ""); + } +} + +void TruckObjectControl::disconnectClientSession(const std::shared_ptr& session, + const std::string& reason) { + if (!session) { + return; + } + + { + std::lock_guard lock(session->queue_mutex); + session->stop_sender = true; + session->has_queued_command = false; + } + session->queue_cv.notify_all(); + + std::vector affected_uids; + { + std::lock_guard lock(tcp_command_mutex_); + for (auto it = uid_to_client_fd_.begin(); it != uid_to_client_fd_.end();) { + if (it->second == session->fd) { + affected_uids.push_back(it->first); + uid_to_ssl_.erase(it->first); + it = uid_to_client_fd_.erase(it); + } else { + ++it; + } + } + } + + for (const auto& uid : affected_uids) { + updateTruckTcpStatus(uid, "", reason, true); + } + + const int old_fd = session->fd; + { + std::lock_guard lock(tcp_threads_mutex_); + tcp_client_fds_.erase(old_fd); + } + + if (session->sender_thread.joinable()) { + session->sender_thread.join(); + } + + if (session->ssl != nullptr) { + std::lock_guard lock(session->io_mutex); + (void)SSL_shutdown(session->ssl); + SSL_free(session->ssl); + session->ssl = nullptr; + } + + if (old_fd >= 0) { + ::shutdown(old_fd, SHUT_RDWR); + ::close(old_fd); + session->fd = -1; + } + + { + std::lock_guard lock(tcp_sessions_mutex_); + tcp_sessions_.erase(old_fd); + } + + RCLCPP_INFO( + get_logger(), "TruckObject TCP client disconnected: %s (%s)", session->peer_name.c_str(), reason.c_str()); +} + +void TruckObjectControl::sendSpeedCommandToTcpClient(const std::string& target_id, const std::string& command) { + int target_fd = -1; + { + std::lock_guard lock(tcp_command_mutex_); + const auto it = uid_to_client_fd_.find(target_id); + if (it != uid_to_client_fd_.end()) { + target_fd = it->second; + } + } + + if (target_fd < 0) { + updateTruckTcpStatus(target_id, command, "No active TCP socket for this truck"); + return; + } + + std::shared_ptr session; + { + std::lock_guard lock(tcp_sessions_mutex_); + const auto it = tcp_sessions_.find(target_fd); + if (it != tcp_sessions_.end()) { + session = it->second; + } + } + + if (!session) { + updateTruckTcpStatus(target_id, command, "No active TCP session for this truck"); + return; + } + + { + std::lock_guard lock(session->queue_mutex); + session->queued_uid = target_id; + session->queued_command = command; + session->has_queued_command = true; + } + session->queue_cv.notify_one(); +} diff --git a/scripts/installation/dependencies.txt b/scripts/installation/dependencies.txt index e7c28e487..15765b353 100644 --- a/scripts/installation/dependencies.txt +++ b/scripts/installation/dependencies.txt @@ -13,4 +13,5 @@ python3-pip proj-bin clang - libstdc++-12-dev \ No newline at end of file + libstdc++-12-dev + libssl-dev diff --git a/scripts/integration_testing/run_scenario_test.py b/scripts/integration_testing/run_scenario_test.py index 74576d579..38454ee73 100644 --- a/scripts/integration_testing/run_scenario_test.py +++ b/scripts/integration_testing/run_scenario_test.py @@ -30,7 +30,13 @@ def integration_test_proc(): """ # Launch a process to test return launch.actions.ExecuteProcess( - cmd=["ros2", "launch", "atos", "launch_integration_testing.py"], + cmd=[ + "ros2", + "launch", + "atos", + "launch_integration_testing.py", + "foxbridge:=False", + ], shell=True, cached_output=True, output="screen", @@ -76,5 +82,5 @@ def validate_scenario_execution(output): ), "Trajectory check test failed" process_tools.assert_output_sync( - launch_context, integration_test_proc, validate_scenario_execution, timeout=30 + launch_context, integration_test_proc, validate_scenario_execution, timeout=60 )