Skip to content

async recorder callbacks#2577

Open
jeff-hykin wants to merge 6 commits into
mainfrom
jeff/fix/pose_setter_for
Open

async recorder callbacks#2577
jeff-hykin wants to merge 6 commits into
mainfrom
jeff/fix/pose_setter_for

Conversation

@jeff-hykin

@jeff-hykin jeff-hykin commented Jun 24, 2026

Copy link
Copy Markdown
Member

Async pose_setter_for for memory2. Updates the fastlio and go2 recorders to match.

@codecov

codecov Bot commented Jun 24, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 36.36364% with 14 lines in your changes missing coverage. Please review.
✅ All tests successful. No failed tests found.

Files with missing lines Patch % Lines
dimos/memory2/module.py 31.25% 10 Missing and 1 partial ⚠️
dimos/core/module.py 33.33% 1 Missing and 1 partial ⚠️
.../robot/unitree/go2/blueprints/smart/unitree_go2.py 66.66% 1 Missing ⚠️
@@            Coverage Diff             @@
##             main    #2577      +/-   ##
==========================================
+ Coverage   69.61%   70.87%   +1.26%     
==========================================
  Files         878      866      -12     
  Lines       79326    77253    -2073     
  Branches     7126     6875     -251     
==========================================
- Hits        55220    54755     -465     
+ Misses      22301    20695    -1606     
+ Partials     1805     1803       -2     
Flag Coverage Δ
OS-ubuntu-24.04-arm 62.99% <36.36%> (+<0.01%) ⬆️
OS-ubuntu-latest 65.82% <36.36%> (-0.01%) ⬇️
Py-3.10 65.81% <36.36%> (-0.01%) ⬇️
Py-3.11 65.81% <36.36%> (-0.01%) ⬇️
Py-3.12 65.81% <36.36%> (-0.01%) ⬇️
Py-3.13 65.82% <36.36%> (-0.01%) ⬇️
Py-3.14 65.83% <36.36%> (-0.01%) ⬇️
Py-3.14t 65.81% <36.36%> (-0.01%) ⬇️
SelfHosted-Large 30.10% <31.81%> (+<0.01%) ⬆️
SelfHosted-Linux 37.80% <31.81%> (+<0.01%) ⬆️
SelfHosted-macOS 36.63% <31.81%> (+0.04%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
.../robot/unitree/go2/blueprints/smart/unitree_go2.py 92.50% <66.66%> (ø)
dimos/core/module.py 77.29% <33.33%> (-0.33%) ⬇️
dimos/memory2/module.py 41.96% <31.25%> (-0.83%) ⬇️

... and 14 files with indirect coverage changes

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@jeff-hykin jeff-hykin force-pushed the jeff/fix/pose_setter_for branch from 27a5a72 to 6276f5b Compare June 24, 2026 02:52
Convert the memory2 Recorder from thread/disposable rx subscriptions to
manual async callbacks via process_observable, and let pose_setter_for
methods be async (awaited in _resolve_pose). Update the fastlio and go2
recorders accordingly.
@jeff-hykin jeff-hykin force-pushed the jeff/fix/pose_setter_for branch from 6276f5b to dba1e5a Compare June 24, 2026 06:45
Raise TypeError at decoration time if a non-async function is decorated,
and always await the setter in _resolve_pose.
@jeff-hykin jeff-hykin changed the title recorder: async callbacks + async pose_setter_for async recorder callbacks Jun 24, 2026
@jeff-hykin jeff-hykin marked this pull request as ready for review June 24, 2026 07:12
@greptile-apps

greptile-apps Bot commented Jun 24, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This PR migrates pose_setter_for-decorated methods to async def, threads the async dispatch through process_observable, and wires in the new on_drop callback to surface silent frame-loss via a throttled log warning.

  • pose_setter_for now enforces async def at decoration time (raises TypeError for sync functions), _resolve_pose is made async, and _port_to_stream switches from a raw Disposable(subscribe(...)) to process_observable — giving clean lifecycle management and the LATEST-coalescing dispatcher.
  • dimos/core/module.py gains an on_drop: Callable[[], None] | None parameter on both process_observable and _make_async_dispatch, fired inside call_soon_threadsafe just before the slot is overwritten.
  • FastLio2Recorder and Go2Memory setters are updated to async def with no logic changes (except Go2Memory._lidar_pose switches to getattr for a safer read of _last_odom_pose).

Confidence Score: 5/5

Safe to merge — all known users of pose_setter_for are updated and the core dispatch path is unchanged.

The changes are mechanical and well-scoped: a new optional parameter is threaded through the dispatcher, the decorator adds a static check, and the three concrete recorder classes are updated consistently. No existing dispatch logic is altered and the only callsites for pose_setter_for in the repo are all covered by this PR.

No files require special attention.

Important Files Changed

Filename Overview
dimos/core/module.py Adds on_drop parameter to process_observable and _make_async_dispatch; callback fires inside call_soon_threadsafe on the loop thread before overwriting the LATEST slot. Clean addition with no logic changes to the existing dispatch path.
dimos/memory2/module.py Migrates pose_setter_for to require async def, changes subscription mechanism to process_observable, adds _dropped_frames counter with a throttled warning. Minor docstring inaccuracy (says "power-of-ten" but logs every 1000).
dimos/hardware/sensors/lidar/fastlio2/recorder.py Both pose setter methods updated to async def to comply with the new decorator requirement; logic is unchanged.
dimos/robot/unitree/go2/blueprints/smart/unitree_go2.py Pose setters converted to async def; _lidar_pose switched to getattr(self, "_last_odom_pose", None) and updated with a comment about the world-frame todo.

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant Src as Input Topic (thread)
    participant Slot as LATEST Slot (event loop)
    participant Disp as Dispatcher Task
    participant CB as async on_msg
    participant Drop as _on_frame_dropped

    Src->>Slot: call_soon_threadsafe(_set)
    alt slot already has value
        Slot->>Drop: on_drop() — frame lost
    end
    Slot->>Slot: overwrite value, set event
    Disp->>Slot: await event, consume value
    Disp->>CB: await on_msg(msg)
    CB->>CB: _resolve_ts / await _resolve_pose
    CB->>CB: stream.append(msg, ts, pose)
Loading
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
    participant Src as Input Topic (thread)
    participant Slot as LATEST Slot (event loop)
    participant Disp as Dispatcher Task
    participant CB as async on_msg
    participant Drop as _on_frame_dropped

    Src->>Slot: call_soon_threadsafe(_set)
    alt slot already has value
        Slot->>Drop: on_drop() — frame lost
    end
    Slot->>Slot: overwrite value, set event
    Disp->>Slot: await event, consume value
    Disp->>CB: await on_msg(msg)
    CB->>CB: _resolve_ts / await _resolve_pose
    CB->>CB: stream.append(msg, ts, pose)
Loading

Reviews (2): Last reviewed commit: "recorder: count + warn on frames dropped..." | Re-trigger Greptile

Comment thread dimos/memory2/module.py Outdated
stream.append(msg, ts=ts, pose=pose)

self.register_disposable(Disposable(input_topic.subscribe(on_msg)))
self.process_observable(input_topic.pure_observable(), on_msg)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Silent frame drops under sustained high-frequency input

process_observable uses a single-slot LATEST mailbox: if on_msg is still awaiting _resolve_pose / stream.append when the next message arrives, the queued-but-unprocessed item is silently overwritten. The old Disposable(input_topic.subscribe(on_msg)) approach would block the publisher instead of dropping.

For typical sensor rates and a fast SQLite write this should be fine in practice, but there's no dropped-frame counter or warning anywhere — if the DB slows down or a pose setter blocks, missing frames would be invisible. Consider logging a warning (or incrementing a counter) inside the LATEST-coalescing slot in _make_async_dispatch, or in on_msg itself if a pre-existing slot is detected, so operators know when data is being lost.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea, I'll add a warning for dropped stuff

process_observable gains an optional on_drop callback fired once per
message dropped by the dispatcher's single-slot LATEST mailbox. The
Recorder uses it to count dropped frames per stream and log a throttled
warning, so a slow sink no longer loses data silently.
@github-actions github-actions Bot added the ready-to-merge Required CI checks have passed on this PR label Jun 24, 2026
@jeff-hykin jeff-hykin enabled auto-merge (squash) June 24, 2026 08:24
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

ready-to-merge Required CI checks have passed on this PR

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant