diff --git a/dimos/hardware/sensors/lidar/fastlio2/recorder.py b/dimos/hardware/sensors/lidar/fastlio2/recorder.py index 08784cfe89..5f081d13e9 100644 --- a/dimos/hardware/sensors/lidar/fastlio2/recorder.py +++ b/dimos/hardware/sensors/lidar/fastlio2/recorder.py @@ -45,13 +45,13 @@ class FastLio2Recorder(Recorder): _last_odom_pose: Pose | None = None @pose_setter_for("fastlio_odometry") - def _odom_pose(self, msg: Odometry) -> Pose | None: + async def _odom_pose(self, msg: Odometry) -> Pose | None: pose = getattr(msg, "pose", None) self._last_odom_pose = getattr(pose, "pose", None) if pose is not None else None return self._last_odom_pose @pose_setter_for("fastlio_lidar") - def _lidar_pose(self, msg: PointCloud2) -> Pose | None: + async def _lidar_pose(self, msg: PointCloud2) -> Pose | None: # Most-recent odometry pose, stamped directly (no tf). None before the # first odometry -> frame stored unposed, map-skipped. return self._last_odom_pose diff --git a/dimos/hardware/sensors/lidar/pointlio/config/default.yaml b/dimos/hardware/sensors/lidar/pointlio/config/default.yaml deleted file mode 100644 index 1fd09ec9fa..0000000000 --- a/dimos/hardware/sensors/lidar/pointlio/config/default.yaml +++ /dev/null @@ -1,69 +0,0 @@ -# Point-LIO config. Mid-360-specific values to retune for a different sensor: -# preprocess.lidar_type (Livox), blind/scan_line, mapping.extrinsic_T/R (Mid-360 -# IMU->lidar mount), det_range, fov_degree. -common: - con_frame: false - con_frame_num: 1 - cut_frame: false - cut_frame_time_interval: 0.1 - time_lag_imu_to_lidar: 0.0 - -preprocess: - # LID_TYPE enum (Point-LIO src/preprocess.h): - # 1 = AVIA (Livox), 2 = VELO16, 3 = OUST64, 4 = HESAIxt32, 5 = UNILIDAR - # 1 selects the Livox branch (preprocess.cpp avia_handler), which expects the - # Livox CustomMsg point layout the Mid-360 emits: - # https://github.com/Livox-SDK/livox_ros_driver2/blob/master/msg/CustomMsg.msg - lidar_type: 1 - scan_line: 4 - scan_rate: 10 - timestamp_unit: 3 # 3 = nanosecond - blind: 0.5 - # Pre-KF input decimation: keep every Nth raw point. 1 = keep all (disable). - point_filter_num: 3 - -mapping: - use_imu_as_input: false # false = IMU-as-output model (Point-LIO's robust path) - prop_at_freq_of_imu: true - check_satu: true - init_map_size: 10 - # Pre-KF voxel downsample of each scan before the filter. false = feed the - # full scan (disable). Leaf size is filter_size_surf below. - space_down_sample: true - satu_acc: 3.0 # g; accel >= this is treated as saturated (residual zeroed) to bound velocity - satu_gyro: 35.0 - acc_norm: 1.0 # IMU accel unit: g - plane_thr: 0.1 - filter_size_surf: 0.2 # pre-KF scan downsample leaf size (m), used iff space_down_sample - filter_size_map: 0.5 - ivox_grid_resolution: 2.0 # iVox local-map grid (m) - ivox_nearby_type: 6 # NEARBY6 - cube_side_length: 1000.0 - det_range: 100.0 - fov_degree: 360.0 - imu_en: true - start_in_aggressive_motion: false - extrinsic_est_en: false - imu_time_inte: 0.005 - lidar_meas_cov: 0.01 - acc_cov_input: 0.1 - vel_cov: 20.0 - gyr_cov_input: 0.01 - gyr_cov_output: 1000.0 - acc_cov_output: 500.0 - b_gyr_cov: 0.0001 - b_acc_cov: 0.0001 - imu_meas_acc_cov: 0.01 - imu_meas_omg_cov: 0.01 - match_s: 81.0 - gravity_align: true - gravity: [0.0, 0.0, -9.810] - gravity_init: [0.0, 0.0, -9.810] - extrinsic_T: [-0.011, -0.02329, 0.04412] # Mid-360 IMU->lidar offset (m) - extrinsic_R: [1.0, 0.0, 0.0, - 0.0, 1.0, 0.0, - 0.0, 0.0, 1.0] - -odometry: - publish_odometry_without_downsample: false - odom_only: false diff --git a/dimos/hardware/sensors/lidar/pointlio/cpp/CMakeLists.txt b/dimos/hardware/sensors/lidar/pointlio/cpp/CMakeLists.txt index 27ad294a3b..9bce7246f2 100644 --- a/dimos/hardware/sensors/lidar/pointlio/cpp/CMakeLists.txt +++ b/dimos/hardware/sensors/lidar/pointlio/cpp/CMakeLists.txt @@ -53,8 +53,6 @@ find_package(Eigen3 REQUIRED) # PCL (only components we need — avoid full PCL which drags in VTK via io) find_package(PCL 1.8 REQUIRED COMPONENTS common filters) -find_package(yaml-cpp REQUIRED) - # glog (iVox map backend — include/ivox/ivox3d.h needs glog) find_package(glog REQUIRED) @@ -94,7 +92,6 @@ target_link_libraries(pointlio_native PRIVATE ${LCM_LIBRARIES} ${LIVOX_SDK} ${PCL_LIBRARIES} - yaml-cpp::yaml-cpp glog::glog ) diff --git a/dimos/hardware/sensors/lidar/pointlio/cpp/flake.lock b/dimos/hardware/sensors/lidar/pointlio/cpp/flake.lock index 531b9d7a15..3cb06c2284 100644 --- a/dimos/hardware/sensors/lidar/pointlio/cpp/flake.lock +++ b/dimos/hardware/sensors/lidar/pointlio/cpp/flake.lock @@ -37,11 +37,11 @@ "fast-lio": { "flake": false, "locked": { - "lastModified": 1781514593, - "narHash": "sha256-85zc03F2Ztw/pO1nUtVHnC+4yGzVi5z9pKk9lZcFnE8=", + "lastModified": 1781782101, + "narHash": "sha256-2phOAdagFal8BTBEKxEbl3LDSx/7SNGVTFu0zYEXB1g=", "owner": "dimensionalOS", "repo": "dimos-module-fastlio2", - "rev": "286c530db7661ca6874cd8c1381357adf83cd19f", + "rev": "288e357e5457723c1cce4d4060f76ed7f85b10d4", "type": "github" }, "original": { diff --git a/dimos/hardware/sensors/lidar/pointlio/cpp/flake.nix b/dimos/hardware/sensors/lidar/pointlio/cpp/flake.nix index 303a5aa093..0ef30ba768 100644 --- a/dimos/hardware/sensors/lidar/pointlio/cpp/flake.nix +++ b/dimos/hardware/sensors/lidar/pointlio/cpp/flake.nix @@ -89,7 +89,6 @@ pkgs.glib pkgs.eigen pkgs.pcl - pkgs.yaml-cpp pkgs.glog pkgs.boost pkgs.llvmPackages.openmp diff --git a/dimos/hardware/sensors/lidar/pointlio/cpp/main.cpp b/dimos/hardware/sensors/lidar/pointlio/cpp/main.cpp index 72e7a25094..e3d53482ef 100644 --- a/dimos/hardware/sensors/lidar/pointlio/cpp/main.cpp +++ b/dimos/hardware/sensors/lidar/pointlio/cpp/main.cpp @@ -5,13 +5,13 @@ // // Binds Livox SDK2 directly into the Point-LIO core: SDK callbacks feed // CustomMsg/Imu to the IESKF estimator, which performs LiDAR-inertial SLAM. -// Sensor-frame (mid360_link) point clouds and odometry are published on LCM. +// Sensor-frame point clouds and odometry are published on LCM. // // Usage: // ./pointlio_native \ // --lidar '/lidar#sensor_msgs.PointCloud2' \ // --odometry '/odometry#nav_msgs.Odometry' \ -// --config_path /path/to/default.yaml \ +// --filter_size_surf 0.2 --ivox_grid_resolution 2.0 ... \ # tuning as plain CLI args // --host_ip 192.168.1.5 --lidar_ip 192.168.1.155 #include @@ -57,10 +57,28 @@ static double get_publish_ts() { return std::chrono::duration( std::chrono::system_clock::now().time_since_epoch()).count(); } +// Parse a comma-separated list of doubles (CLI vector args); empty on bad input. +static std::vector parse_doubles(const std::string& csv) { + std::vector out; + size_t i = 0; + while (i < csv.size()) { + size_t j = csv.find(',', i); + if (j == std::string::npos) { j = csv.size(); } + try { + out.push_back(std::stod(csv.substr(i, j - i))); + } catch (...) { + return {}; + } + i = j + 1; + } + return out; +} + static std::string g_lidar_topic; static std::string g_odometry_topic; static std::string g_frame_id; // required via --frame_id static std::string g_child_frame_id; // required via --child_frame_id +static std::string g_sensor_frame_id; // required via --sensor_frame_id static float g_frequency = 10.0f; // Frame accumulator (Livox SDK raw → CustomMsg) @@ -85,7 +103,7 @@ static uint64_t get_timestamp_ns(const LivoxLidarEthernetPacket* pkt) { using dimos::time_from_seconds; using dimos::make_header; -// Publish the lidar point cloud in the sensor body frame (g_frame_id). +// Publish the lidar point cloud in the sensor frame (g_sensor_frame_id). // `cloud` is Point-LIO's undistorted scan in the sensor's own frame // (get_body_cloud), so points are published as-is with no world registration. static void publish_lidar(PointCloudXYZI::Ptr cloud, double timestamp, const std::string& topic = "") { @@ -95,7 +113,7 @@ static void publish_lidar(PointCloudXYZI::Ptr cloud, double timestamp, const std int num_points = static_cast(cloud->size()); sensor_msgs::PointCloud2 pc; - pc.header = make_header(g_frame_id, timestamp); + pc.header = make_header(g_sensor_frame_id, timestamp); pc.height = 1; pc.width = num_points; pc.is_bigendian = 0; @@ -321,11 +339,68 @@ int main(int argc, char** argv) { return 1; } - std::string config_path = mod.arg("config_path", ""); - if (config_path.empty()) { - fprintf(stderr, "Error: --config_path is required\n"); - return 1; - } + // Point-LIO tuning, passed as CLI args by the dimos module (no YAML). + PointLioParams params; + // common + params.con_frame = mod.arg_bool("con_frame", params.con_frame); + params.con_frame_num = mod.arg_int("con_frame_num", params.con_frame_num); + params.cut_frame = mod.arg_bool("cut_frame", params.cut_frame); + params.cut_frame_time_interval = mod.arg_float("cut_frame_time_interval", params.cut_frame_time_interval); + params.time_lag_imu_to_lidar = mod.arg_float("time_lag_imu_to_lidar", params.time_lag_imu_to_lidar); + // preprocess + params.scan_line = mod.arg_int("scan_line", params.scan_line); + params.scan_rate = mod.arg_int("scan_rate", params.scan_rate); + params.blind = mod.arg_float("blind", params.blind); + params.point_filter_num = mod.arg_int("point_filter_num", params.point_filter_num); + std::string lidar_type = mod.arg("lidar_type", "avia"); + params.lidar_type = lidar_type == "velodyne" ? 2 : lidar_type == "ouster" ? 3 : + lidar_type == "hesai" ? 4 : lidar_type == "unilidar" ? 5 : 1; + std::string ts_unit = mod.arg("timestamp_unit", "nanosecond"); + params.timestamp_unit = ts_unit == "second" ? 0 : ts_unit == "millisecond" ? 1 : + ts_unit == "microsecond" ? 2 : 3; + // mapping + params.use_imu_as_input = mod.arg_bool("use_imu_as_input", params.use_imu_as_input); + params.prop_at_freq_of_imu = mod.arg_bool("prop_at_freq_of_imu", params.prop_at_freq_of_imu); + params.check_satu = mod.arg_bool("check_satu", params.check_satu); + params.init_map_size = mod.arg_int("init_map_size", params.init_map_size); + params.space_down_sample = mod.arg_bool("space_down_sample", params.space_down_sample); + params.satu_acc = mod.arg_float("satu_acc", params.satu_acc); + params.satu_gyro = mod.arg_float("satu_gyro", params.satu_gyro); + params.acc_norm = mod.arg_float("acc_norm", params.acc_norm); + params.plane_thr = mod.arg_float("plane_thr", params.plane_thr); + params.filter_size_surf = mod.arg_float("filter_size_surf", params.filter_size_surf); + params.filter_size_map = mod.arg_float("filter_size_map", params.filter_size_map); + params.ivox_grid_resolution = mod.arg_float("ivox_grid_resolution", params.ivox_grid_resolution); + std::string ivox_nearby = mod.arg("ivox_nearby_type", "nearby6"); + params.ivox_nearby_type = ivox_nearby == "center" ? 0 : ivox_nearby == "nearby18" ? 18 : + ivox_nearby == "nearby26" ? 26 : 6; + params.cube_side_length = mod.arg_float("cube_side_length", params.cube_side_length); + params.det_range = mod.arg_float("det_range", params.det_range); + params.fov_degree = mod.arg_float("fov_degree", params.fov_degree); + params.imu_en = mod.arg_bool("imu_en", params.imu_en); + params.start_in_aggressive_motion = mod.arg_bool("start_in_aggressive_motion", params.start_in_aggressive_motion); + params.extrinsic_est_en = mod.arg_bool("extrinsic_est_en", params.extrinsic_est_en); + params.imu_time_inte = mod.arg_float("imu_time_inte", params.imu_time_inte); + params.lidar_meas_cov = mod.arg_float("lidar_meas_cov", params.lidar_meas_cov); + params.acc_cov_input = mod.arg_float("acc_cov_input", params.acc_cov_input); + params.vel_cov = mod.arg_float("vel_cov", params.vel_cov); + params.gyr_cov_input = mod.arg_float("gyr_cov_input", params.gyr_cov_input); + params.gyr_cov_output = mod.arg_float("gyr_cov_output", params.gyr_cov_output); + params.acc_cov_output = mod.arg_float("acc_cov_output", params.acc_cov_output); + params.b_gyr_cov = mod.arg_float("b_gyr_cov", params.b_gyr_cov); + params.b_acc_cov = mod.arg_float("b_acc_cov", params.b_acc_cov); + params.imu_meas_acc_cov = mod.arg_float("imu_meas_acc_cov", params.imu_meas_acc_cov); + params.imu_meas_omg_cov = mod.arg_float("imu_meas_omg_cov", params.imu_meas_omg_cov); + params.match_s = mod.arg_float("match_s", params.match_s); + params.gravity_align = mod.arg_bool("gravity_align", params.gravity_align); + if (auto g = parse_doubles(mod.arg("gravity", "")); !g.empty()) params.gravity = g; + if (auto gi = parse_doubles(mod.arg("gravity_init", "")); !gi.empty()) params.gravity_init = gi; + if (auto et = parse_doubles(mod.arg("extrinsic_t", "")); !et.empty()) params.extrinsic_T = et; + if (auto er = parse_doubles(mod.arg("extrinsic_r", "")); !er.empty()) params.extrinsic_R = er; + // odometry + params.publish_odometry_without_downsample = + mod.arg_bool("publish_odometry_without_downsample", params.publish_odometry_without_downsample); + params.odom_only = mod.arg_bool("odom_only", params.odom_only); // Point-LIO internal processing rates double msr_freq = mod.arg_float("msr_freq", 50.0f); @@ -336,7 +411,8 @@ int main(int argc, char** argv) { std::string lidar_ip = mod.arg("lidar_ip", "192.168.1.155"); g_frequency = mod.arg_float("frequency", 10.0f); g_frame_id = mod.arg_required("frame_id"); - g_child_frame_id = mod.arg_required("body_frame_id"); + g_child_frame_id = mod.arg_required("child_frame_id"); + g_sensor_frame_id = mod.arg_required("sensor_frame_id"); float pointcloud_freq = mod.arg_float("pointcloud_freq", 5.0f); float odom_freq = mod.arg_float("odom_freq", 50.0f); @@ -362,7 +438,7 @@ int main(int argc, char** argv) { printf("[pointlio] Starting Point-LIO + Livox Mid-360 native module\n"); printf("[pointlio] lidar topic: %s\n", g_lidar_topic.empty() ? "(disabled)" : g_lidar_topic.c_str()); printf("[pointlio] odometry topic: %s\n", g_odometry_topic.empty() ? "(disabled)" : g_odometry_topic.c_str()); - printf("[pointlio] config: %s\n", config_path.c_str()); + printf("[pointlio] tuning: filter_size_surf=%.3f ivox_res=%.3f lidar_type=%d\n", params.filter_size_surf, params.ivox_grid_resolution, params.lidar_type); printf("[pointlio] host_ip: %s lidar_ip: %s frequency: %.1f Hz\n", host_ip.c_str(), lidar_ip.c_str(), g_frequency); printf("[pointlio] pointcloud_freq: %.1f Hz odom_freq: %.1f Hz\n", pointcloud_freq, odom_freq); } @@ -378,7 +454,7 @@ int main(int argc, char** argv) { g_lcm = &lcm; if (debug) { printf("[pointlio] Initializing Point-LIO...\n"); } - PointLio point_lio(config_path, msr_freq, main_freq); + PointLio point_lio(params, msr_freq, main_freq); g_point_lio = &point_lio; if (debug) { printf("[pointlio] Point-LIO initialized.\n"); } diff --git a/dimos/hardware/sensors/lidar/pointlio/module.py b/dimos/hardware/sensors/lidar/pointlio/module.py index 4a13625ab8..9db92e9f6d 100644 --- a/dimos/hardware/sensors/lidar/pointlio/module.py +++ b/dimos/hardware/sensors/lidar/pointlio/module.py @@ -24,16 +24,17 @@ PointLio.blueprint(host_ip="192.168.1.5", lidar_ip="192.168.1.155"), SomeConsumer.blueprint(), )).loop() + +Point-LIO tuning lives directly on ``PointLioConfig`` and is passed to the C++ +binary as plain CLI args (no YAML). """ from __future__ import annotations import os -from pathlib import Path -from typing import TYPE_CHECKING, Annotated +from typing import TYPE_CHECKING, Literal from pydantic import Field -from pydantic.experimental.pipeline import validate_as from reactivex.disposable import Disposable from dimos.core.core import rpc @@ -57,10 +58,17 @@ from dimos.msgs.geometry_msgs.Vector3 import Vector3 from dimos.msgs.nav_msgs.Odometry import Odometry from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2 -from dimos.navigation.cmu_nav.frames import FRAME_ODOM +from dimos.navigation.cmu_nav.frames import FRAME_BODY, FRAME_ODOM from dimos.spec import perception -_CONFIG_DIR = Path(__file__).parent / "config" +# Human-readable enums; the C++ binary (main.cpp) maps these strings to +# Point-LIO's int codes. +# LID_TYPE enum (Point-LIO src/preprocess.h). avia = 1 selects the Livox branch +# the Mid-360 emits. +LidarType = Literal["avia", "velodyne", "ouster", "hesai", "unilidar"] +TimestampUnit = Literal["second", "millisecond", "microsecond", "nanosecond"] +# iVox local-map neighbour stencil. +IvoxNearbyType = Literal["center", "nearby6", "nearby18", "nearby26"] class PointLioConfig(NativeModuleConfig): @@ -73,11 +81,13 @@ class PointLioConfig(NativeModuleConfig): lidar_ip: str | None = Field(default_factory=lambda: os.environ.get("DIMOS_POINTLIO_LIDAR_IP")) frequency: float = 10.0 - # Sensor frame for the cloud + odometry headers. - frame_id: str = "mid360_link" - # Published TF: body_start_frame_id -> body_frame_id. - body_start_frame_id: str = FRAME_ODOM - body_frame_id: str = "base_link" + # Odometry is published as frame_id (fixed) -> child_frame_id (moving body), + # and also broadcast on TF. The point cloud is stamped with sensor_frame_id + # (the lidar's own frame — get_body_cloud is the undistorted scan, not yet + # transformed into the body frame). + frame_id: str = FRAME_ODOM + child_frame_id: str = FRAME_BODY + sensor_frame_id: str = "mid360_link" # Point-LIO internal processing rates (Hz) msr_freq: float = 50.0 @@ -86,14 +96,65 @@ class PointLioConfig(NativeModuleConfig): pointcloud_freq: float = 10.0 odom_freq: float = 30.0 - # Point-LIO YAML config (relative to config/ dir, or absolute path). - config: Annotated[ - Path, - validate_as(...).transform(lambda path: path if path.is_absolute() else _CONFIG_DIR / path), - ] = Path("default.yaml") - debug: bool = False + # Point-LIO tuning, passed to the binary as plain CLI args (read in main.cpp). + # common + con_frame: bool = False + con_frame_num: int = 1 + cut_frame: bool = False + cut_frame_time_interval: float = 0.1 + time_lag_imu_to_lidar: float = 0.0 + # preprocess + lidar_type: LidarType = "avia" # 1 = AVIA (Livox) branch the Mid-360 emits + scan_line: int = 4 + scan_rate: int = 10 + timestamp_unit: TimestampUnit = "nanosecond" + blind: float = 0.5 # spherical min range (m) + point_filter_num: int = 3 # pre-KF decimation: keep every Nth raw point (1 = all) + # mapping + use_imu_as_input: bool = False # false = IMU-as-output model (robust path) + prop_at_freq_of_imu: bool = True + check_satu: bool = True + init_map_size: int = 10 + space_down_sample: bool = True # pre-KF voxel downsample (leaf = filter_size_surf) + satu_acc: float = 3.0 # g; accel >= this is treated as saturated, bounding velocity + satu_gyro: float = 35.0 + acc_norm: float = 1.0 # IMU accel unit: g + plane_thr: float = 0.1 + filter_size_surf: float = 0.2 # pre-KF scan downsample leaf (m), iff space_down_sample + filter_size_map: float = 0.5 + ivox_grid_resolution: float = 2.0 # iVox local-map grid (m) + ivox_nearby_type: IvoxNearbyType = "nearby6" + cube_side_length: float = 1000.0 + det_range: float = 100.0 + fov_degree: float = 360.0 + imu_en: bool = True + start_in_aggressive_motion: bool = False + extrinsic_est_en: bool = False + imu_time_inte: float = 0.005 + lidar_meas_cov: float = 0.01 + acc_cov_input: float = 0.1 + vel_cov: float = 20.0 + gyr_cov_input: float = 0.01 + gyr_cov_output: float = 1000.0 + acc_cov_output: float = 500.0 + b_gyr_cov: float = 0.0001 + b_acc_cov: float = 0.0001 + imu_meas_acc_cov: float = 0.01 + imu_meas_omg_cov: float = 0.01 + match_s: float = 81.0 + gravity_align: bool = True + gravity: list[float] = Field(default_factory=lambda: [0.0, 0.0, -9.81]) + gravity_init: list[float] = Field(default_factory=lambda: [0.0, 0.0, -9.81]) + extrinsic_t: list[float] = Field(default_factory=lambda: [-0.011, -0.02329, 0.04412]) + extrinsic_r: list[float] = Field( + default_factory=lambda: [1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0] + ) + # odometry + publish_odometry_without_downsample: bool = False + odom_only: bool = False + # SDK port configuration (see livox/ports.py for defaults) cmd_data_port: int = SDK_CMD_DATA_PORT push_msg_port: int = SDK_PUSH_MSG_PORT @@ -106,18 +167,6 @@ class PointLioConfig(NativeModuleConfig): host_imu_data_port: int = SDK_HOST_IMU_DATA_PORT host_log_data_port: int = SDK_HOST_LOG_DATA_PORT - # Resolved in __post_init__, passed as --config_path to the binary - config_path: str | None = None - - cli_exclude: frozenset[str] = frozenset({"config", "body_start_frame_id"}) - - def model_post_init(self, __context: object) -> None: - super().model_post_init(__context) - cfg = self.config - if not cfg.is_absolute(): - cfg = _CONFIG_DIR / cfg - self.config_path = str(cfg.resolve()) - class PointLio(NativeModule, perception.Lidar, perception.Odometry): config: PointLioConfig @@ -136,8 +185,8 @@ def start(self) -> None: def _on_odom_for_tf(self, msg: Odometry) -> None: self.tf.publish( Transform( - frame_id=self.config.body_start_frame_id, - child_frame_id=self.config.body_frame_id, + frame_id=self.frame_id, + child_frame_id=self.config.child_frame_id, translation=Vector3( msg.pose.position.x, msg.pose.position.y, diff --git a/dimos/hardware/sensors/lidar/pointlio/recorder.py b/dimos/hardware/sensors/lidar/pointlio/recorder.py index e1afbd502c..9368792f6e 100644 --- a/dimos/hardware/sensors/lidar/pointlio/recorder.py +++ b/dimos/hardware/sensors/lidar/pointlio/recorder.py @@ -14,169 +14,44 @@ """Record Point-LIO odometry + lidar into a memory2 SQLite db. -Subscribes to a PointLio's ``odometry`` / ``lidar`` outputs (auto-connected by -matching stream name + type — no remappings needed) and appends them to a -memory2 store. Timestamps are converted onto the db's existing clock so a run -can be appended to an existing db (e.g. a fastlio replay) and compared on one -timeline. Owns the db lifecycle: refuses to clobber existing streams unless -``force``, and derives the alignment reference from whatever the db already holds. - -Each lidar frame is stamped with the latest odometry pose, so ``pointlio_lidar`` -carries the trajectory and ``dimos map global`` can register it directly (it -transforms the body-frame cloud by that pose) — no ``dimos map pose-fill`` pass. +A ``Recorder`` that records its In ports under their own names +(``pointlio_odometry`` / ``pointlio_lidar``) — wire them to PointLio's +``odometry`` / ``lidar`` outputs with ``.remappings()``. Poses come straight +from the odometry stream (``@pose_setter_for``): each lidar frame is stamped with +the latest odometry pose so ``pointlio_lidar`` carries the trajectory and ``dimos +map global`` can register the body-frame cloud directly (no ``pose-fill`` pass). """ from __future__ import annotations -from collections.abc import AsyncIterator -import math -from pathlib import Path -import sqlite3 -import time - -from dimos.core.module import Module, ModuleConfig from dimos.core.stream import In +from dimos.memory2.module import OnExisting, Recorder, RecorderConfig, pose_setter_for from dimos.msgs.geometry_msgs.Pose import Pose from dimos.msgs.nav_msgs.Odometry import Odometry from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2 -# Below this the db's existing timestamps are sensor-boot seconds, not unix time. -_SENSOR_CLOCK_MAX = 1e8 -# Strictly-increasing tie-breaker so two samples never collide on ts. -_EPS = 1e-9 -# Max sensor-ts gap to attach the latest odometry pose to a lidar frame, so -# pointlio_lidar carries the trajectory and `dimos map global` can register it -# (it transforms by the per-frame pose). Matches pose_fill's nearest-match window. -_POSE_MATCH_TOL = 0.1 - - -def _existing_min_ts(db_path: Path) -> float: - """Min ts across the db's existing streams, or -1.0 if none/absent.""" - if not db_path.exists(): - return -1.0 - con = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True, timeout=2.0) - try: - tables = [ - row[0] - for row in con.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall() - ] - best: float | None = None - for table in tables: - if table.startswith("_") or table.startswith("sqlite_"): - continue - try: - cols = [col[1] for col in con.execute(f"PRAGMA table_info('{table}')").fetchall()] - if "ts" not in cols: - continue - row = con.execute(f"SELECT MIN(ts) FROM '{table}'").fetchone() - except sqlite3.OperationalError: - continue - if row and row[0] is not None: - best = row[0] if best is None else min(best, row[0]) - return best if best is not None else -1.0 - finally: - con.close() - - -class PointlioRecorderConfig(ModuleConfig): - """Configures the recorder with the target db and timing conversion.""" - db_path: str = "" - # db stream/table names the Point-LIO outputs are recorded under. - odom_stream_name: str = "pointlio_odometry" - lidar_stream_name: str = "pointlio_lidar" - # Explicit offset override; NaN means auto-derive from the db's earliest ts. - time_offset: float = float("nan") - # Drop pre-existing odom/lidar streams instead of refusing to overwrite. - force: bool = False +class PointlioRecorderConfig(RecorderConfig): + # Append into a populated db (keep other streams); replace only our own. + on_existing: OnExisting = OnExisting.APPEND -class PointlioRecorder(Module): +class PointlioRecorder(Recorder): config: PointlioRecorderConfig - lidar: In[PointCloud2] - odometry: In[Odometry] - _offset: float | None = None - _ref_start_ts: float = -1.0 - _last_odom_ts: float = 0.0 - _last_lidar_ts: float = 0.0 - _odom_count: int = 0 - _lidar_count: int = 0 - # Latest odometry pose + its raw sensor ts, stamped onto each lidar frame so - # pointlio_lidar carries the trajectory (no separate pose-fill pass). - _last_odom_pose: Pose | None = None - _last_odom_raw_ts: float = 0.0 - - async def main(self) -> AsyncIterator[None]: - # Deferred: the store is opened in the worker process that runs main(), - # not at module-scan/import time on the host. - from dimos.memory2.store.sqlite import SqliteStore - - cfg = self.config - self._store = SqliteStore(path=cfg.db_path) - names = (cfg.odom_stream_name, cfg.lidar_stream_name) - existing = sorted(set(self._store.list_streams()) & set(names)) - if existing and not cfg.force: - raise RuntimeError( - f"PointlioRecorder: {Path(cfg.db_path).name} already has {existing}; " - "set force=True to overwrite" - ) - for name in existing: - self._store.delete_stream(name) - # Reference is the db's earliest ts *after* dropping our own old streams, - # so only the data we're aligning against (e.g. a fastlio replay) counts. - self._ref_start_ts = _existing_min_ts(Path(cfg.db_path)) - self._os = self._store.stream(cfg.odom_stream_name, Odometry) - self._ls = self._store.stream(cfg.lidar_stream_name, PointCloud2) - yield - self._store.stop() + pointlio_odometry: In[Odometry] + pointlio_lidar: In[PointCloud2] - def _resolve_offset(self, first_ts: float) -> float: - override = self.config.time_offset - if not math.isnan(override): - return override - ref = self._ref_start_ts - if ref < 0.0 or ref < _SENSOR_CLOCK_MAX: - # Empty db, or db already on the sensor clock -> exact alignment. - return 0.0 - # db on wall-clock time -> start-align Point-LIO onto the db's earliest ts. - return ref - first_ts - - def _aligned_ts(self, raw_ts: float, last_ts: float) -> float: - """Convert a sensor ts onto the db clock, kept strictly above last_ts.""" - if self._offset is None: - self._offset = self._resolve_offset(raw_ts) - return max(raw_ts + self._offset, last_ts + _EPS) + _last_odom_pose: Pose | None = None - async def handle_odometry(self, msg: Odometry) -> None: - # `is not None`, not `or`: a real sensor ts of 0.0 must not fall back to - # wall time (would misclassify the stream's clock in _resolve_offset). - raw_ts_raw = getattr(msg, "ts", None) - raw_ts = raw_ts_raw if raw_ts_raw is not None else time.time() - ts = self._aligned_ts(raw_ts, self._last_odom_ts) - self._last_odom_ts = ts + @pose_setter_for("pointlio_odometry") + async def _odom_pose(self, msg: Odometry) -> Pose | None: pose = getattr(msg, "pose", None) - pose_inner = getattr(pose, "pose", None) if pose is not None else None - self._os.append(msg, ts=ts, pose=pose_inner) - self._last_odom_pose = pose_inner - self._last_odom_raw_ts = raw_ts - self._odom_count += 1 - - async def handle_lidar(self, msg: PointCloud2) -> None: - raw_ts_raw = getattr(msg, "ts", None) - raw_ts = raw_ts_raw if raw_ts_raw is not None else time.time() - ts = self._aligned_ts(raw_ts, self._last_lidar_ts) - self._last_lidar_ts = ts - # Stamp the latest odometry pose (within tolerance) onto the frame so - # pointlio_lidar carries the trajectory; map global transforms the - # body-frame cloud by it. Both Point-LIO outputs share a publish ts, so - # the nearest odometry is at most ~one odom period stale. Frames with no - # match (e.g. before the first odometry) get None and are map-skipped. - pose = ( - self._last_odom_pose - if self._last_odom_pose is not None - and abs(raw_ts - self._last_odom_raw_ts) <= _POSE_MATCH_TOL - else None - ) - self._ls.append(msg, ts=ts, pose=pose) - self._lidar_count += 1 + self._last_odom_pose = getattr(pose, "pose", None) if pose is not None else None + return self._last_odom_pose + + @pose_setter_for("pointlio_lidar") + async def _lidar_pose(self, msg: PointCloud2) -> Pose | None: + # Most-recent odometry pose, stamped directly (no tf). None before the + # first odometry -> frame stored unposed, map-skipped. + return self._last_odom_pose diff --git a/dimos/hardware/sensors/lidar/pointlio/scripts/pcap_to_db.py b/dimos/hardware/sensors/lidar/pointlio/scripts/pcap_to_db.py index 6584646fd6..a9030bcb8c 100644 --- a/dimos/hardware/sensors/lidar/pointlio/scripts/pcap_to_db.py +++ b/dimos/hardware/sensors/lidar/pointlio/scripts/pcap_to_db.py @@ -18,12 +18,15 @@ # LFS archive, NOT the archive name) PCAP_PATH="$(python -c "from dimos.utils.data import get_data; print(get_data('mid360_shake_stairs/mid360_shake_stairs.pcap'))")" - # gen .db from pcap with your config (defaults to .db next to the pcap) + # gen .db from pcap (defaults to .db next to the pcap) + python -m dimos.hardware.sensors.lidar.pointlio.scripts.pcap_to_db --pcap "$PCAP_PATH" + + # override any PointLioConfig field via a small YAML/JSON doc, e.g. {acc_cov_input: 0.3} python -m dimos.hardware.sensors.lidar.pointlio.scripts.pcap_to_db \ - --config dimos/hardware/sensors/lidar/pointlio/config/default.yaml \ - --pcap "$PCAP_PATH" + --pcap "$PCAP_PATH" --config overrides.yaml - # add to existing .db + # add to existing .db (a missing --db is fetched via get_data before falling + # back to building from scratch; a missing --pcap is likewise fetched) DB="mem2.db" python -m dimos.hardware.sensors.lidar.pointlio.scripts.pcap_to_db --db "$DB" --pcap "$PCAP_PATH" @@ -61,6 +64,9 @@ _STARTUP_TIMEOUT_SEC = 60.0 # Max |Δts| to match a lidar frame to an odometry pose when aggregating the .rrd. _POSE_MATCH_TOL = 0.1 +# db stream/table names (= the recorder's In-port names). +_ODOM_STREAM = "pointlio_odometry" +_LIDAR_STREAM = "pointlio_lidar" # Extra seconds past the pcap's own duration before auto-stopping, when no # explicit --max-sensor-sec is given. _DRAIN_MARGIN_SEC = 4.0 @@ -199,7 +205,9 @@ def _write_rrd(db_path: Path, odom_stream: str, lidar_stream: str, voxel: float) store.stop() -def _build_blueprint(args: argparse.Namespace, db_path: Path, config_path: str) -> Blueprint: +def _build_blueprint( + args: argparse.Namespace, db_path: Path, overrides: dict[str, Any] +) -> Blueprint: """autoconnect(VirtualMid360 + PointLio + PointlioRecorder). PointLio's ``odometry``/``lidar`` outputs auto-wire to the recorder's @@ -211,36 +219,35 @@ def _build_blueprint(args: argparse.Namespace, db_path: Path, config_path: str) from dimos.hardware.sensors.lidar.pointlio.recorder import PointlioRecorder from dimos.hardware.sensors.lidar.virtual_mid360.module import VirtualMid360 - # `config` (not `config_path`, which PointLioConfig derives itself); already - # an absolute path so it bypasses the config-dir-relative resolution. Omit - # when empty to keep the default.yaml. - pointlio_kwargs: dict[str, object] = dict( + pointlio_kwargs: dict[str, Any] = dict( host_ip=args.host_ip, lidar_ip=args.lidar_ip, odom_freq=args.odom_freq, debug=False ) - if config_path: - pointlio_kwargs["config"] = config_path - - return autoconnect( - VirtualMid360.blueprint( - pcap=str(args.pcap_path), - rate=args.rate, - delay=args.warmup_sec, # hold streaming until PointLio's SDK is up - host_ip=args.host_ip, - lidar_ip=args.lidar_ip, - alias_iface=args.alias_iface, - # When the NIC is provisioned by hand, skip the module's own sudo - # (it runs in a tty-less worker where a password prompt can't appear). - setup_network=not args.no_network_setup, - ), - PointLio.blueprint(**pointlio_kwargs), - PointlioRecorder.blueprint( - db_path=str(db_path), - odom_stream_name=args.odom_stream_name, - lidar_stream_name=args.lidar_stream_name, - time_offset=float("nan") if args.time_offset is None else args.time_offset, - force=args.force, - ), - ).global_config(n_workers=4, robot_model="mid360_pointlio_pcap_to_db") + pointlio_kwargs.update(overrides) + + return ( + autoconnect( + VirtualMid360.blueprint( + pcap=str(args.pcap_path), + rate=args.rate, + delay=args.warmup_sec, # hold streaming until PointLio's SDK is up + host_ip=args.host_ip, + lidar_ip=args.lidar_ip, + alias_iface=args.alias_iface, + # When the NIC is provisioned by hand, skip the module's own sudo + # (it runs in a tty-less worker where a password prompt can't appear). + setup_network=not args.no_network_setup, + ), + PointLio.blueprint(**pointlio_kwargs), + PointlioRecorder.blueprint(db_path=str(db_path)), + ) + .remappings( + [ + (PointlioRecorder, "pointlio_odometry", "odometry"), + (PointlioRecorder, "pointlio_lidar", "lidar"), + ] + ) + .global_config(n_workers=4, robot_model="mid360_pointlio_pcap_to_db") + ) def _poll_until_drained( @@ -291,23 +298,65 @@ def _poll_until_drained( stagnant_since = None +def _load_overrides(config: str) -> dict[str, Any]: + """Load a YAML/JSON doc of PointLioConfig field overrides, e.g. {acc_cov_input: 0.3}.""" + if not config: + return {} + import yaml + + path = Path(config).expanduser().resolve() + if not path.exists(): + raise FileNotFoundError(f"--config not found: {path}") + data = yaml.safe_load(path.read_text()) or {} + if not isinstance(data, dict): + raise ValueError(f"--config must be a mapping of PointLioConfig fields, got {type(data)}") + return data + + +def _resolve_db_path(args: argparse.Namespace, pcap_path: Path) -> Path: + """Where to record. Omitted --db -> .db. A given --db that's missing is + fetched via get_data (LFS) before falling back to building from scratch.""" + if not args.db: + return pcap_path.with_suffix(".db") + db_path = Path(args.db).expanduser().resolve() + if not db_path.exists(): + try: + from dimos.utils.data import get_data + + fetched = get_data(args.db) + if fetched.exists(): + print(f"[pcap_to_db] fetched --db via get_data: {fetched}", flush=True) + return fetched.resolve() + except (FileNotFoundError, RuntimeError, OSError) as exc: # not an LFS db -> build fresh + print( + f"[pcap_to_db] --db not found locally or via get_data ({exc}); " + "building from scratch", + file=sys.stderr, + flush=True, + ) + return db_path + + def _run(args: argparse.Namespace) -> int: from dimos.core.coordination.module_coordinator import ModuleCoordinator - pcap_path = Path(args.pcap).expanduser().resolve() + pcap_path = Path(args.pcap).expanduser() if not pcap_path.exists(): - print(f"[pcap_to_db] missing pcap: {pcap_path}", file=sys.stderr) - return 2 + try: + from dimos.utils.data import get_data + + pcap_path = get_data(args.pcap) + except (FileNotFoundError, RuntimeError, OSError) as exc: + print( + f"[pcap_to_db] pcap not found locally or via get_data: {args.pcap} ({exc})", + file=sys.stderr, + ) + return 2 + pcap_path = pcap_path.resolve() args.pcap_path = pcap_path - db_path = Path(args.db).expanduser().resolve() if args.db else pcap_path.with_suffix(".db") + db_path = _resolve_db_path(args, pcap_path) db_path.parent.mkdir(parents=True, exist_ok=True) - # Resolve --config against the *invoking* cwd (pwd-relative) up front; the - # PointLio config field otherwise resolves a relative path against its own - # config/ dir, never the pwd. Absolute path passes through unchanged. - config_path = str(Path(args.config).expanduser().resolve()) if args.config else "" - if config_path and not Path(config_path).exists(): - print(f"[pcap_to_db] missing --config: {config_path}", file=sys.stderr) - return 2 + overrides = _load_overrides(args.config) # Default the stop bound to the pcap's own duration: Point-LIO keeps # dead-reckoning (publishing at full rate) after the pcap drains, so the @@ -328,15 +377,13 @@ def _run(args: argparse.Namespace) -> int: coord = None try: - coord = ModuleCoordinator.build(_build_blueprint(args, db_path, config_path)) - drained = _poll_until_drained( - db_path, args.odom_stream_name, args.lidar_stream_name, max_sensor_sec - ) + coord = ModuleCoordinator.build(_build_blueprint(args, db_path, overrides)) + drained = _poll_until_drained(db_path, _ODOM_STREAM, _LIDAR_STREAM, max_sensor_sec) finally: if coord is not None: coord.stop() - o_cnt, o_min, o_max = _odom_stats(db_path, args.odom_stream_name) + o_cnt, o_min, o_max = _odom_stats(db_path, _ODOM_STREAM) if o_cnt == 0 or not drained: print("[pcap_to_db] no odometry recorded — check the run above", file=sys.stderr) return 1 @@ -346,7 +393,7 @@ def _run(args: argparse.Namespace) -> int: ) if not args.no_rrd: try: - rrd = _write_rrd(db_path, args.odom_stream_name, args.lidar_stream_name, args.voxel) + rrd = _write_rrd(db_path, _ODOM_STREAM, _LIDAR_STREAM, args.voxel) if rrd is not None: print(f"[pcap_to_db] wrote {rrd.name} (aggregated lidar + pose path)", flush=True) except Exception as exc: # viz is a non-fatal bonus @@ -375,13 +422,6 @@ def main(argv: list[str]) -> int: default=0.0, help="stop after N sensor seconds (0 = whole pcap)", ) - parser.add_argument( - "--time-offset", - type=float, - default=None, - help="seconds added to every output ts (auto if omitted)", - ) - parser.add_argument("--force", action="store_true", help="overwrite existing pointlio streams") parser.add_argument( "--no-rrd", action="store_true", @@ -399,17 +439,7 @@ def main(argv: list[str]) -> int: parser.add_argument( "--config", default="", - help="Point-LIO YAML config (pwd-relative or absolute; default: module's default.yaml)", - ) - parser.add_argument( - "--odom-stream-name", - default="pointlio_odometry", - help="db stream/table name for the recorded odometry", - ) - parser.add_argument( - "--lidar-stream-name", - default="pointlio_lidar", - help="db stream/table name for the recorded point cloud", + help="YAML/JSON doc of PointLioConfig field overrides (e.g. {acc_cov_input: 0.3})", ) # Addressing knobs (override to run two replays at once). parser.add_argument("--host-ip", default="192.168.1.5") diff --git a/dimos/memory2/module.py b/dimos/memory2/module.py index 00cd46a1d0..e16c1527e7 100644 --- a/dimos/memory2/module.py +++ b/dimos/memory2/module.py @@ -14,7 +14,7 @@ from __future__ import annotations -from collections.abc import Callable +from collections.abc import Awaitable, Callable import enum import inspect import os @@ -274,15 +274,20 @@ class RecorderConfig(MemoryModuleConfig): stream_remapping: dict[str, str] = Field(default_factory=dict) -PoseSetter = Callable[[Any], "Pose | None"] +PoseSetter = Callable[[Any], "Awaitable[Pose | None]"] def pose_setter_for(*stream_names: str) -> Callable[[Any], Any]: - """Mark a method ``(self, msg) -> Pose | None`` as the pose setter for the - given recorded stream(s). Streams without a setter fall back to the tf-based - ``world <- frame_id`` lookup.""" + """Mark an ``async def`` method ``(self, msg) -> Pose | None`` as the pose + setter for the given recorded stream(s). Streams without a setter fall back + to the tf-based ``world <- frame_id`` lookup.""" def decorate(fn: Any) -> Any: + if not inspect.iscoroutinefunction(fn): + raise TypeError( + f"@pose_setter_for must decorate an `async def` method; " + f"{getattr(fn, '__qualname__', fn)} is not async" + ) fn._pose_setter_for = tuple(stream_names) return fn @@ -302,10 +307,11 @@ class MyRecorder(Recorder): Each stream's pose defaults to a ``world <- frame_id`` tf lookup; decorate a method with ``@pose_setter_for("stream")`` to source it elsewhere (e.g. from - an odometry stream):: + an odometry stream). Setters run on the module's event loop and may be + ``async def``:: @pose_setter_for("lidar") - def _lidar_pose(self, msg): + async def _lidar_pose(self, msg): return self._last_odom_pose """ @@ -368,12 +374,14 @@ def _port_to_stream(self, name: str, input_topic: In[Any], stream: Stream[Any]) already in world coords) fall back to ``config.default_frame_id`` — so every observation gets a robot-pose anchor when tf is publishing. - Registers the subscription as a disposable on this module. + Each port is recorded by an async callback dispatched on the module's + event loop via :meth:`process_observable`, which serialises invocations + and registers the subscription for cleanup on stop(). """ - def on_msg(msg: Any) -> None: + async def on_msg(msg: Any) -> None: ts = self._resolve_ts(name, msg) - pose = self._resolve_pose(name, msg, ts) + pose = await self._resolve_pose(name, msg, ts) if not pose: logger.warning( "[%s] No pose for time %s (msg ts: %s), storing without pose", @@ -383,7 +391,7 @@ def on_msg(msg: Any) -> None: ) stream.append(msg, ts=ts, pose=pose) - self.register_disposable(Disposable(input_topic.subscribe(on_msg))) + self.process_observable(input_topic.pure_observable(), on_msg) def _prepare_streams(self) -> None: """On APPEND, drop the streams this recorder is about to (re)write — the @@ -401,13 +409,13 @@ def _resolve_ts(self, name: str, msg: Any) -> float: """Timestamp to record *msg* at. Override to re-base onto another clock.""" return getattr(msg, "ts", None) or time.time() - def _resolve_pose(self, name: str, msg: Any, ts: float) -> Pose | None: - """Pose to anchor *msg* with. Dispatches to the stream's + async def _resolve_pose(self, name: str, msg: Any, ts: float) -> Pose | None: + """Pose to anchor *msg* with. Dispatches to the stream's (async) ``@pose_setter_for`` if one is defined, else falls back to a ``world <- frame_id`` tf lookup.""" setter = self._pose_setters.get(name) if setter is not None: - return cast("Pose | None", setter(msg)) + return cast("Pose | None", await setter(msg)) frame_id = getattr(msg, "frame_id", None) or self.config.default_frame_id transform = self.tf.get( self.config.root_frame, frame_id, time_point=ts, time_tolerance=self.config.tf_tolerance diff --git a/dimos/robot/unitree/go2/blueprints/smart/unitree_go2.py b/dimos/robot/unitree/go2/blueprints/smart/unitree_go2.py index 9993e541c4..3351a3dc16 100644 --- a/dimos/robot/unitree/go2/blueprints/smart/unitree_go2.py +++ b/dimos/robot/unitree/go2/blueprints/smart/unitree_go2.py @@ -62,13 +62,16 @@ class Go2Memory(Recorder): _last_odom_pose: Pose | None = None @pose_setter_for("odom") - def _odom_pose(self, msg: PoseStamped) -> Pose | None: + async def _odom_pose(self, msg: PoseStamped) -> Pose | None: self._last_odom_pose = msg return self._last_odom_pose @pose_setter_for("lidar") - def _lidar_pose(self, msg: PointCloud2) -> Pose | None: - return self._last_odom_pose # should always exist (odom alwyas wins the race) + async def _lidar_pose(self, msg: PointCloud2) -> Pose | None: + # Yes, it doesn't make sense to register lidar at the odom pose because the + # go2 lidar is in the world frame, but map.py (for now) needs this. + # TODO: fix map.py to use a transform frame + return getattr(self, "_last_odom_pose", None) unitree_go2_markers = (