From dba1e5a3827287d8660a887713cf7a2e96537887 Mon Sep 17 00:00:00 2001 From: Jeff Hykin Date: Tue, 23 Jun 2026 19:12:24 -0700 Subject: [PATCH 1/4] recorder: async callbacks + async pose_setter_for Convert the memory2 Recorder from thread/disposable rx subscriptions to manual async callbacks via process_observable, and let pose_setter_for methods be async (awaited in _resolve_pose). Update the fastlio and go2 recorders accordingly. --- .../sensors/lidar/fastlio2/recorder.py | 4 +-- dimos/memory2/module.py | 35 +++++++++++-------- .../go2/blueprints/smart/unitree_go2.py | 9 +++-- 3 files changed, 29 insertions(+), 19 deletions(-) diff --git a/dimos/hardware/sensors/lidar/fastlio2/recorder.py b/dimos/hardware/sensors/lidar/fastlio2/recorder.py index 08784cfe89..5f081d13e9 100644 --- a/dimos/hardware/sensors/lidar/fastlio2/recorder.py +++ b/dimos/hardware/sensors/lidar/fastlio2/recorder.py @@ -45,13 +45,13 @@ class FastLio2Recorder(Recorder): _last_odom_pose: Pose | None = None @pose_setter_for("fastlio_odometry") - def _odom_pose(self, msg: Odometry) -> Pose | None: + async def _odom_pose(self, msg: Odometry) -> Pose | None: pose = getattr(msg, "pose", None) self._last_odom_pose = getattr(pose, "pose", None) if pose is not None else None return self._last_odom_pose @pose_setter_for("fastlio_lidar") - def _lidar_pose(self, msg: PointCloud2) -> Pose | None: + async def _lidar_pose(self, msg: PointCloud2) -> Pose | None: # Most-recent odometry pose, stamped directly (no tf). None before the # first odometry -> frame stored unposed, map-skipped. 
return self._last_odom_pose diff --git a/dimos/memory2/module.py b/dimos/memory2/module.py index 00cd46a1d0..f494a2070d 100644 --- a/dimos/memory2/module.py +++ b/dimos/memory2/module.py @@ -14,7 +14,7 @@ from __future__ import annotations -from collections.abc import Callable +from collections.abc import Awaitable, Callable import enum import inspect import os @@ -274,13 +274,14 @@ class RecorderConfig(MemoryModuleConfig): stream_remapping: dict[str, str] = Field(default_factory=dict) -PoseSetter = Callable[[Any], "Pose | None"] +PoseSetter = Callable[[Any], "Pose | None | Awaitable[Pose | None]"] def pose_setter_for(*stream_names: str) -> Callable[[Any], Any]: """Mark a method ``(self, msg) -> Pose | None`` as the pose setter for the - given recorded stream(s). Streams without a setter fall back to the tf-based - ``world <- frame_id`` lookup.""" + given recorded stream(s). The method may be sync or ``async def`` — the + recorder awaits it if it returns an awaitable. Streams without a setter fall + back to the tf-based ``world <- frame_id`` lookup.""" def decorate(fn: Any) -> Any: fn._pose_setter_for = tuple(stream_names) @@ -302,10 +303,11 @@ class MyRecorder(Recorder): Each stream's pose defaults to a ``world <- frame_id`` tf lookup; decorate a method with ``@pose_setter_for("stream")`` to source it elsewhere (e.g. from - an odometry stream):: + an odometry stream). Setters run on the module's event loop and may be + ``async def``:: @pose_setter_for("lidar") - def _lidar_pose(self, msg): + async def _lidar_pose(self, msg): return self._last_odom_pose """ @@ -368,12 +370,14 @@ def _port_to_stream(self, name: str, input_topic: In[Any], stream: Stream[Any]) already in world coords) fall back to ``config.default_frame_id`` — so every observation gets a robot-pose anchor when tf is publishing. - Registers the subscription as a disposable on this module. 
+ Each port is recorded by an async callback dispatched on the module's + event loop via :meth:`process_observable`, which serialises invocations + and registers the subscription for cleanup on stop(). """ - def on_msg(msg: Any) -> None: + async def on_msg(msg: Any) -> None: ts = self._resolve_ts(name, msg) - pose = self._resolve_pose(name, msg, ts) + pose = await self._resolve_pose(name, msg, ts) if not pose: logger.warning( "[%s] No pose for time %s (msg ts: %s), storing without pose", @@ -383,7 +387,7 @@ def on_msg(msg: Any) -> None: ) stream.append(msg, ts=ts, pose=pose) - self.register_disposable(Disposable(input_topic.subscribe(on_msg))) + self.process_observable(input_topic.pure_observable(), on_msg) def _prepare_streams(self) -> None: """On APPEND, drop the streams this recorder is about to (re)write — the @@ -401,13 +405,16 @@ def _resolve_ts(self, name: str, msg: Any) -> float: """Timestamp to record *msg* at. Override to re-base onto another clock.""" return getattr(msg, "ts", None) or time.time() - def _resolve_pose(self, name: str, msg: Any, ts: float) -> Pose | None: + async def _resolve_pose(self, name: str, msg: Any, ts: float) -> Pose | None: """Pose to anchor *msg* with. 
Dispatches to the stream's - ``@pose_setter_for`` if one is defined, else falls back to a - ``world <- frame_id`` tf lookup.""" + ``@pose_setter_for`` if one is defined (awaited when async), else falls + back to a ``world <- frame_id`` tf lookup.""" setter = self._pose_setters.get(name) if setter is not None: - return cast("Pose | None", setter(msg)) + result = setter(msg) + if inspect.isawaitable(result): + result = await result + return cast("Pose | None", result) frame_id = getattr(msg, "frame_id", None) or self.config.default_frame_id transform = self.tf.get( self.config.root_frame, frame_id, time_point=ts, time_tolerance=self.config.tf_tolerance diff --git a/dimos/robot/unitree/go2/blueprints/smart/unitree_go2.py b/dimos/robot/unitree/go2/blueprints/smart/unitree_go2.py index 9993e541c4..3351a3dc16 100644 --- a/dimos/robot/unitree/go2/blueprints/smart/unitree_go2.py +++ b/dimos/robot/unitree/go2/blueprints/smart/unitree_go2.py @@ -62,13 +62,16 @@ class Go2Memory(Recorder): _last_odom_pose: Pose | None = None @pose_setter_for("odom") - def _odom_pose(self, msg: PoseStamped) -> Pose | None: + async def _odom_pose(self, msg: PoseStamped) -> Pose | None: self._last_odom_pose = msg return self._last_odom_pose @pose_setter_for("lidar") - def _lidar_pose(self, msg: PointCloud2) -> Pose | None: - return self._last_odom_pose # should always exist (odom alwyas wins the race) + async def _lidar_pose(self, msg: PointCloud2) -> Pose | None: + # Yes, it doesn't make sense to register lidar at the odom pose because the + # go2 lidar is in the world frame, but map.py (for now) needs this. 
+ # TODO: fix map.py to use a transform frame + return getattr(self, "_last_odom_pose", None) unitree_go2_markers = ( From af6a06be3071f37af2f9df7241f8b4e0c899b4f6 Mon Sep 17 00:00:00 2001 From: Jeff Hykin Date: Tue, 23 Jun 2026 23:49:58 -0700 Subject: [PATCH 2/4] recorder: require @pose_setter_for setters to be async Raise TypeError at decoration time if a non-async function is decorated, and always await the setter in _resolve_pose. --- dimos/memory2/module.py | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/dimos/memory2/module.py b/dimos/memory2/module.py index f494a2070d..e16c1527e7 100644 --- a/dimos/memory2/module.py +++ b/dimos/memory2/module.py @@ -274,16 +274,20 @@ class RecorderConfig(MemoryModuleConfig): stream_remapping: dict[str, str] = Field(default_factory=dict) -PoseSetter = Callable[[Any], "Pose | None | Awaitable[Pose | None]"] +PoseSetter = Callable[[Any], "Awaitable[Pose | None]"] def pose_setter_for(*stream_names: str) -> Callable[[Any], Any]: - """Mark a method ``(self, msg) -> Pose | None`` as the pose setter for the - given recorded stream(s). The method may be sync or ``async def`` — the - recorder awaits it if it returns an awaitable. Streams without a setter fall - back to the tf-based ``world <- frame_id`` lookup.""" + """Mark an ``async def`` method ``(self, msg) -> Pose | None`` as the pose + setter for the given recorded stream(s). 
Streams without a setter fall back + to the tf-based ``world <- frame_id`` lookup.""" def decorate(fn: Any) -> Any: + if not inspect.iscoroutinefunction(fn): + raise TypeError( + f"@pose_setter_for must decorate an `async def` method; " + f"{getattr(fn, '__qualname__', fn)} is not async" + ) fn._pose_setter_for = tuple(stream_names) return fn @@ -406,15 +410,12 @@ def _resolve_ts(self, name: str, msg: Any) -> float: return getattr(msg, "ts", None) or time.time() async def _resolve_pose(self, name: str, msg: Any, ts: float) -> Pose | None: - """Pose to anchor *msg* with. Dispatches to the stream's - ``@pose_setter_for`` if one is defined (awaited when async), else falls - back to a ``world <- frame_id`` tf lookup.""" + """Pose to anchor *msg* with. Dispatches to the stream's (async) + ``@pose_setter_for`` if one is defined, else falls back to a + ``world <- frame_id`` tf lookup.""" setter = self._pose_setters.get(name) if setter is not None: - result = setter(msg) - if inspect.isawaitable(result): - result = await result - return cast("Pose | None", result) + return cast("Pose | None", await setter(msg)) frame_id = getattr(msg, "frame_id", None) or self.config.default_frame_id transform = self.tf.get( self.config.root_frame, frame_id, time_point=ts, time_tolerance=self.config.tf_tolerance From df4d802ebbf26daf0868536c50a5c0c9c3913444 Mon Sep 17 00:00:00 2001 From: Jeff Hykin Date: Wed, 24 Jun 2026 15:10:35 +0800 Subject: [PATCH 3/4] - --- dimos/robot/unitree/go2/blueprints/smart/unitree_go2.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dimos/robot/unitree/go2/blueprints/smart/unitree_go2.py b/dimos/robot/unitree/go2/blueprints/smart/unitree_go2.py index 3351a3dc16..e50cf8b8e3 100644 --- a/dimos/robot/unitree/go2/blueprints/smart/unitree_go2.py +++ b/dimos/robot/unitree/go2/blueprints/smart/unitree_go2.py @@ -68,8 +68,9 @@ async def _odom_pose(self, msg: PoseStamped) -> Pose | None: @pose_setter_for("lidar") async def _lidar_pose(self, 
msg: PointCloud2) -> Pose | None: - # Yes, it doesn't make sense to register lidar at the odom pose because the - # go2 lidar is in the world frame, but map.py (for now) needs this. + # go2 lidar (currently) is in world-frame + # so it doesn't make sense to register lidar at the odom pose + # but we do it anyways because map.py (for now) requires it # TODO: fix map.py to use a transform frame return getattr(self, "_last_odom_pose", None) From ab31c4b72a3005082f122bd5b0f5939418165556 Mon Sep 17 00:00:00 2001 From: Jeff Hykin Date: Wed, 24 Jun 2026 00:27:07 -0700 Subject: [PATCH 4/4] recorder: count + warn on frames dropped by LATEST coalescing process_observable gains an optional on_drop callback fired once per message dropped by the dispatcher's single-slot LATEST mailbox. The Recorder uses it to count dropped frames per stream and log a throttled warning, so a slow sink no longer loses data silently. --- dimos/core/module.py | 18 ++++++++++++++---- dimos/memory2/module.py | 22 +++++++++++++++++++++- 2 files changed, 35 insertions(+), 5 deletions(-) diff --git a/dimos/core/module.py b/dimos/core/module.py index 26a2b6f893..fdf6d23061 100644 --- a/dimos/core/module.py +++ b/dimos/core/module.py @@ -477,14 +477,16 @@ def process_observable( self, observable: "Observable[Any]", async_cb: Callable[[Any], Any], + on_drop: Callable[[], None] | None = None, ) -> "DisposableBase": """Subscribe `async_cb` (an async function) to `observable`, dispatching each emitted value onto self._loop. Invocations are serialized through a - per-subscription dispatcher task with LATEST coalescing. The subscription + per-subscription dispatcher task with LATEST coalescing. `on_drop`, if + given, fires once per message dropped by that coalescing. 
The subscription is registered for cleanup on stop().""" if not inspect.iscoroutinefunction(async_cb): raise TypeError("process_observable requires an `async def` callback") - on_msg, dispatcher_disp = self._make_async_dispatch(async_cb) + on_msg, dispatcher_disp = self._make_async_dispatch(async_cb, on_drop) sub = observable.subscribe(on_msg) return self.register_disposable(CompositeDisposable(sub, dispatcher_disp)) @@ -635,7 +637,9 @@ def _auto_bind_handlers(self) -> None: self.process_observable(in_stream.pure_observable(), handler) def _make_async_dispatch( - self, async_handler: Callable[[Any], Any] + self, + async_handler: Callable[[Any], Any], + on_drop: Callable[[], None] | None = None, ) -> tuple[Callable[[Any], None], "DisposableBase"]: """Build a sync callback that delivers `msg` into a single-slot LATEST mailbox drained by a dedicated dispatcher task on `self._loop`. @@ -645,7 +649,9 @@ def _make_async_dispatch( awaits). - If messages arrive faster than the handler can process them, intermediate messages are dropped and only the most recent unprocessed - message is kept (LATEST policy). + message is kept (LATEST policy). `on_drop`, if given, is called once + per dropped message (on the loop thread) so callers that need every + message can surface the loss. - The returned Disposable cancels the dispatcher task. """ loop = self._loop @@ -685,6 +691,10 @@ def on_msg(msg: Any) -> None: return def _set() -> None: + # A slot that still holds an unconsumed value is about to be + # overwritten — that queued message is being dropped (LATEST). 
+ if slot["has_value"] and on_drop is not None: + on_drop() slot["value"] = msg slot["has_value"] = True event.set() diff --git a/dimos/memory2/module.py b/dimos/memory2/module.py index e16c1527e7..cc3aebc67d 100644 --- a/dimos/memory2/module.py +++ b/dimos/memory2/module.py @@ -318,6 +318,9 @@ async def _lidar_pose(self, msg): config: RecorderConfig _pose_setters: dict[str, Any] = {} + # Per-stream count of frames lost to the dispatcher's LATEST coalescing + # (sink slower than input). Populated lazily as drops happen. + _dropped_frames: dict[str, int] = {} @rpc def start(self) -> None: @@ -330,6 +333,7 @@ def start(self) -> None: return self._pose_setters = self._collect_pose_setters() + self._dropped_frames = {} # TODO: store reset API/logic is not implemented yet. This module # shouldn't need to know about files (SqliteStore specific), and @@ -391,7 +395,23 @@ async def on_msg(msg: Any) -> None: ) stream.append(msg, ts=ts, pose=pose) - self.process_observable(input_topic.pure_observable(), on_msg) + self.process_observable( + input_topic.pure_observable(), on_msg, on_drop=lambda: self._on_frame_dropped(name) + ) + + def _on_frame_dropped(self, name: str) -> None: + """A frame for *name* was dropped because the sink couldn't keep up with + the input rate (dispatcher LATEST coalescing). Count it and warn — once, + then on every 1000th drop — so silent data loss is visible without + flooding the log.""" + count = self._dropped_frames.get(name, 0) + 1 + self._dropped_frames[name] = count + if count == 1 or count % 1000 == 0: + logger.warning( + "[%s] Recorder dropped %d frame(s) — sink slower than input; recording is lossy", + name, + count, + ) def _prepare_streams(self) -> None: """On APPEND, drop the streams this recorder is about to (re)write — the