Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,6 @@ recording*.db

# openspec
/openspec/changes/archive/

# OpenCode deepwork notes
.slim/deepwork/
6 changes: 3 additions & 3 deletions dimos/core/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,15 +174,15 @@ def __init__(
from dimos.protocol.pubsub.impl.h264_lcm import H264LCM
from dimos.protocol.video.h264 import H264Config

self.config = config or H264Config()
self.h264_config = config or H264Config()
self.decode_images = decode_images
self.lcm = H264LCM(config=self.config, decode_images=decode_images, **kwargs) # type: ignore[assignment]
self.lcm = H264LCM(config=self.h264_config, decode_images=decode_images, **kwargs) # type: ignore[assignment]
super().__init__(topic, type)

def __reduce__(self): # type: ignore[no-untyped-def]
return (
H264LcmTransport,
(self.topic.topic, self.topic.lcm_type, self.config, self.decode_images),
(self.topic.topic, self.topic.lcm_type, self.h264_config, self.decode_images),
)

def start(self) -> None:
Expand Down
2 changes: 1 addition & 1 deletion dimos/experimental/security_demo/depth_estimator.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def _loop(self) -> None:

def _process(self, image: Image) -> None:
rgb = image.to_rgb()
pil_image = PILImage.fromarray(rgb.require_raw("DepthEstimator._process"))
pil_image = PILImage.fromarray(rgb.data)
if pil_image.width > _DEPTH_MAX_WIDTH:
scale = _DEPTH_MAX_WIDTH / pil_image.width
new_h = int(pil_image.height * scale)
Expand Down
4 changes: 2 additions & 2 deletions dimos/experimental/security_demo/security_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ def _patrol_step(self) -> None:
)

annotated = draw_bounding_box(
image.require_raw("SecurityModule._detection_step").copy(),
image.data.copy(),
list(best.bbox),
label=best.name,
confidence=best.confidence,
Expand Down Expand Up @@ -340,7 +340,7 @@ def _follow_step(self) -> None:
twist = self._visual_servo.compute_twist(best.bbox, latest_image.width)
self.cmd_vel.publish(twist)

overlay = latest_image.require_raw("SecurityModule._follow_step").copy()
overlay = latest_image.data.copy()
if hasattr(best, "mask") and best.mask is not None:
mask_bool = best.mask > 0
green = np.zeros_like(overlay)
Expand Down
2 changes: 1 addition & 1 deletion dimos/mapping/occupancy/visualize_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def visualize_path(
scale: int = 8,
) -> Image:
image = visualize_occupancy_grid(occupancy_grid, "rainbow")
bgr = image.require_raw("visualize_path")
bgr = image.data

bgr = cv2.resize(
bgr,
Expand Down
2 changes: 1 addition & 1 deletion dimos/mapping/osm/current_location_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def _fetch_new_map(self) -> None:

assert self._map_image is not None
assert self._position is not None
map_data = self._map_image.image.require_raw("CurrentLocationMap._fetch_new_map")
map_data = self._map_image.image.data
pil_image = PILImage.fromarray(map_data)
draw = ImageDraw.Draw(pil_image)
x, y = self._map_image.latlon_to_pixel(self._position)
Expand Down
10 changes: 8 additions & 2 deletions dimos/memory2/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ class RecorderConfig(MemoryModuleConfig):
default_frame_id: str = "base_link"
tf_tolerance: float = 0.5
db_path: str | Path = "recording.db"
stream_codecs: dict[str, str] = Field(default_factory=dict)


class Recorder(MemoryModule):
Expand Down Expand Up @@ -308,9 +309,14 @@ def start(self) -> None:
return

for name, port in self.inputs.items():
stream: Stream[Any] = self.store.stream(name, port.type)
codec = self.config.stream_codecs.get(name)
overrides = {"codec": codec} if codec is not None else {}
stream: Stream[Any] = self.store.stream(name, port.type, **overrides)
self._port_to_stream(name, port, stream)
logger.info("Recording %s (%s)", name, port.type.__name__)
if codec is not None:
logger.info("Recording %s (%s, codec=%s)", name, port.type.__name__, codec)
else:
logger.info("Recording %s (%s)", name, port.type.__name__)

def _port_to_stream(self, name: str, input_topic: In[Any], stream: Stream[Any]) -> None:
"""Append each message from *input_topic* to *stream*, attaching world pose via tf.
Expand Down
16 changes: 16 additions & 0 deletions dimos/memory2/store/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,22 @@ def _create_backend(
if notifier is None or isinstance(notifier, type):
notifier = (notifier or SubjectNotifier)()

from dimos.memory2.video.h264 import H264ImageBackend, is_h264_backend_marker
from dimos.msgs.sensor_msgs.Image import Image

if is_h264_backend_marker(codec):
if payload_type is None or not issubclass(payload_type, Image):
raise TypeError("codec='h264' is only supported for Image streams")
if bs is None:
raise RuntimeError("codec='h264' requires a BlobStore")
return H264ImageBackend(
metadata_store=obs,
blob_store=bs,
vector_store=vs,
notifier=notifier,
h264_config=config.get("h264_config"),
)

return Backend(
metadata_store=obs,
codec=codec,
Expand Down
42 changes: 40 additions & 2 deletions dimos/memory2/store/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from __future__ import annotations

from dataclasses import asdict
import os
import sqlite3
from typing import Annotated, Any
Expand All @@ -31,6 +32,8 @@
from dimos.memory2.utils.validation import validate_identifier
from dimos.memory2.vectorstore.base import VectorStore
from dimos.memory2.vectorstore.sqlite import SqliteVectorStore
from dimos.msgs.sensor_msgs.Image import ImageFormat
from dimos.protocol.video.h264 import H264Config


class SqliteStoreConfig(StoreConfig):
Expand Down Expand Up @@ -66,6 +69,7 @@ def _open_connection(self) -> sqlite3.Connection:
def _assemble_backend(self, name: str, stored: dict[str, Any]) -> Backend[Any]:
"""Reconstruct a Backend from a stored config dict."""
from dimos.memory2.codecs.base import _resolve_payload_type, codec_from_id
from dimos.memory2.video.h264 import H264ImageBackend, is_h264_backend_marker

payload_module = stored["payload_module"]
codec = codec_from_id(stored["codec_id"], payload_module)
Expand Down Expand Up @@ -110,9 +114,19 @@ def _assemble_backend(self, name: str, stored: dict[str, Any]) -> Backend[Any]:
conn=backend_conn,
name=name,
codec=codec,
blob_store_conn_match=blob_store_conn_match and eager_blobs,
blob_store_conn_match=blob_store_conn_match
and eager_blobs
and not is_h264_backend_marker(codec),
page_size=page_size,
)
if is_h264_backend_marker(codec):
return H264ImageBackend(
metadata_store=metadata_store,
blob_store=bs,
vector_store=vs,
notifier=notifier,
h264_config=self._deserialize_h264_config(stored.get("h264_config")),
)
backend: Backend[Any] = Backend(
metadata_store=metadata_store,
codec=codec,
Expand All @@ -124,6 +138,23 @@ def _assemble_backend(self, name: str, stored: dict[str, Any]) -> Backend[Any]:
)
return backend

@staticmethod
def _serialize_h264_config(config: H264Config) -> dict[str, Any]:
    """Convert *config* into a plain dict for storage.

    The dict is produced by ``dataclasses.asdict``; its
    ``supported_formats`` entry is then replaced by a list holding the
    ``.value`` of each format, so the entry no longer contains the format
    objects themselves.
    """
    return {
        **asdict(config),
        # Overrides the entry produced by asdict() above.
        "supported_formats": [
            image_format.value for image_format in config.supported_formats
        ],
    }

@staticmethod
def _deserialize_h264_config(data: dict[str, Any] | None) -> H264Config | None:
    """Rebuild an ``H264Config`` from a stored dict.

    Returns ``None`` when *data* is ``None``. Otherwise a shallow copy of
    *data* is used as keyword arguments for ``H264Config``; when a
    ``supported_formats`` entry is present, each of its items is passed
    through ``ImageFormat(...)`` and the results are collected in a tuple.
    """
    if data is None:
        return None

    # Work on a copy so the caller's dict is left unmodified.
    kwargs = dict(data)
    try:
        stored_formats = kwargs["supported_formats"]
    except KeyError:
        # No formats stored: pass the remaining entries through unchanged.
        pass
    else:
        kwargs["supported_formats"] = tuple(map(ImageFormat, stored_formats))
    return H264Config(**kwargs)

@staticmethod
def _serialize_backend(
backend: Backend[Any], payload_module: str, page_size: int
Expand All @@ -140,6 +171,11 @@ def _serialize_backend(
if backend.vector_store is not None:
cfg["vector_store"] = backend.vector_store.serialize()
cfg["notifier"] = backend.notifier.serialize()

from dimos.memory2.video.h264 import H264ImageBackend

if isinstance(backend, H264ImageBackend):
cfg["h264_config"] = SqliteStore._serialize_h264_config(backend.h264_config)
return cfg

def _create_backend(
Expand Down Expand Up @@ -176,10 +212,12 @@ def _create_backend(
codec = self._resolve_codec(payload_type, config.get("codec"))
config["codec"] = codec

from dimos.memory2.video.h264 import is_h264_backend_marker

# Create SqliteObservationStore with conn-sharing
bs = config["blob_store"]
blob_conn_match = isinstance(bs, SqliteBlobStore) and bs._conn is backend_conn
eager_blobs = config.get("eager_blobs", False)
eager_blobs = config.get("eager_blobs", False) and not is_h264_backend_marker(codec)
obs_store: SqliteObservationStore[Any] = SqliteObservationStore(
conn=backend_conn,
name=name,
Expand Down
Loading
Loading