Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
dba1e5a
recorder: async callbacks + async pose_setter_for
jeff-hykin Jun 24, 2026
af6a06b
recorder: require @pose_setter_for setters to be async
jeff-hykin Jun 24, 2026
a815704
Merge branch 'main' into jeff/fix/pose_setter_for
jeff-hykin Jun 24, 2026
df4d802
-
jeff-hykin Jun 24, 2026
b144638
Merge branch 'jeff/fix/pose_setter_for' of github.com:dimensionalOS/d…
jeff-hykin Jun 24, 2026
e8287a2
add gsc_pgo loop-closure module pinned to public gsc_pgo repo
jeff-hykin Jun 24, 2026
f9d3acb
add jnav PGO message types (Graph3D, GraphDelta3D, Landmark, Marker)
jeff-hykin Jun 24, 2026
617e5f7
add jnav utils for PGO (apriltags, recording_db, trajectory_metrics, …
jeff-hykin Jun 24, 2026
ac693ad
add ivan_pgo, ivan_pgo_transformer and unrefined_pgo loop-closure mod…
jeff-hykin Jun 24, 2026
13f97da
add loop-closure eval/benchmark scripts and spec
jeff-hykin Jun 24, 2026
f17a869
add tf tree support to memory2 store (DbTf, write_tf_tree, lazy store…
jeff-hykin Jun 24, 2026
1fb8536
add map post-processing pipeline (post_process, add_april, detect_tag…
jeff-hykin Jun 24, 2026
898fdef
add jnav map post-processing and PGO migration docs
jeff-hykin Jun 24, 2026
ab31c4b
recorder: count + warn on frames dropped by LATEST coalescing
jeff-hykin Jun 24, 2026
67a5ef0
Merge remote-tracking branch 'origin/jeff/fix/pose_setter_for' into j…
jeff-hykin Jun 24, 2026
d6c0c66
remove comparision modules
jeff-hykin Jun 24, 2026
844f8f8
address greptile review: guard SQL table names, lock tf lazy-load, cl…
jeff-hykin Jun 24, 2026
12d2b6a
port missing apriltag gate functions (ensure_april_streams, gate_para…
jeff-hykin Jun 24, 2026
49bd142
fix mypy strict errors in jnav (type annotations, SqliteStoreConfig.p…
jeff-hykin Jun 24, 2026
bf56b94
drop section-marker comments to satisfy codebase check
jeff-hykin Jun 24, 2026
9d5e332
regenerate all_blueprints.py for jnav loop-closure modules
jeff-hykin Jun 24, 2026
60d114c
move jnav .gitignore up to jnav/ root
jeff-hykin Jun 24, 2026
aae4319
add public MultiTBuffer.lookup (non-warning) and use it in DbTf inste…
jeff-hykin Jun 24, 2026
5f3adfc
rename DEFAULT_MARKER_LENGTH_M -> DEFAULT_MARKER_LENGTH_METERS
jeff-hykin Jun 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions dimos/core/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,14 +477,16 @@ def process_observable(
self,
observable: "Observable[Any]",
async_cb: Callable[[Any], Any],
on_drop: Callable[[], None] | None = None,
) -> "DisposableBase":
"""Subscribe `async_cb` (an async function) to `observable`, dispatching
each emitted value onto self._loop. Invocations are serialized through a
per-subscription dispatcher task with LATEST coalescing. The subscription
per-subscription dispatcher task with LATEST coalescing. `on_drop`, if
given, fires once per message dropped by that coalescing. The subscription
is registered for cleanup on stop()."""
if not inspect.iscoroutinefunction(async_cb):
raise TypeError("process_observable requires an `async def` callback")
on_msg, dispatcher_disp = self._make_async_dispatch(async_cb)
on_msg, dispatcher_disp = self._make_async_dispatch(async_cb, on_drop)
sub = observable.subscribe(on_msg)
return self.register_disposable(CompositeDisposable(sub, dispatcher_disp))

Expand Down Expand Up @@ -635,7 +637,9 @@ def _auto_bind_handlers(self) -> None:
self.process_observable(in_stream.pure_observable(), handler)

def _make_async_dispatch(
self, async_handler: Callable[[Any], Any]
self,
async_handler: Callable[[Any], Any],
on_drop: Callable[[], None] | None = None,
) -> tuple[Callable[[Any], None], "DisposableBase"]:
"""Build a sync callback that delivers `msg` into a single-slot LATEST
mailbox drained by a dedicated dispatcher task on `self._loop`.
Expand All @@ -645,7 +649,9 @@ def _make_async_dispatch(
awaits).
- If messages arrive faster than the handler can process them,
intermediate messages are dropped and only the most recent unprocessed
message is kept (LATEST policy).
message is kept (LATEST policy). `on_drop`, if given, is called once
per dropped message (on the loop thread) so callers that need every
message can surface the loss.
- The returned Disposable cancels the dispatcher task.
"""
loop = self._loop
Expand Down Expand Up @@ -685,6 +691,10 @@ def on_msg(msg: Any) -> None:
return

def _set() -> None:
# A slot that still holds an unconsumed value is about to be
# overwritten — that queued message is being dropped (LATEST).
if slot["has_value"] and on_drop is not None:
on_drop()
slot["value"] = msg
slot["has_value"] = True
event.set()
Expand Down
4 changes: 2 additions & 2 deletions dimos/hardware/sensors/lidar/fastlio2/recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@ class FastLio2Recorder(Recorder):
_last_odom_pose: Pose | None = None

@pose_setter_for("fastlio_odometry")
def _odom_pose(self, msg: Odometry) -> Pose | None:
async def _odom_pose(self, msg: Odometry) -> Pose | None:
pose = getattr(msg, "pose", None)
self._last_odom_pose = getattr(pose, "pose", None) if pose is not None else None
return self._last_odom_pose

@pose_setter_for("fastlio_lidar")
def _lidar_pose(self, msg: PointCloud2) -> Pose | None:
async def _lidar_pose(self, msg: PointCloud2) -> Pose | None:
# Most-recent odometry pose, stamped directly (no tf). None before the
# first odometry -> frame stored unposed, map-skipped.
return self._last_odom_pose
198 changes: 198 additions & 0 deletions dimos/memory2/db_tf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
# Copyright 2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Transform lookups over the transforms recorded in a store.

A store's ``tf`` member lazily reads every transform recorded under the ``tf``
(and ``tf_static``) streams into a :class:`MultiTBuffer`, then answers
``store.tf.get(target_frame, source_frame, time)`` — composing multi-hop chains
(e.g. ``world -> map -> odom -> base_link -> mid360_link``) and interpolating to
the nearest recorded sample. This makes world-registration a real transform
lookup instead of assuming a single baked-in pose.

``write_tf_tree`` populates those streams for a recording that lacks them.
"""

from __future__ import annotations

import re
import sqlite3
import threading
from typing import TYPE_CHECKING

import numpy as np

from dimos.memory2.store.sqlite import SqliteStoreConfig
from dimos.msgs.geometry_msgs.Quaternion import Quaternion
from dimos.msgs.geometry_msgs.Transform import Transform
from dimos.msgs.geometry_msgs.Vector3 import Vector3
from dimos.msgs.tf2_msgs.TFMessage import TFMessage
from dimos.protocol.tf.tf import MultiTBuffer

if TYPE_CHECKING:
from dimos.memory2.store.base import Store

TF_STREAMS = ("tf", "tf_static")
# Larger than any single recording's span so the buffer never prunes loaded
# transforms (MultiTBuffer drops samples older than ts - buffer_size).
_NO_PRUNE = 1.0e15
# SQLite can't parameterize table names, so caller-supplied stream names are
# interpolated; allow only safe identifiers to keep that injection-free.
_SAFE_TABLE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def _safe_table(name: str) -> str:
if not _SAFE_TABLE.match(name):
raise ValueError(f"unsafe stream/table name: {name!r}")
return name


class DbTf:
"""Transform lookups backed by the ``tf``/``tf_static`` streams of a store."""

def __init__(self, store: Store, stream_names: tuple[str, ...] = TF_STREAMS) -> None:
self._store = store
self._stream_names = stream_names
self._buffer: MultiTBuffer | None = None
self._load_lock = threading.Lock()

def _ensure_loaded(self) -> MultiTBuffer:
if self._buffer is not None:
return self._buffer
with self._load_lock:
if self._buffer is not None: # another thread loaded while we waited
return self._buffer
buffer = MultiTBuffer(buffer_size=_NO_PRUNE)
available = set(self._store.list_streams())
for name in self._stream_names:
if name not in available:
continue
for observation in self._store.stream(name, TFMessage):
message = observation.data
transforms = getattr(message, "transforms", None)
if transforms is None:
transforms = [message]
buffer.receive_transform(*transforms)
self._buffer = buffer
return buffer

def has_transforms(self) -> bool:
return bool(self._ensure_loaded().buffers)

def get(
self,
target_frame: str,
source_frame: str,
time_point: float | None = None,
time_tolerance: float | None = None,
) -> Transform | None:
"""Transform that maps a point in ``source_frame`` into ``target_frame``.

Returns ``None`` if no chain connects the two frames. Uses the buffer's
non-warning lookup so per-scan misses don't spam the log.
"""
buffer = self._ensure_loaded()
return buffer.lookup(target_frame, source_frame, time_point, time_tolerance)


def transform_matrix(transform: Transform) -> tuple[np.ndarray, np.ndarray]:
"""Return ``(R, t)`` (3x3, 3) for ``transform`` so ``p_target = p_source @ R.T + t``."""
rotation = transform.rotation
rotation_matrix = np.asarray(rotation.to_rotation_matrix(), float).reshape(3, 3)
translation = np.array(
[transform.translation.x, transform.translation.y, transform.translation.z], float
)
return rotation_matrix, translation


def write_tf_tree(
store: Store,
*,
odom_stream: str,
odom_parent: str = "odom",
odom_child: str = "base_link",
root_links: tuple[tuple[str, str], ...] = (("world", "map"), ("map", "odom")),
sensor_child: str = "mid360_link",
sensor_translation: tuple[float, float, float] = (0.0, 0.0, 0.0),
sensor_rotation: tuple[float, float, float, float] = (0.0, 0.0, 0.0, 1.0),
static_period: float = 0.45,
stream_name: str = "tf",
) -> int:
"""Populate ``store``'s tf stream from an odometry stream.

- ``root_links`` and ``odom_child -> sensor_child`` are emitted as identity /
fixed transforms every ``static_period`` seconds across the recording span.
- ``odom_parent -> odom_child`` is emitted once per odometry sample, taken
from each observation's pose.

Returns the number of tf observations written.
"""
config = store.config
if not isinstance(config, SqliteStoreConfig):
raise TypeError("write_tf_tree reads the db directly and needs a SqliteStore")
db_path = config.path
connection = sqlite3.connect(f"file:{db_path}?mode=ro&immutable=1", uri=True)
odom = np.array(
list(
connection.execute(
"select ts,pose_x,pose_y,pose_z,pose_qx,pose_qy,pose_qz,pose_qw "
f"from {_safe_table(odom_stream)} order by ts"
)
),
float,
Comment thread
jeff-hykin marked this conversation as resolved.
)
connection.close()
if not len(odom):
raise ValueError(f"odom stream {odom_stream!r} is empty; cannot build tf tree")

tf_stream = store.stream(stream_name, TFMessage)
written = 0

# dynamic: odom_parent -> odom_child, one per odometry sample
for row in odom:
ts = float(row[0])
transform = Transform(
translation=Vector3(row[1], row[2], row[3]),
rotation=Quaternion(row[4], row[5], row[6], row[7]),
frame_id=odom_parent,
child_frame_id=odom_child,
ts=ts,
)
tf_stream.append(TFMessage(transform), ts=ts)
written += 1

# static: root links + sensor mount, resampled every static_period
t0 = float(odom[0, 0])
t1 = float(odom[-1, 0])

def statics_at(ts: float) -> list[Transform]:
links = [
Transform(frame_id=parent, child_frame_id=child, ts=ts) for parent, child in root_links
]
links.append(
Transform(
translation=Vector3(*sensor_translation),
rotation=Quaternion(*sensor_rotation),
frame_id=odom_child,
child_frame_id=sensor_child,
ts=ts,
)
)
return links

for static_ts in np.arange(t0, t1 + static_period, static_period):
tf_stream.append(TFMessage(*statics_at(float(static_ts))), ts=float(static_ts))
written += 1

return written
56 changes: 42 additions & 14 deletions dimos/memory2/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from __future__ import annotations

from collections.abc import Callable
from collections.abc import Awaitable, Callable
import enum
import inspect
import os
Expand Down Expand Up @@ -274,15 +274,20 @@ class RecorderConfig(MemoryModuleConfig):
stream_remapping: dict[str, str] = Field(default_factory=dict)


PoseSetter = Callable[[Any], "Pose | None"]
PoseSetter = Callable[[Any], "Awaitable[Pose | None]"]


def pose_setter_for(*stream_names: str) -> Callable[[Any], Any]:
"""Mark a method ``(self, msg) -> Pose | None`` as the pose setter for the
given recorded stream(s). Streams without a setter fall back to the tf-based
``world <- frame_id`` lookup."""
"""Mark an ``async def`` method ``(self, msg) -> Pose | None`` as the pose
setter for the given recorded stream(s). Streams without a setter fall back
to the tf-based ``world <- frame_id`` lookup."""

def decorate(fn: Any) -> Any:
if not inspect.iscoroutinefunction(fn):
raise TypeError(
f"@pose_setter_for must decorate an `async def` method; "
f"{getattr(fn, '__qualname__', fn)} is not async"
)
fn._pose_setter_for = tuple(stream_names)
return fn

Expand All @@ -302,16 +307,20 @@ class MyRecorder(Recorder):

Each stream's pose defaults to a ``world <- frame_id`` tf lookup; decorate a
method with ``@pose_setter_for("stream")`` to source it elsewhere (e.g. from
an odometry stream)::
an odometry stream). Setters run on the module's event loop and may be
``async def``::

@pose_setter_for("lidar")
def _lidar_pose(self, msg):
async def _lidar_pose(self, msg):
return self._last_odom_pose
"""

config: RecorderConfig

_pose_setters: dict[str, Any] = {}
# Per-stream count of frames lost to the dispatcher's LATEST coalescing
# (sink slower than input). Populated lazily as drops happen.
_dropped_frames: dict[str, int] = {}

@rpc
def start(self) -> None:
Expand All @@ -324,6 +333,7 @@ def start(self) -> None:
return

self._pose_setters = self._collect_pose_setters()
self._dropped_frames = {}

# TODO: store reset API/logic is not implemented yet. This module
# shouldn't need to know about files (SqliteStore specific), and
Expand Down Expand Up @@ -368,12 +378,14 @@ def _port_to_stream(self, name: str, input_topic: In[Any], stream: Stream[Any])
already in world coords) fall back to ``config.default_frame_id`` —
so every observation gets a robot-pose anchor when tf is publishing.

Registers the subscription as a disposable on this module.
Each port is recorded by an async callback dispatched on the module's
event loop via :meth:`process_observable`, which serialises invocations
and registers the subscription for cleanup on stop().
"""

def on_msg(msg: Any) -> None:
async def on_msg(msg: Any) -> None:
ts = self._resolve_ts(name, msg)
pose = self._resolve_pose(name, msg, ts)
pose = await self._resolve_pose(name, msg, ts)
if not pose:
logger.warning(
"[%s] No pose for time %s (msg ts: %s), storing without pose",
Expand All @@ -383,7 +395,23 @@ def on_msg(msg: Any) -> None:
)
stream.append(msg, ts=ts, pose=pose)

self.register_disposable(Disposable(input_topic.subscribe(on_msg)))
self.process_observable(
input_topic.pure_observable(), on_msg, on_drop=lambda: self._on_frame_dropped(name)
)

def _on_frame_dropped(self, name: str) -> None:
"""A frame for *name* was dropped because the sink couldn't keep up with
the input rate (dispatcher LATEST coalescing). Count it and warn — once,
then on each power-of-ten — so silent data loss is visible without
flooding the log."""
count = self._dropped_frames.get(name, 0) + 1
self._dropped_frames[name] = count
if count == 1 or count % 1000 == 0:
logger.warning(
"[%s] Recorder dropped %d frame(s) — sink slower than input; recording is lossy",
name,
count,
)

def _prepare_streams(self) -> None:
"""On APPEND, drop the streams this recorder is about to (re)write — the
Expand All @@ -401,13 +429,13 @@ def _resolve_ts(self, name: str, msg: Any) -> float:
"""Timestamp to record *msg* at. Override to re-base onto another clock."""
return getattr(msg, "ts", None) or time.time()

def _resolve_pose(self, name: str, msg: Any, ts: float) -> Pose | None:
"""Pose to anchor *msg* with. Dispatches to the stream's
async def _resolve_pose(self, name: str, msg: Any, ts: float) -> Pose | None:
"""Pose to anchor *msg* with. Dispatches to the stream's (async)
``@pose_setter_for`` if one is defined, else falls back to a
``world <- frame_id`` tf lookup."""
setter = self._pose_setters.get(name)
if setter is not None:
return cast("Pose | None", setter(msg))
return cast("Pose | None", await setter(msg))
frame_id = getattr(msg, "frame_id", None) or self.config.default_frame_id
transform = self.tf.get(
self.config.root_frame, frame_id, time_point=ts, time_tolerance=self.config.tf_tolerance
Expand Down
Loading
Loading