From 2a4af383e9363602aeaa405cf9d2071de434c205 Mon Sep 17 00:00:00 2001 From: cc Date: Mon, 15 Jun 2026 20:50:46 -0700 Subject: [PATCH 1/3] feat: add opt-in h264 image storage --- .gitignore | 3 + dimos/core/transport.py | 6 +- .../security_demo/depth_estimator.py | 2 +- .../security_demo/security_module.py | 4 +- dimos/mapping/occupancy/visualize_path.py | 2 +- dimos/mapping/osm/current_location_map.py | 2 +- dimos/memory2/module.py | 10 +- dimos/memory2/store/base.py | 16 + dimos/memory2/store/sqlite.py | 42 ++- dimos/memory2/video/h264.py | 204 +++++++++++-- dimos/memory2/video/test_h264_storage.py | 209 +++++++++---- dimos/memory2/vis/utils.py | 2 +- dimos/models/vl/florence.py | 4 +- dimos/models/vl/moondream.py | 2 +- dimos/models/vl/moondream_hosted.py | 2 +- dimos/msgs/sensor_msgs/Image.py | 256 +++------------- dimos/msgs/sensor_msgs/PointCloud2.py | 4 +- dimos/perception/common/utils.py | 2 +- .../temporal_memory/clip_filter.py | 2 +- .../temporal_memory/temporal_utils/helpers.py | 4 +- dimos/perception/object_tracker_2d.py | 2 +- dimos/perception/spatial_perception.py | 2 +- dimos/protocol/pubsub/impl/h264_lcm.py | 79 ++++- dimos/protocol/pubsub/impl/test_h264_lcm.py | 164 +++++++---- dimos/protocol/video/demo_h264_video_e2e.py | 77 +---- dimos/protocol/video/h264.py | 278 +++++++++++++----- dimos/protocol/video/test_h264.py | 158 +++++++--- dimos/robot/drone/camera_module.py | 2 +- dimos/robot/drone/drone_tracking_module.py | 2 +- .../smart/unitree_go2_h264_video.py | 16 +- dimos/teleop/quest_hosted/video_track.py | 4 +- .../memory/h264_storage_benchmark_report.md | 4 +- docs/capabilities/memory/index.md | 32 +- docs/coding-agents/style.md | 14 +- docs/development/testing.md | 2 +- docs/usage/transports/index.md | 12 +- 36 files changed, 1009 insertions(+), 617 deletions(-) diff --git a/.gitignore b/.gitignore index 9064ce16ff..7945e0777c 100644 --- a/.gitignore +++ b/.gitignore @@ -102,3 +102,6 @@ recording*.db # openspec /openspec/changes/archive/ + +# OpenCode deepwork notes +.slim/deepwork/ diff --git a/dimos/core/transport.py b/dimos/core/transport.py index 19aedc6827..e70d9343af 100644 --- a/dimos/core/transport.py +++ b/dimos/core/transport.py @@ -174,15 +174,15 @@ def __init__( from dimos.protocol.pubsub.impl.h264_lcm import H264LCM from dimos.protocol.video.h264 import H264Config - self.config = config or H264Config() + self.h264_config = config or H264Config() self.decode_images = decode_images - self.lcm = H264LCM(config=self.config, decode_images=decode_images, **kwargs) # type: ignore[assignment] + self.lcm = H264LCM(config=self.h264_config, decode_images=decode_images, **kwargs) # type: ignore[assignment] super().__init__(topic, type) def __reduce__(self): # type: ignore[no-untyped-def] return ( H264LcmTransport, - (self.topic.topic, self.topic.lcm_type, self.config, self.decode_images), + (self.topic.topic, self.topic.lcm_type, self.h264_config, self.decode_images), ) def start(self) -> None: diff --git a/dimos/experimental/security_demo/depth_estimator.py b/dimos/experimental/security_demo/depth_estimator.py index 55abb4b6a8..5737d3f006 100644 --- a/dimos/experimental/security_demo/depth_estimator.py +++ b/dimos/experimental/security_demo/depth_estimator.py @@ -84,7 +84,7 @@ def _loop(self) -> None: def _process(self, image: Image) -> None: rgb = image.to_rgb() - pil_image = PILImage.fromarray(rgb.require_raw("DepthEstimator._process")) + pil_image = PILImage.fromarray(rgb.data) if pil_image.width > _DEPTH_MAX_WIDTH: scale = _DEPTH_MAX_WIDTH / pil_image.width new_h = int(pil_image.height * scale) diff --git a/dimos/experimental/security_demo/security_module.py b/dimos/experimental/security_demo/security_module.py index dbc9c2dc5c..9569227805 100644 --- a/dimos/experimental/security_demo/security_module.py +++ b/dimos/experimental/security_demo/security_module.py @@ -299,7 +299,7 @@ def _patrol_step(self) -> None: ) annotated = draw_bounding_box( - image.require_raw("SecurityModule._detection_step").copy(), + image.data.copy(), list(best.bbox), label=best.name, confidence=best.confidence, @@ -340,7 +340,7 @@ def _follow_step(self) -> None: twist = self._visual_servo.compute_twist(best.bbox, latest_image.width) self.cmd_vel.publish(twist) - overlay = latest_image.require_raw("SecurityModule._follow_step").copy() + overlay = latest_image.data.copy() if hasattr(best, "mask") and best.mask is not None: mask_bool = best.mask > 0 green = np.zeros_like(overlay) diff --git a/dimos/mapping/occupancy/visualize_path.py b/dimos/mapping/occupancy/visualize_path.py index 41b19e5686..89dcf83067 100644 --- a/dimos/mapping/occupancy/visualize_path.py +++ b/dimos/mapping/occupancy/visualize_path.py @@ -30,7 +30,7 @@ def visualize_path( scale: int = 8, ) -> Image: image = visualize_occupancy_grid(occupancy_grid, "rainbow") - bgr = image.require_raw("visualize_path") + bgr = image.data bgr = cv2.resize( bgr, diff --git a/dimos/mapping/osm/current_location_map.py b/dimos/mapping/osm/current_location_map.py index d723e1e2ff..308d29e359 100644 --- a/dimos/mapping/osm/current_location_map.py +++ b/dimos/mapping/osm/current_location_map.py @@ -74,7 +74,7 @@ def _fetch_new_map(self) -> None: assert self._map_image is not None assert self._position is not None - map_data = self._map_image.image.require_raw("CurrentLocationMap._fetch_new_map") + map_data = self._map_image.image.data pil_image = PILImage.fromarray(map_data) draw = ImageDraw.Draw(pil_image) x, y = self._map_image.latlon_to_pixel(self._position) diff --git a/dimos/memory2/module.py b/dimos/memory2/module.py index ac4bd35f24..8e3bfddf77 100644 --- a/dimos/memory2/module.py +++ b/dimos/memory2/module.py @@ -259,6 +259,7 @@ class RecorderConfig(MemoryModuleConfig): default_frame_id: str = "base_link" tf_tolerance: float = 0.5 db_path: str | Path = "recording.db" + stream_codecs: dict[str, str] = Field(default_factory=dict) class Recorder(MemoryModule): @@ -308,9 +309,14 @@ def start(self) -> None: return for name, port in self.inputs.items(): - stream: Stream[Any] = self.store.stream(name, port.type) + codec = self.config.stream_codecs.get(name) + overrides = {"codec": codec} if codec is not None else {} + stream: Stream[Any] = self.store.stream(name, port.type, **overrides) self._port_to_stream(name, port, stream) - logger.info("Recording %s (%s)", name, port.type.__name__) + if codec is not None: + logger.info("Recording %s (%s, codec=%s)", name, port.type.__name__, codec) + else: + logger.info("Recording %s (%s)", name, port.type.__name__) def _port_to_stream(self, name: str, input_topic: In[Any], stream: Stream[Any]) -> None: """Append each message from *input_topic* to *stream*, attaching world pose via tf. diff --git a/dimos/memory2/store/base.py b/dimos/memory2/store/base.py index 7a7162a6d1..5d29f668e6 100644 --- a/dimos/memory2/store/base.py +++ b/dimos/memory2/store/base.py @@ -176,6 +176,22 @@ def _create_backend( if notifier is None or isinstance(notifier, type): notifier = (notifier or SubjectNotifier)() + from dimos.memory2.video.h264 import H264ImageBackend, is_h264_backend_marker + from dimos.msgs.sensor_msgs.Image import Image + + if is_h264_backend_marker(codec): + if payload_type is None or not issubclass(payload_type, Image): + raise TypeError("codec='h264' is only supported for Image streams") + if bs is None: + raise RuntimeError("codec='h264' requires a BlobStore") + return H264ImageBackend( + metadata_store=obs, + blob_store=bs, + vector_store=vs, + notifier=notifier, + h264_config=config.get("h264_config"), + ) + return Backend( metadata_store=obs, codec=codec, diff --git a/dimos/memory2/store/sqlite.py b/dimos/memory2/store/sqlite.py index bb2b735c1c..1b668c8922 100644 --- a/dimos/memory2/store/sqlite.py +++ b/dimos/memory2/store/sqlite.py @@ -14,6 +14,7 @@ from __future__ import annotations +from dataclasses import asdict import os import sqlite3 from typing import Annotated, Any @@ -31,6 +32,8 @@ from dimos.memory2.utils.validation import validate_identifier from dimos.memory2.vectorstore.base import VectorStore from dimos.memory2.vectorstore.sqlite import SqliteVectorStore +from dimos.msgs.sensor_msgs.Image import ImageFormat +from dimos.protocol.video.h264 import H264Config class SqliteStoreConfig(StoreConfig): @@ -66,6 +69,7 @@ def _open_connection(self) -> sqlite3.Connection: def _assemble_backend(self, name: str, stored: dict[str, Any]) -> Backend[Any]: """Reconstruct a Backend from a stored config dict.""" from dimos.memory2.codecs.base import _resolve_payload_type, codec_from_id + from dimos.memory2.video.h264 import H264ImageBackend, is_h264_backend_marker payload_module = stored["payload_module"] codec = codec_from_id(stored["codec_id"], payload_module) @@ -110,9 +114,19 @@ def _assemble_backend(self, name: str, stored: dict[str, Any]) -> Backend[Any]: conn=backend_conn, name=name, codec=codec, - blob_store_conn_match=blob_store_conn_match and eager_blobs, + blob_store_conn_match=blob_store_conn_match + and eager_blobs + and not is_h264_backend_marker(codec), page_size=page_size, ) + if is_h264_backend_marker(codec): + return H264ImageBackend( + metadata_store=metadata_store, + blob_store=bs, + vector_store=vs, + notifier=notifier, + h264_config=self._deserialize_h264_config(stored.get("h264_config")), + ) backend: Backend[Any] = Backend( metadata_store=metadata_store, codec=codec, @@ -124,6 +138,23 @@ def _assemble_backend(self, name: str, stored: dict[str, Any]) -> Backend[Any]: ) return backend + @staticmethod + def _serialize_h264_config(config: H264Config) -> dict[str, Any]: + data = asdict(config) + data["supported_formats"] = [fmt.value for fmt in config.supported_formats] + return data + + @staticmethod + def _deserialize_h264_config(data: dict[str, Any] | None) -> H264Config | None: + if data is None: + return None + config = dict(data) + if "supported_formats" in config: + config["supported_formats"] = tuple( + ImageFormat(fmt) for fmt in config["supported_formats"] + ) + return H264Config(**config) + @staticmethod def _serialize_backend( backend: Backend[Any], payload_module: str, page_size: int @@ -140,6 +171,11 @@ def _serialize_backend( if backend.vector_store is not None: cfg["vector_store"] = backend.vector_store.serialize() cfg["notifier"] = backend.notifier.serialize() + + from dimos.memory2.video.h264 import H264ImageBackend + + if isinstance(backend, H264ImageBackend): + cfg["h264_config"] = SqliteStore._serialize_h264_config(backend.h264_config) return cfg def _create_backend( @@ -176,10 +212,12 @@ def _create_backend( codec = self._resolve_codec(payload_type, config.get("codec")) config["codec"] = codec + from dimos.memory2.video.h264 import is_h264_backend_marker + # Create SqliteObservationStore with conn-sharing bs = config["blob_store"] blob_conn_match = isinstance(bs, SqliteBlobStore) and bs._conn is backend_conn - eager_blobs = config.get("eager_blobs", False) + eager_blobs = config.get("eager_blobs", False) and not is_h264_backend_marker(codec) obs_store: SqliteObservationStore[Any] = SqliteObservationStore( conn=backend_conn, name=name, diff --git a/dimos/memory2/video/h264.py b/dimos/memory2/video/h264.py index d833cebc2c..ec4f3abba8 100644 --- a/dimos/memory2/video/h264.py +++ b/dimos/memory2/video/h264.py @@ -14,38 +14,202 @@ from __future__ import annotations -from dimos.msgs.sensor_msgs.Image import H264_IMAGE_ENCODING, Image +from collections.abc import Iterator +from dataclasses import replace +import threading +from typing import Any + +from dimos.memory2.backend import Backend +from dimos.memory2.type.filter import StreamQuery +from dimos.memory2.type.observation import _UNLOADED, Observation +from dimos.msgs.sensor_msgs.Image import Image +from dimos.protocol.video.h264 import H264Config, H264Decoder, H264Encoder, H264Packet + +_TAG_CODEC = "_dimos_codec" +_TAG_IS_KEYFRAME = "_h264_is_keyframe" +_TAG_SEQ = "_h264_seq" +_TAG_KEYFRAME_SEQ = "_h264_keyframe_seq" +_TAG_PTS = "_h264_pts" +_TAG_FORMAT = "_h264_format" +_TAG_WIDTH = "_h264_width" +_TAG_HEIGHT = "_h264_height" +_TAG_CHANNELS = "_h264_channels" +_TAG_DTYPE = "_h264_dtype" class H264ImageCodec: - """memory2 codec for already-H.264 encoded Image payloads. + """Marker codec id for opt-in H.264-backed logical Image streams. - This codec deliberately does not decode pixels. It persists an ``Image`` whose - ``encoding`` is ``"h264"`` and restores the same encoded image on read. A - separate H.264 decode session turns the encoded stream back into raw Images - for visualization or module consumption. + Normal memory2 codecs are stateless per-observation codecs. H.264 is a + stateful stream codec, so actual encoding/decoding is implemented by + :class:`H264ImageBackend`. This marker exists so stream registry persistence + can round-trip ``codec_id == "h264"``. """ CODEC_ID = "h264" def encode(self, value: Image) -> bytes: - if value.encoding != H264_IMAGE_ENCODING: - raise ValueError( - f"H264ImageCodec stores encoded Images; got encoding={value.encoding!r}" - ) - return value.lcm_encode() + raise RuntimeError("H264ImageCodec must be used through H264ImageBackend") def decode(self, data: bytes) -> Image: - image = Image.lcm_decode(data) - if image.encoding != H264_IMAGE_ENCODING: - raise ValueError( - f"H264ImageCodec expected encoded Image; got encoding={image.encoding!r}" - ) - return image + raise RuntimeError("H264ImageCodec must be used through H264ImageBackend") + + +class H264ImageBackend(Backend[Image]): + """Backend for logical ``Image`` streams physically stored as H.264 packets.""" + + def __init__(self, *, h264_config: H264Config | None = None, **kwargs: Any) -> None: + kwargs.pop("codec", None) + kwargs.pop("data_type", None) + kwargs.pop("eager_blobs", None) + super().__init__(codec=H264ImageCodec(), data_type=Image, eager_blobs=False, **kwargs) + if self.blob_store is None: + raise RuntimeError("BlobStore required for H.264 image storage") + self.h264_config = h264_config or H264Config() + self._append_lock = threading.Lock() + self._encoder: H264Encoder | None = None + self._force_next_keyframe = True + + def _new_encoder(self) -> H264Encoder: + self._encoder = H264Encoder(self.h264_config) + self._force_next_keyframe = True + return self._encoder + + def _packet_tags(self, packet: H264Packet) -> dict[str, Any]: + return { + _TAG_CODEC: "h264", + _TAG_IS_KEYFRAME: packet.is_keyframe, + _TAG_SEQ: packet.seq, + _TAG_KEYFRAME_SEQ: packet.keyframe_seq, + _TAG_PTS: packet.pts, + _TAG_FORMAT: packet.format.value, + _TAG_WIDTH: packet.width, + _TAG_HEIGHT: packet.height, + _TAG_CHANNELS: packet.channels, + _TAG_DTYPE: packet.dtype, + } + + def append(self, obs: Observation[Image]) -> Observation[Image]: + payload = obs.data + if not isinstance(payload, Image): + raise TypeError(f"Stream expects Image, got {type(payload).__qualname__}") + obs.data_type = Image + + with self._append_lock: + encoder = self._encoder or self._new_encoder() + try: + packet = encoder.encode(payload, force_keyframe=self._force_next_keyframe) + self._force_next_keyframe = False + encoded = packet.to_bytes() + obs.tags = {**obs.tags, **self._packet_tags(packet)} + + row_id = self.metadata_store.insert(obs) + obs.id = row_id + + if self.blob_store is None: + raise RuntimeError("BlobStore required for H.264 image storage") + self.blob_store.put(self.name, row_id, encoded) + obs._data = _UNLOADED + obs._loader = self._make_loader(row_id) + + if self.vector_store is not None: + emb = getattr(obs, "embedding", None) + if emb is not None: + self.vector_store.put(self.name, row_id, emb) + + if hasattr(self.metadata_store, "commit"): + self.metadata_store.commit() + except BaseException: + self._encoder = None + self._force_next_keyframe = True + if hasattr(self.metadata_store, "rollback"): + self.metadata_store.rollback() + raise + + self.notifier.notify(obs) + return obs + + def _make_loader(self, row_id: int) -> Any: + def loader() -> Image: + return self._decode_at(row_id) + + return loader + + def _metadata_rows_by_id(self) -> list[Observation[Image]]: + rows = list(self.metadata_store.query(StreamQuery(order_field="id"))) + for obs in rows: + obs.data_type = Image + return rows + + def _decode_chain_rows(self, target_id: int) -> list[Observation[Image]]: + rows = [obs for obs in self._metadata_rows_by_id() if obs.id <= target_id] + keyframes = [obs for obs in rows if obs.tags.get(_TAG_IS_KEYFRAME) is True] + if not keyframes: + raise RuntimeError(f"No H.264 keyframe available before observation id={target_id}") + start_id = keyframes[-1].id + chain = [obs for obs in rows if start_id <= obs.id <= target_id] + if not chain or chain[-1].id != target_id: + raise KeyError(f"No H.264 observation id={target_id}") + return chain + + def _decode_at(self, target_id: int) -> Image: + if self.blob_store is None: + raise RuntimeError("BlobStore required for H.264 image storage") + decoder = H264Decoder(self.h264_config) + decoded: Image | None = None + for obs in self._decode_chain_rows(target_id): + packet = H264Packet.from_bytes(self.blob_store.get(self.name, obs.id)) + decoded = decoder.decode(packet) + if obs.id == target_id: + return decoded + raise KeyError(f"No H.264 observation id={target_id}") + + def _decode_contiguous_id_order( + self, rows: list[Observation[Image]] + ) -> Iterator[Observation[Image]]: + if self.blob_store is None: + raise RuntimeError("BlobStore required for H.264 image storage") + decoder = H264Decoder(self.h264_config) + expected_id: int | None = None + for obs in rows: + if expected_id is not None and obs.id != expected_id: + # Fall back to correct random keyframe seek when a query skips ids. + obs._data = self._decode_at(obs.id) + obs._loader = None + expected_id = obs.id + 1 + yield obs + continue + packet = H264Packet.from_bytes(self.blob_store.get(self.name, obs.id)) + obs._data = decoder.decode(packet) + obs._loader = None + obs.data_type = Image + expected_id = obs.id + 1 + yield obs + + def _iterate_snapshot(self, query: StreamQuery) -> Iterator[Observation[Image]]: + # Common efficient path: whole-stream or id-ordered contiguous iteration. + can_decode_sequentially = ( + query.search_vec is None + and not query.filters + and query.order_field in (None, "id") + and not query.order_desc + and query.offset_val in (None, 0) + ) + if can_decode_sequentially: + id_query = replace(query, order_field="id", order_desc=False) + rows = list(self.metadata_store.query(id_query)) + yield from self._decode_contiguous_id_order(rows) + return + + # Arbitrary filtered/ts-ordered/vector queries stay correct by using the + # base backend path. It preserves vector search and SQLite Python + # post-filters, while this class's _make_loader still provides H.264 + # keyframe-seeking lazy reads. + yield from super()._iterate_snapshot(query) -def is_h264_image(image: Image) -> bool: - return image.encoding == H264_IMAGE_ENCODING +def is_h264_backend_marker(codec: Any) -> bool: + return getattr(codec, "CODEC_ID", None) == "h264" -__all__ = ["H264ImageCodec", "is_h264_image"] +__all__ = ["H264ImageBackend", "H264ImageCodec", "is_h264_backend_marker"] diff --git a/dimos/memory2/video/test_h264_storage.py b/dimos/memory2/video/test_h264_storage.py index 74eafb2b7a..761199dfee 100644 --- a/dimos/memory2/video/test_h264_storage.py +++ b/dimos/memory2/video/test_h264_storage.py @@ -23,13 +23,71 @@ from dimos.memory2.backend import Backend from dimos.memory2.codecs.base import codec_from_id, codec_id from dimos.memory2.codecs.jpeg import JpegCodec +from dimos.memory2.store.memory import MemoryStore from dimos.memory2.store.sqlite import SqliteStore -from dimos.memory2.video.h264 import H264ImageCodec -from dimos.msgs.sensor_msgs.Image import H264_IMAGE_ENCODING, Image, ImageFormat +import dimos.memory2.video.h264 as h264_storage +from dimos.memory2.video.h264 import H264ImageBackend, H264ImageCodec +from dimos.models.embedding.base import Embedding +from dimos.msgs.sensor_msgs.Image import Image, ImageFormat +from dimos.protocol.video.h264 import H264Config, H264Packet _SKIP_SQLITE_VEC = platform.machine() == "aarch64" or platform.system() == "Darwin" +class FakeH264Encoder: + def __init__(self, _config: object) -> None: + self.seq = 0 + self.keyframe_seq = -1 + + def encode(self, image: Image, *, force_keyframe: bool = False) -> H264Packet: + if force_keyframe or self.seq == 0: + self.keyframe_seq = self.seq + key = True + else: + key = False + packet = H264Packet( + data=b"\x00\x00\x00\x01" + bytes([self.seq]), + format=image.format, + frame_id=image.frame_id, + ts=image.ts, + seq=self.seq, + pts=self.seq * 90, + is_keyframe=key, + keyframe_seq=self.keyframe_seq, + width=image.width, + height=image.height, + channels=image.channels, + dtype=str(image.dtype), + ) + self.seq += 1 + return packet + + +class FakeH264Decoder: + decoded_sequences: list[int] = [] + + def __init__(self, _config: object) -> None: + pass + + def decode(self, packet: H264Packet) -> Image: + self.decoded_sequences.append(packet.seq) + return Image( + data=np.full( + (packet.height, packet.width, packet.channels), packet.seq, dtype=np.uint8 + ), + format=packet.format, + frame_id=packet.frame_id, + ts=packet.ts, + ) + + +@pytest.fixture(autouse=True) +def fake_h264_codec(monkeypatch: pytest.MonkeyPatch) -> None: + FakeH264Decoder.decoded_sequences = [] + monkeypatch.setattr(h264_storage, "H264Encoder", FakeH264Encoder) + monkeypatch.setattr(h264_storage, "H264Decoder", FakeH264Decoder) + + def _raw_image(seq: int, fmt: ImageFormat = ImageFormat.RGB) -> Image: data = np.full((2, 2, 3), seq, dtype=np.uint8) if fmt == ImageFormat.GRAY: @@ -37,46 +95,19 @@ def _raw_image(seq: int, fmt: ImageFormat = ImageFormat.RGB) -> Image: return Image.from_numpy(data, format=fmt, frame_id="cam", ts=float(seq)) -def _encoded_image(seq: int, *, key: bool = True) -> Image: - return Image.encoded( - data=b"\x00\x00\x00\x01\x65" + bytes([seq]), - encoding=H264_IMAGE_ENCODING, - format=ImageFormat.RGB, - frame_id="cam", - ts=float(seq), - codec_metadata={ - "seq": seq, - "codec": "h264", - "bitstream": "annex_b", - "is_keyframe": key, - "keyframe_seq": seq if key else 0, - "pts": seq * 90, - "width": 2, - "height": 2, - "channels": 3, - "dtype": "uint8", - }, - ) - - -def test_h264_image_codec_roundtrips_encoded_image() -> None: - codec = H264ImageCodec() - image = _encoded_image(1) - - decoded = codec.decode(codec.encode(image)) - - assert decoded == image - assert decoded.encoding == H264_IMAGE_ENCODING - assert decoded.codec_metadata["seq"] == 1 - assert decoded.width == 2 - assert decoded.height == 2 +def _emb(vec: list[float]) -> Embedding: + vector = np.array(vec, dtype=np.float32) + vector /= np.linalg.norm(vector) + 1e-10 + return Embedding(vector=vector) -def test_h264_image_codec_rejects_raw_images() -> None: +def test_h264_image_codec_is_marker_only() -> None: codec = H264ImageCodec() - with pytest.raises(ValueError, match="encoded Images"): + with pytest.raises(RuntimeError, match="H264ImageBackend"): codec.encode(_raw_image(1)) + with pytest.raises(RuntimeError, match="H264ImageBackend"): + codec.decode(b"packet") def test_codec_id_and_factory_support_h264_for_image() -> None: @@ -86,36 +117,109 @@ def test_codec_id_and_factory_support_h264_for_image() -> None: assert isinstance(codec_from_id("h264", "dimos.msgs.sensor_msgs.Image.Image"), H264ImageCodec) -def test_h264_stream_stores_encoded_images_with_normal_backend(tmp_path: Path) -> None: +def test_h264_stream_stores_raw_images_and_reads_decoded_images(tmp_path: Path) -> None: if _SKIP_SQLITE_VEC: pytest.skip("sqlite-vec extension not loadable here") db = tmp_path / "h264.db" with SqliteStore(path=str(db)) as store: stream = store.stream("cam", Image, codec="h264") - stored = stream.append(_encoded_image(1), ts=1.0) - assert stored.data.encoding == H264_IMAGE_ENCODING - assert stored.data.codec_metadata["seq"] == 1 + source = stream._source + assert isinstance(source, H264ImageBackend) + stored = stream.append(_raw_image(0), ts=1.0) + assert stored.data.frame_id == "cam" + assert int(stored.data.data[0, 0, 0]) == 0 with SqliteStore(path=str(db), must_exist=True) as reopened: stream = reopened.stream("cam", Image) obs = stream.first() - assert obs.data.encoding == H264_IMAGE_ENCODING - assert obs.data.codec_metadata["seq"] == 1 + assert obs.data.frame_id == "cam" assert obs.data.width == 2 + assert int(obs.data.data[0, 0, 0]) == 0 + + +def test_h264_random_lazy_read_seeks_from_previous_keyframe(tmp_path: Path) -> None: + if _SKIP_SQLITE_VEC: + pytest.skip("sqlite-vec extension not loadable here") + db = tmp_path / "random.db" + with SqliteStore(path=str(db)) as store: + stream = store.stream("cam", Image, codec="h264") + stream.append(_raw_image(0), ts=0.0) + stream.append(_raw_image(1), ts=1.0) + stream.append(_raw_image(2), ts=2.0) + + with SqliteStore(path=str(db), must_exist=True) as reopened: + obs = reopened.stream("cam", Image).order_by("ts", desc=True).to_list()[0] + assert int(obs.data.data[0, 0, 0]) == 2 + assert FakeH264Decoder.decoded_sequences[-3:] == [0, 1, 2] + + +def test_h264_filter_predicates_run_after_lazy_decode(tmp_path: Path) -> None: + if _SKIP_SQLITE_VEC: + pytest.skip("sqlite-vec extension not loadable here") + db = tmp_path / "filters.db" + with SqliteStore(path=str(db)) as store: + stream = store.stream("cam", Image, codec="h264") + stream.append(_raw_image(0), ts=0.0) + stream.append(_raw_image(1), ts=1.0) + stream.append(_raw_image(2), ts=2.0) + + matches = stream.filter(lambda obs: int(obs.data.data[0, 0, 0]) == 2).to_list() + + assert len(matches) == 1 + assert int(matches[0].data.data[0, 0, 0]) == 2 -def test_h264_replay_emits_encoded_images(tmp_path: Path) -> None: +def test_h264_vector_search_uses_vector_store(tmp_path: Path) -> None: + if _SKIP_SQLITE_VEC: + pytest.skip("sqlite-vec extension not loadable here") + db = tmp_path / "vector.db" + with SqliteStore(path=str(db)) as store: + stream = store.stream("cam", Image, codec="h264") + stream.append(_raw_image(0), ts=0.0, embedding=_emb([0, 1, 0])) + stream.append(_raw_image(1), ts=1.0, embedding=_emb([1, 0, 0])) + + results = stream.search(_emb([1, 0, 0]), k=1).to_list() + + assert len(results) == 1 + assert int(results[0].data.data[0, 0, 0]) == 1 + assert results[0].similarity is not None + + +def test_h264_replay_emits_decoded_images(tmp_path: Path) -> None: if _SKIP_SQLITE_VEC: pytest.skip("sqlite-vec extension not loadable here") store = SqliteStore(path=str(tmp_path / "replay.db")) stream = store.stream("cam", Image, codec="h264") - stream.append(_encoded_image(1), ts=1.0) - stream.append(_encoded_image(2, key=False), ts=2.0) + stream.append(_raw_image(0), ts=1.0) + stream.append(_raw_image(1), ts=2.0) replayed = list(store.replay().streams.cam.iterate()) - assert [image.encoding for image in replayed] == [H264_IMAGE_ENCODING, H264_IMAGE_ENCODING] - assert [image.codec_metadata["seq"] for image in replayed] == [1, 2] + assert [int(image.data[0, 0, 0]) for image in replayed] == [0, 1] + assert all(isinstance(image.data, np.ndarray) for image in replayed) + + +def test_h264_sqlite_registry_persists_config(tmp_path: Path) -> None: + if _SKIP_SQLITE_VEC: + pytest.skip("sqlite-vec extension not loadable here") + db = tmp_path / "config.db" + config = H264Config(bitrate=123_456, keyframe_interval=7, max_gop_frames=9) + with SqliteStore(path=str(db)) as store: + stream = store.stream("cam", Image, codec="h264", h264_config=config) + assert isinstance(stream._source, H264ImageBackend) + assert stream._source.h264_config.bitrate == 123_456 + + with SqliteStore(path=str(db), must_exist=True) as reopened: + stream = reopened.stream("cam", Image) + assert isinstance(stream._source, H264ImageBackend) + assert stream._source.h264_config.bitrate == 123_456 + assert stream._source.h264_config.keyframe_interval == 7 + assert stream._source.h264_config.max_gop_frames == 9 + + +def test_memory_store_rejects_h264_without_blob_store() -> None: + with pytest.raises(RuntimeError, match="BlobStore"): + MemoryStore().stream("cam", Image, codec="h264") def test_default_image_stream_still_uses_jpeg_codec(tmp_path: Path) -> None: @@ -127,12 +231,3 @@ def test_default_image_stream_still_uses_jpeg_codec(tmp_path: Path) -> None: source = stream._source assert isinstance(source, Backend) assert isinstance(source.codec, JpegCodec) - - -def test_encoded_images_reject_pixel_operations() -> None: - image = _encoded_image(1) - - with pytest.raises(ValueError, match="requires raw Image data"): - image.to_rgb() - with pytest.raises(ValueError, match="requires raw Image data"): - image.as_numpy() diff --git a/dimos/memory2/vis/utils.py b/dimos/memory2/vis/utils.py index ff8f90fbed..fee6f66057 100644 --- a/dimos/memory2/vis/utils.py +++ b/dimos/memory2/vis/utils.py @@ -65,7 +65,7 @@ def mosaic( canvas = np.zeros((rows * cell_height, cols * cell_w, 3), dtype=np.uint8) for i, img in enumerate(images): r, c = divmod(i, cols) - tile = cv2.resize(img.to_bgr().require_raw("mosaic_observations"), (cell_w, cell_height)) + tile = cv2.resize(img.to_bgr().data, (cell_w, cell_height)) canvas[r * cell_height : (r + 1) * cell_height, c * cell_w : (c + 1) * cell_w] = tile result = Image(data=canvas, format=ImageFormat.BGR) diff --git a/dimos/models/vl/florence.py b/dimos/models/vl/florence.py index b1aa56bc16..8adda7a7af 100644 --- a/dimos/models/vl/florence.py +++ b/dimos/models/vl/florence.py @@ -98,7 +98,7 @@ def caption(self, image: Image, detail: str | CaptionDetail | None = None) -> st task_prompt = CaptionDetail.from_str(detail).value # Convert to PIL - pil_image = PILImage.fromarray(image.to_rgb().require_raw("Florence2Model.caption")) + pil_image = PILImage.fromarray(image.to_rgb().data) # Process inputs inputs = self._processor(text=task_prompt, images=pil_image, return_tensors="pt") @@ -138,7 +138,7 @@ def caption_batch(self, *images: Image) -> list[str]: # Convert all to PIL pil_images = [ - PILImage.fromarray(img.to_rgb().require_raw("Florence2Model.caption_batch")) + PILImage.fromarray(img.to_rgb().data) for img in images ] diff --git a/dimos/models/vl/moondream.py b/dimos/models/vl/moondream.py index b7c1a0cc25..e3cfe744ce 100644 --- a/dimos/models/vl/moondream.py +++ b/dimos/models/vl/moondream.py @@ -67,7 +67,7 @@ def _to_pil(self, image: Image | np.ndarray[Any, Any]) -> PILImage.Image: image, _ = self._prepare_image(image) rgb_image = image.to_rgb() - return PILImage.fromarray(rgb_image.require_raw("MoondreamVlModel._to_pil")) + return PILImage.fromarray(rgb_image.data) def query(self, image: Image | np.ndarray, query: str, **kwargs) -> str: # type: ignore[no-untyped-def] pil_image = self._to_pil(image) diff --git a/dimos/models/vl/moondream_hosted.py b/dimos/models/vl/moondream_hosted.py index 2a6f81977d..76e55451a1 100644 --- a/dimos/models/vl/moondream_hosted.py +++ b/dimos/models/vl/moondream_hosted.py @@ -54,7 +54,7 @@ def _to_pil_image(self, image: Image | np.ndarray) -> PILImage.Image: image = Image.from_numpy(image) rgb_image = image.to_rgb() - return PILImage.fromarray(rgb_image.require_raw("MoondreamHostedVlModel._to_pil_image")) + return PILImage.fromarray(rgb_image.data) def query(self, image: Image | np.ndarray, query: str, **kwargs) -> str: # type: ignore[no-untyped-def] pil_image = self._to_pil_image(image) diff --git a/dimos/msgs/sensor_msgs/Image.py b/dimos/msgs/sensor_msgs/Image.py index 30c7aeb158..5eaca03886 100644 --- a/dimos/msgs/sensor_msgs/Image.py +++ b/dimos/msgs/sensor_msgs/Image.py @@ -17,8 +17,6 @@ import base64 from dataclasses import dataclass, field from enum import Enum -import json -import struct import time from typing import TYPE_CHECKING, Any, Literal, TypedDict import warnings @@ -84,85 +82,41 @@ class AgentImageMessage(TypedDict): data: str # Base64 encoded image data -RAW_IMAGE_ENCODING = "raw" -H264_IMAGE_ENCODING = "h264" -_ENCODED_IMAGE_MAGIC = b"DIMI1" - - -def _data_equal(left: np.ndarray[Any, np.dtype[Any]] | bytes, right: object) -> bool: - if isinstance(left, np.ndarray) and isinstance(right, np.ndarray): - return bool(np.array_equal(left, right)) - if isinstance(left, bytes) and isinstance(right, bytes): - return left == right - return False - - -def _pack_encoded_image_payload(metadata: dict[str, Any], payload: bytes) -> bytes: - header = json.dumps(metadata, sort_keys=True, separators=(",", ":")).encode("utf-8") - return _ENCODED_IMAGE_MAGIC + struct.pack(">I", len(header)) + header + payload - - -def _unpack_encoded_image_payload(payload: bytes) -> tuple[dict[str, Any], bytes]: - if not payload.startswith(_ENCODED_IMAGE_MAGIC): - return {}, payload - offset = len(_ENCODED_IMAGE_MAGIC) - header_len = struct.unpack(">I", payload[offset : offset + 4])[0] - header_start = offset + 4 - header_end = header_start + header_len - metadata = json.loads(payload[header_start:header_end].decode("utf-8")) - return metadata, payload[header_end:] - - @dataclass class Image(Timestamped): - """Image container for raw pixels or explicitly encoded image payloads.""" + """Simple NumPy-based image container.""" msg_name = "sensor_msgs.Image" - data: np.ndarray[Any, np.dtype[Any]] | bytes = field( + data: np.ndarray[Any, np.dtype[Any]] = field( default_factory=lambda: np.zeros((1, 1, 3), dtype=np.uint8) ) format: ImageFormat = field(default=ImageFormat.BGR) frame_id: str = field(default="") ts: float = field(default_factory=time.time) - encoding: str = field(default=RAW_IMAGE_ENCODING) - codec_metadata: dict[str, Any] = field(default_factory=dict) def __post_init__(self) -> None: - if self.encoding == RAW_IMAGE_ENCODING: - if not isinstance(self.data, np.ndarray): - self.data = np.asarray(self.data) - if not isinstance(self.data, np.ndarray): - raise TypeError("Raw Image payload must be a NumPy array") - arr: np.ndarray[Any, np.dtype[Any]] = self.data - if arr.ndim < 2: - raise ValueError("Image requires a 2D/3D NumPy array") - return - if isinstance(self.data, bytearray): - self.data = bytes(self.data) - elif not isinstance(self.data, bytes): - self.data = memoryview(np.ascontiguousarray(self.data)).tobytes() - if not self.data: - raise ValueError("Encoded Image payload cannot be empty") + if not isinstance(self.data, np.ndarray): + self.data = np.asarray(self.data) + if self.data.ndim < 2: + raise ValueError("Image requires a 2D/3D NumPy array") def __str__(self) -> str: return ( f"Image(shape={self.shape}, format={self.format.value}, dtype={self.dtype}, " - f"encoding={self.encoding}, ts={to_human_readable(self.ts)})" + f"ts={to_human_readable(self.ts)})" ) def __repr__(self) -> str: - return f"Image(shape={self.shape}, format={self.format.value}, dtype={self.dtype}, encoding='{self.encoding}', frame_id='{self.frame_id}', ts={self.ts})" + return f"Image(shape={self.shape}, format={self.format.value}, dtype={self.dtype}, frame_id='{self.frame_id}', ts={self.ts})" def __eq__(self, other: object) -> bool: if not isinstance(other, Image): return False return ( - _data_equal(self.data, other.data) + np.array_equal(self.data, other.data) and self.format == other.format and self.frame_id == other.frame_id - and self.encoding == other.encoding - and self.codec_metadata == other.codec_metadata and abs(self.ts - other.ts) < 1e-6 ) @@ -170,103 +124,40 @@ def __len__(self) -> int: return int(self.height * self.width) def __getstate__(self) -> dict[str, Any]: - return { - "data": self.data, - "format": self.format, - "frame_id": self.frame_id, - "ts": self.ts, - "encoding": self.encoding, - "codec_metadata": self.codec_metadata, - } + return {"data": self.data, "format": self.format, "frame_id": self.frame_id, "ts": self.ts} def __setstate__(self, state: dict[str, Any]) -> None: self.data = state.get("data", np.zeros((1, 1, 3), dtype=np.uint8)) self.format = state.get("format", ImageFormat.BGR) self.frame_id = state.get("frame_id", "") self.ts = state.get("ts", time.time()) - self.encoding = state.get("encoding", RAW_IMAGE_ENCODING) - self.codec_metadata = state.get("codec_metadata", {}) - - @property - def is_raw(self) -> bool: - return self.encoding == RAW_IMAGE_ENCODING - - @property - def is_encoded(self) -> bool: - return not self.is_raw - - def require_raw(self, operation: str = "operation") -> np.ndarray[Any, np.dtype[Any]]: - if self.encoding != RAW_IMAGE_ENCODING: - raise ValueError(f"{operation} requires raw Image data; got encoding={self.encoding!r}") - assert isinstance(self.data, np.ndarray) - return self.data @property def height(self) -> int: - if self.is_encoded: - return int(self.codec_metadata.get("height", 0)) - arr = self.require_raw("height") - return int(arr.shape[0]) + return int(self.data.shape[0]) @property def width(self) -> int: - if self.is_encoded: - return int(self.codec_metadata.get("width", 0)) - arr = self.require_raw("width") - return int(arr.shape[1]) + return int(self.data.shape[1]) @property def channels(self) -> int: - if self.is_encoded: - if "channels" in self.codec_metadata: - return int(self.codec_metadata["channels"]) - if self.format in ( - ImageFormat.GRAY, - ImageFormat.GRAY16, - ImageFormat.DEPTH, - ImageFormat.DEPTH16, - ): - return 1 - if self.format in (ImageFormat.RGBA, ImageFormat.BGRA): - return 4 - return 3 - arr = self.require_raw("channels") - if arr.ndim == 2: + if self.data.ndim == 2: return 1 - if arr.ndim == 3: - return int(arr.shape[2]) + if self.data.ndim == 3: + return int(self.data.shape[2]) raise ValueError("Invalid image dimensions") @property def shape(self) -> tuple[int, ...]: - if self.is_encoded: - if self.channels == 1: - return (self.height, self.width) - return (self.height, self.width, self.channels) - return tuple(self.require_raw("shape").shape) + return tuple(self.data.shape) @property def dtype(self) -> np.dtype[Any]: - if self.is_encoded: - return np.dtype(str(self.codec_metadata.get("dtype", "uint8"))) - return self.require_raw("dtype").dtype + return self.data.dtype def copy(self) -> Image: - data: np.ndarray[Any, np.dtype[Any]] | bytes - if self.is_encoded: - if not isinstance(self.data, bytes): - raise ValueError("Encoded Image payload must be bytes") - data = bytes(self.data) - else: - data = self.require_raw("copy").copy() - return Image( - data=data, - format=self.format, - frame_id=self.frame_id, - ts=self.ts, - encoding=self.encoding, - codec_metadata=dict(self.codec_metadata), - ) + return Image(data=self.data.copy(), format=self.format, frame_id=self.frame_id, ts=self.ts) @classmethod def from_numpy( @@ -283,27 +174,6 @@ def from_numpy( ts=ts if ts is not None else time.time(), ) - @classmethod - def encoded( - cls, - *, - data: bytes, - encoding: str, - format: ImageFormat, - frame_id: str = "", - ts: float | None = None, - codec_metadata: dict[str, Any] | None = None, - ) -> Image: - metadata = dict(codec_metadata or {}) - return cls( - data=data, - format=format, - frame_id=frame_id, - ts=ts if ts is not None else time.time(), - encoding=encoding, - codec_metadata=metadata, - ) - @classmethod def from_file( cls, @@ -341,7 +211,7 @@ def from_opencv( def to_opencv(self) -> np.ndarray: """Convert to OpenCV BGR format.""" - arr = self.require_raw("to_opencv") + arr = self.data if self.format == ImageFormat.BGR: return arr if self.format == ImageFormat.RGB: @@ -361,12 +231,12 @@ def to_opencv(self) -> np.ndarray: def as_numpy(self) -> np.ndarray: """Get image data as numpy array.""" - return self.require_raw("as_numpy") + return self.data def to_rgb(self) -> Image: - arr = self.require_raw("to_rgb") if self.format == ImageFormat.RGB: return self.copy() + arr = self.data if self.format == ImageFormat.BGR: return Image( data=cv2.cvtColor(arr, cv2.COLOR_BGR2RGB), @@ -397,9 +267,9 @@ def to_rgb(self) -> Image: return self.copy() def to_bgr(self) -> Image: - arr = self.require_raw("to_bgr") if self.format == ImageFormat.BGR: return self.copy() + arr = self.data if self.format == ImageFormat.RGB: return Image( data=cv2.cvtColor(arr, cv2.COLOR_RGB2BGR), @@ -434,19 +304,18 @@ def to_bgr(self) -> Image: return self.copy() def to_grayscale(self) -> Image: - arr = self.require_raw("to_grayscale") if self.format in (ImageFormat.GRAY, ImageFormat.GRAY16, ImageFormat.DEPTH): return self.copy() if self.format == ImageFormat.BGR: return Image( - data=cv2.cvtColor(arr, cv2.COLOR_BGR2GRAY), + data=cv2.cvtColor(self.data, cv2.COLOR_BGR2GRAY), format=ImageFormat.GRAY, frame_id=self.frame_id, ts=self.ts, ) if self.format == ImageFormat.RGB: return Image( - data=cv2.cvtColor(arr, cv2.COLOR_RGB2GRAY), + data=cv2.cvtColor(self.data, cv2.COLOR_RGB2GRAY), format=ImageFormat.GRAY, frame_id=self.frame_id, ts=self.ts, @@ -454,7 +323,7 @@ def to_grayscale(self) -> Image: if self.format in (ImageFormat.RGBA, ImageFormat.BGRA): code = cv2.COLOR_RGBA2GRAY if self.format == ImageFormat.RGBA else cv2.COLOR_BGRA2GRAY return Image( - data=cv2.cvtColor(arr, code), + data=cv2.cvtColor(self.data, code), format=ImageFormat.GRAY, frame_id=self.frame_id, ts=self.ts, @@ -463,13 +332,11 @@ def to_grayscale(self) -> Image: def to_rerun(self) -> Any: """Convert to rerun Image format.""" - return _format_to_rerun(self.require_raw("to_rerun"), self.format) + return _format_to_rerun(self.data, self.format) def resize(self, width: int, height: int, interpolation: int = cv2.INTER_LINEAR) -> Image: return Image( - data=cv2.resize( - self.require_raw("resize"), (width, height), interpolation=interpolation - ), + data=cv2.resize(self.data, (width, height), interpolation=interpolation), format=self.format, frame_id=self.frame_id, ts=self.ts, @@ -505,8 +372,7 @@ def crop(self, x: int, y: int, width: int, height: int) -> Image: Returns: A new Image containing the cropped region """ - arr = self.require_raw("crop") - img_height, img_width = arr.shape[:2] + img_height, img_width = self.data.shape[:2] # Clamp the crop region to image bounds x = max(0, min(x, img_width)) @@ -515,10 +381,10 @@ def crop(self, x: int, y: int, width: int, height: int) -> Image: y_end = min(y + height, img_height) # Perform the crop using array slicing - if arr.ndim == 2: - cropped_data = arr[y:y_end, x:x_end] + if self.data.ndim == 2: + cropped_data = self.data[y:y_end, x:x_end] else: - cropped_data = arr[y:y_end, x:x_end, :] + cropped_data = self.data[y:y_end, x:x_end, :] return Image(data=cropped_data, format=self.format, frame_id=self.frame_id, ts=self.ts) @@ -530,9 +396,8 @@ def brightness(self) -> float: reading every pixel, and the mean converges quickly (CLT). """ max_val = 65535.0 if self.format in (ImageFormat.GRAY16, ImageFormat.DEPTH16) else 255.0 - arr = self.require_raw("brightness") - step = max(1, max(arr.shape[:2]) // 256) - return float(arr[::step, ::step].mean() / max_val) + step = max(1, max(self.data.shape[:2]) // 256) + return float(self.data[::step, ::step].mean() / max_val) @property def sharpness(self) -> float: @@ -541,7 +406,7 @@ def sharpness(self) -> float: Downsamples to ~160px wide before computing Laplacian variance for fast evaluation (~10-20x cheaper than full-res Sobel). """ - gray = self.to_grayscale().require_raw("sharpness") + gray = self.to_grayscale().data # Downsample to ~160px wide for cheap evaluation h, w = gray.shape[:2] if w > 160: @@ -621,31 +486,6 @@ def lcm_encode(self, frame_id: str | None = None) -> bytes: msg.header.stamp.sec = int(now) msg.header.stamp.nsec = int((now - int(now)) * 1e9) - if self.is_encoded: - if not isinstance(self.data, bytes): - raise ValueError("Encoded Image payload must be bytes") - codec_metadata = dict(self.codec_metadata) - codec_metadata.setdefault("width", self.width) - codec_metadata.setdefault("height", self.height) - codec_metadata.setdefault("channels", self.channels) - codec_metadata.setdefault("dtype", str(self.dtype)) - metadata = { - "format": self.format.value, - "encoding": self.encoding, - "codec_metadata": codec_metadata, - } - packed = _pack_encoded_image_payload(metadata, self.data) - msg.height = self.height - msg.width = self.width - msg.encoding = self.encoding - msg.is_bigendian = False - msg.step = 0 - msg.data_length = len(packed) - msg.data = packed - return msg.lcm_encode() # type: ignore[no-any-return] - - arr = self.require_raw("lcm_encode") - # Image properties msg.height = self.height msg.width = self.width @@ -653,10 +493,10 @@ def lcm_encode(self, frame_id: str | None = None) -> bytes: msg.is_bigendian = False # Calculate step (bytes per row) - channels = 1 if arr.ndim == 2 else arr.shape[2] + channels = 1 if self.data.ndim == 2 else self.data.shape[2] msg.step = self.width * self.dtype.itemsize * channels - view = memoryview(np.ascontiguousarray(arr)).cast("B") # type: ignore[arg-type] + view = memoryview(np.ascontiguousarray(self.data)).cast("B") # type: ignore[arg-type] msg.data_length = len(view) msg.data = view @@ -669,26 +509,6 @@ def lcm_decode(cls, data: bytes, **kwargs: Any) -> Image: # JPEG-compressed images use a different decode path. if msg.encoding == "jpeg": return cls.lcm_jpeg_decode(data, **kwargs) - if msg.encoding == H264_IMAGE_ENCODING: - metadata, payload = _unpack_encoded_image_payload(bytes(msg.data)) - codec_metadata = dict(metadata.get("codec_metadata", {})) - codec_metadata.setdefault("width", msg.width) - codec_metadata.setdefault("height", msg.height) - image_format = ImageFormat(metadata.get("format", ImageFormat.RGB.value)) - return cls.encoded( - data=payload, - encoding=H264_IMAGE_ENCODING, - format=image_format, - frame_id=msg.header.frame_id if hasattr(msg, "header") else "", - ts=( - msg.header.stamp.sec + msg.header.stamp.nsec / 1e9 - if hasattr(msg, "header") - and hasattr(msg.header, "stamp") - and msg.header.stamp.sec > 0 - else time.time() - ), - codec_metadata=codec_metadata, - ) fmt, dtype, channels = _parse_lcm_encoding(msg.encoding) arr: np.ndarray[Any, Any] = np.frombuffer(msg.data, dtype=dtype) @@ -722,7 +542,7 @@ def to_jpeg_bytes(self, quality: int = 75) -> bytes: jpeg = TurboJPEG() # Canonicalize to RGB so JPEG bytes are deterministic regardless of input format. - rgb_array = self.to_rgb().require_raw("to_jpeg_bytes") + rgb_array = self.to_rgb().data return jpeg.encode(rgb_array, quality=quality, pixel_format=TJPF_RGB) # type: ignore[no-any-return] def lcm_jpeg_encode(self, quality: int = 75, frame_id: str | None = None) -> bytes: @@ -800,8 +620,6 @@ def lcm_jpeg_decode(cls, data: bytes, **kwargs: Any) -> Image: __all__ = [ - "H264_IMAGE_ENCODING", - "RAW_IMAGE_ENCODING", "Image", "ImageFormat", "sharpness_barrier", diff --git a/dimos/msgs/sensor_msgs/PointCloud2.py b/dimos/msgs/sensor_msgs/PointCloud2.py index 46396ac15d..ae30c41711 100644 --- a/dimos/msgs/sensor_msgs/PointCloud2.py +++ b/dimos/msgs/sensor_msgs/PointCloud2.py @@ -223,13 +223,13 @@ def from_rgbd( PointCloud2 instance with colored points """ # Get color as RGB numpy array - color_data = color_image.to_rgb().require_raw("PointCloud2.from_rgbd color") + color_data = color_image.to_rgb().data if hasattr(color_data, "get"): # CuPy array color_data = color_data.get() color_data = np.ascontiguousarray(color_data) # Get depth numpy array - depth_data = depth_image.require_raw("PointCloud2.from_rgbd depth") + depth_data = depth_image.data if hasattr(depth_data, "get"): # CuPy array depth_data = depth_data.get() diff --git a/dimos/perception/common/utils.py b/dimos/perception/common/utils.py index f8e1a0a824..20ffc2a254 100644 --- a/dimos/perception/common/utils.py +++ b/dimos/perception/common/utils.py @@ -195,7 +195,7 @@ def rectify_image(image: Image, camera_matrix: np.ndarray, dist_coeffs: np.ndarr Returns an Image with numpy or cupy data depending on caller choice. """ - rect = cv2.undistort(image.require_raw("rectify_image"), camera_matrix, dist_coeffs) + rect = cv2.undistort(image.data, camera_matrix, dist_coeffs) return Image(data=rect, format=image.format, frame_id=image.frame_id, ts=image.ts) diff --git a/dimos/perception/experimental/temporal_memory/clip_filter.py b/dimos/perception/experimental/temporal_memory/clip_filter.py index 63cf9802ae..6ef7859e17 100644 --- a/dimos/perception/experimental/temporal_memory/clip_filter.py +++ b/dimos/perception/experimental/temporal_memory/clip_filter.py @@ -38,7 +38,7 @@ def _get_image_data(image: Image) -> np.ndarray[Any, Any]: """Extract numpy array from Image.""" if not hasattr(image, "data"): raise AttributeError(f"Image missing .data attribute: {type(image)}") - return image.require_raw("_get_image_data") + return image.data if CLIP_AVAILABLE: diff --git a/dimos/perception/experimental/temporal_memory/temporal_utils/helpers.py b/dimos/perception/experimental/temporal_memory/temporal_utils/helpers.py index d6e95e20d8..71dfc93654 100644 --- a/dimos/perception/experimental/temporal_memory/temporal_utils/helpers.py +++ b/dimos/perception/experimental/temporal_memory/temporal_utils/helpers.py @@ -70,7 +70,7 @@ def is_scene_stale(frames: list["Frame"], stale_threshold: float = 5.0) -> bool: return False if not hasattr(first_img, "data") or not hasattr(last_img, "data"): return False - first_data = first_img.require_raw("is_scene_stale first frame") - last_data = last_img.require_raw("is_scene_stale last frame") + first_data = first_img.data + last_data = last_img.data diff = np.abs(first_data.astype(float) - last_data.astype(float)) return bool(diff.mean() < stale_threshold) diff --git a/dimos/perception/object_tracker_2d.py b/dimos/perception/object_tracker_2d.py index d527e025ce..653c519054 100644 --- a/dimos/perception/object_tracker_2d.py +++ b/dimos/perception/object_tracker_2d.py @@ -93,7 +93,7 @@ def start(self) -> None: def on_frame(frame_msg: Image) -> None: arrival_time = time.perf_counter() with self._frame_lock: - self._latest_rgb_frame = frame_msg.require_raw("ObjectTracker2D.on_frame") + self._latest_rgb_frame = frame_msg.data self._frame_arrival_time = arrival_time unsub = self.color_image.subscribe(on_frame) diff --git a/dimos/perception/spatial_perception.py b/dimos/perception/spatial_perception.py index 4b3f601a1c..4d1f1377f3 100644 --- a/dimos/perception/spatial_perception.py +++ b/dimos/perception/spatial_perception.py @@ -190,7 +190,7 @@ def start(self) -> None: def set_video(image_msg: Image) -> None: # Convert Image message to numpy array if hasattr(image_msg, "data"): - frame = image_msg.require_raw("SpatialMemory.set_video") + frame = image_msg.data frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) self._latest_video_frame = frame else: diff --git a/dimos/protocol/pubsub/impl/h264_lcm.py b/dimos/protocol/pubsub/impl/h264_lcm.py index 8784e1d93b..36e9ce28b3 100644 --- a/dimos/protocol/pubsub/impl/h264_lcm.py +++ b/dimos/protocol/pubsub/impl/h264_lcm.py @@ -12,14 +12,23 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""H.264-encoded Image transport over LCM.""" +"""H.264-compressed Image transport over LCM.""" from __future__ import annotations +from collections.abc import Callable +from typing import cast + from dimos.msgs.sensor_msgs.Image import Image from dimos.protocol.pubsub.encoders import DecodingError, LCMTopicProto, PubSubEncoderMixin from dimos.protocol.pubsub.impl.lcmpubsub import LCMPubSubBase -from dimos.protocol.video.h264 import H264Config, H264Decoder, H264Encoder, VideoDecodeGapError +from dimos.protocol.video.h264 import ( + H264Config, + H264Decoder, + H264Encoder, + H264Packet, + VideoDecodeGapError, +) class H264EncoderMixin(PubSubEncoderMixin[LCMTopicProto, Image, bytes]): @@ -33,31 +42,77 @@ def __init__( **kwargs: object, ) -> None: super().__init__(**kwargs) # type: ignore[misc] + if not decode_images: + raise ValueError("H.264 transport always decodes packets back to Image") self.h264_config = config or H264Config() - self.decode_images = decode_images + # Test override hooks; real publish state is per-topic below and real + # subscribe state is closure-local per subscription. self._encoder: H264Encoder | None = None self._decoder: H264Decoder | None = None + self._encoders: dict[str, H264Encoder] = {} + self._decoders: dict[str, H264Decoder] = {} def encode(self, msg: Image, topic: LCMTopicProto) -> bytes: - if self._encoder is None: - self._encoder = H264Encoder(self.h264_config) - return self._encoder.encode(msg).lcm_encode() + encoder = self._encoder + if encoder is None: + encoder = self._encoders.get(topic.topic) + if encoder is None: + encoder = H264Encoder(self.h264_config) + self._encoders[topic.topic] = encoder + return encoder.encode(msg).to_bytes() def decode(self, msg: bytes, topic: LCMTopicProto) -> Image: + packet = self._parse_packet(msg, topic) + decoder = self._decoder + if decoder is None: + decoder = self._decoders.get(topic.topic) + if decoder is None: + decoder = H264Decoder(self.h264_config) + self._decoders[topic.topic] = decoder + return self._decode_packet(packet, decoder) + + def subscribe( + self, topic: LCMTopicProto, callback: Callable[[Image, LCMTopicProto], None] + ) -> Callable[[], None]: + """Subscribe with an independent H.264 decoder per callback/topic.""" + + decoders: dict[str, H264Decoder] = {} + + def wrapper_cb(encoded_data: bytes, callback_topic: LCMTopicProto) -> None: + try: + packet = self._parse_packet(encoded_data, callback_topic) + decoder = self._decoder + if decoder is None: + decoder = decoders.get(callback_topic.topic) + if decoder is None: + decoder = H264Decoder(self.h264_config) + decoders[callback_topic.topic] = decoder + decoded_message = self._decode_packet(packet, decoder) + except DecodingError: + return + callback(decoded_message, callback_topic) + + return cast( + "Callable[[], None]", + # Intentionally skip PubSubEncoderMixin.subscribe: H.264 decoding + # needs a fresh decoder per callback, not the shared decode() state. + super(PubSubEncoderMixin, self).subscribe(topic, wrapper_cb), # type: ignore[misc] + ) + + def _parse_packet(self, msg: bytes, topic: LCMTopicProto) -> H264Packet: if topic.topic == "LCM_SELF_TEST": raise DecodingError("Ignoring LCM_SELF_TEST topic") if topic.lcm_type is not None and not issubclass(topic.lcm_type, Image): raise DecodingError(f"H.264 LCM topic {topic.topic!r} is not typed as Image") try: - image = Image.lcm_decode(msg) + return H264Packet.from_bytes(msg) except ValueError as exc: raise DecodingError(str(exc)) from exc - if not self.decode_images: - return image - if self._decoder is None: - self._decoder = H264Decoder(self.h264_config) + + @staticmethod + def _decode_packet(packet: H264Packet, decoder: H264Decoder) -> Image: try: - return self._decoder.decode(image) + return decoder.decode(packet) except (VideoDecodeGapError, ValueError) as exc: raise DecodingError(str(exc)) from exc diff --git a/dimos/protocol/pubsub/impl/test_h264_lcm.py b/dimos/protocol/pubsub/impl/test_h264_lcm.py index 3b7987e0e4..1fbe90eb0b 100644 --- a/dimos/protocol/pubsub/impl/test_h264_lcm.py +++ b/dimos/protocol/pubsub/impl/test_h264_lcm.py @@ -16,15 +16,18 @@ from collections.abc import Callable from dataclasses import dataclass +import json +import struct import numpy as np import pytest from dimos.msgs.protocol import DimosMsg -from dimos.msgs.sensor_msgs.Image import H264_IMAGE_ENCODING, Image, ImageFormat +from dimos.msgs.sensor_msgs.Image import Image, ImageFormat from dimos.protocol.pubsub.encoders import DecodingError, LCMTopicProto +from dimos.protocol.pubsub.impl import h264_lcm as h264_lcm_module from dimos.protocol.pubsub.impl.h264_lcm import H264LCM, H264EncoderMixin -from dimos.protocol.video.h264 import VideoDecodeGapError +from dimos.protocol.video.h264 import H264Packet, VideoDecodeGapError @dataclass @@ -33,31 +36,26 @@ class StubTopic: lcm_type: type[DimosMsg] | None = None -def _encoded(image: Image, *, seq: int = 0, key: bool = True) -> Image: - return Image.encoded( +def _packet(image: Image, *, seq: int = 0, key: bool = True) -> H264Packet: + return H264Packet( data=b"\x00\x00\x00\x01\x65" if key else b"\x00\x00\x00\x01\x41", - encoding=H264_IMAGE_ENCODING, format=image.format, frame_id=image.frame_id, ts=image.ts, - codec_metadata={ - "seq": seq, - "codec": "h264", - "bitstream": "annex_b", - "is_keyframe": key, - "keyframe_seq": seq if key else 0, - "pts": seq * 90, - "width": image.width, - "height": image.height, - "channels": image.channels, - "dtype": str(image.dtype), - }, + seq=seq, + is_keyframe=key, + keyframe_seq=seq if key else 0, + pts=seq * 90, + width=image.width, + height=image.height, + channels=image.channels, + dtype=str(image.dtype), ) class FakeEncoder: - def encode(self, image: Image) -> Image: - return _encoded(image) + def encode(self, image: Image) -> H264Packet: + return _packet(image) class FakeDecoder: @@ -65,16 +63,32 @@ def __init__(self, *, fail: bool = False, invalid: bool = False) -> None: self.fail = fail self.invalid = invalid - def decode(self, image: Image) -> Image: + def decode(self, packet: H264Packet) -> Image: if self.fail: raise VideoDecodeGapError("waiting for keyframe") if self.invalid: - raise ValueError("Expected H.264 encoded Image") + raise ValueError("Expected H.264 packet") return Image( - data=np.zeros((image.height, image.width, 3), dtype=np.uint8), - format=image.format, - frame_id=image.frame_id, - ts=image.ts, + data=np.zeros((packet.height, packet.width, 3), dtype=np.uint8), + format=packet.format, + frame_id=packet.frame_id, + ts=packet.ts, + ) + + +class FakeStatefulDecoder: + def __init__(self, *_: object, **__: object) -> None: + self.expected_seq: int | None = None + + def decode(self, packet: H264Packet) -> Image: + if self.expected_seq is not None and packet.seq != self.expected_seq: + raise VideoDecodeGapError("waiting for keyframe") + self.expected_seq = packet.seq + 1 + return Image( + data=np.zeros((packet.height, packet.width, 3), dtype=np.uint8), + format=packet.format, + frame_id=packet.frame_id, + ts=packet.ts, ) @@ -103,20 +117,19 @@ class InMemoryH264PubSub(H264EncoderMixin, InMemoryPubSubBase): # type: ignore[ pass -def test_h264_lcm_encodes_image_as_h264_encoded_image_bytes() -> None: +def test_h264_lcm_encodes_image_as_h264_packet_bytes() -> None: transport = H264LCM() transport._encoder = FakeEncoder() # type: ignore[assignment] image = Image(data=np.zeros((2, 3, 3), dtype=np.uint8), format=ImageFormat.RGB, frame_id="cam") payload = transport.encode(image, StubTopic("/color", Image)) - encoded = Image.lcm_decode(payload) + packet = H264Packet.from_bytes(payload) - assert encoded.encoding == H264_IMAGE_ENCODING - assert encoded.codec_metadata["codec"] == "h264" - assert encoded.codec_metadata["bitstream"] == "annex_b" - assert encoded.width == 3 - assert encoded.height == 2 - assert encoded.codec_metadata["is_keyframe"] is True + assert packet.codec == "h264" + assert packet.bitstream == "annex_b" + assert packet.width == 3 + assert packet.height == 2 + assert packet.is_keyframe is True def test_h264_lcm_decodes_h264_image_bytes_to_raw_image() -> None: @@ -126,23 +139,15 @@ def test_h264_lcm_decodes_h264_image_bytes_to_raw_image() -> None: Image(data=np.zeros((2, 3, 3), dtype=np.uint8), format=ImageFormat.RGB, frame_id="cam") ) - image = transport.decode(encoded.lcm_encode(), StubTopic("/color", Image)) + image = transport.decode(encoded.to_bytes(), StubTopic("/color", Image)) - assert image.encoding == "raw" assert image.frame_id == "cam" assert image.shape == (2, 3, 3) -def test_h264_lcm_decode_false_returns_encoded_image() -> None: - transport = H264LCM(decode_images=False) - encoded = FakeEncoder().encode( - Image(data=np.zeros((2, 3, 3), dtype=np.uint8), format=ImageFormat.RGB, frame_id="cam") - ) - - image = transport.decode(encoded.lcm_encode(), StubTopic("/color", Image)) - - assert image.encoding == H264_IMAGE_ENCODING - assert image.frame_id == "cam" +def test_h264_lcm_decode_false_rejects_encoded_packet_as_image() -> None: + with pytest.raises(ValueError, match="always decodes packets back to Image"): + H264LCM(decode_images=False) def test_h264_lcm_suppresses_decode_gap() -> None: @@ -153,7 +158,7 @@ def test_h264_lcm_suppresses_decode_gap() -> None: ) with pytest.raises(DecodingError, match="waiting for keyframe"): - transport.decode(encoded.lcm_encode(), StubTopic("/color", Image)) + transport.decode(encoded.to_bytes(), StubTopic("/color", Image)) def test_h264_lcm_suppresses_invalid_h264_image() -> None: @@ -163,8 +168,8 @@ def test_h264_lcm_suppresses_invalid_h264_image() -> None: Image(data=np.zeros((2, 3, 3), dtype=np.uint8), format=ImageFormat.RGB, frame_id="cam") ) - with pytest.raises(DecodingError, match="Expected H.264 encoded Image"): - transport.decode(encoded.lcm_encode(), StubTopic("/color", Image)) + with pytest.raises(DecodingError, match="Expected H.264 packet"): + transport.decode(encoded.to_bytes(), StubTopic("/color", Image)) def test_h264_lcm_suppresses_non_image_payload() -> None: @@ -174,6 +179,20 @@ def test_h264_lcm_suppresses_non_image_payload() -> None: transport.decode(b"not-an-image", StubTopic("/color", Image)) +def test_h264_lcm_suppresses_malformed_packet_metadata() -> None: + transport = H264LCM() + valid = FakeEncoder().encode( + Image(data=np.zeros((2, 3, 3), dtype=np.uint8), format=ImageFormat.RGB, frame_id="cam") + ) + metadata = valid.metadata() + metadata["is_keyframe"] = "false" + bad_header = json.dumps(metadata).encode("utf-8") + payload = b"DIMH2641" + struct.pack(">I", len(bad_header)) + bad_header + valid.data + + with pytest.raises(DecodingError, match="is_keyframe.*boolean"): + transport.decode(payload, StubTopic("/color", Image)) + + def test_h264_lcm_publish_subscribe_delivers_decoded_image() -> None: transport = InMemoryH264PubSub() transport._encoder = FakeEncoder() # type: ignore[assignment] @@ -192,6 +211,47 @@ def test_h264_lcm_publish_subscribe_delivers_decoded_image() -> None: assert received[0].shape == (2, 3, 3) +def test_h264_lcm_subscribers_get_independent_decoders( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setattr(h264_lcm_module, "H264Decoder", FakeStatefulDecoder) + transport = InMemoryH264PubSub() + topic = StubTopic("/color", Image) + received_a: list[int] = [] + received_b: list[int] = [] + + transport.subscribe(topic, lambda image, _topic: received_a.append(int(image.ts))) + transport.subscribe(topic, lambda image, _topic: received_b.append(int(image.ts))) + + keyframe_image = Image( + data=np.zeros((2, 3, 3), dtype=np.uint8), + format=ImageFormat.RGB, + frame_id="cam", + ts=0.0, + ) + delta_image = Image( + data=np.zeros((2, 3, 3), dtype=np.uint8), + format=ImageFormat.RGB, + frame_id="cam", + ts=1.0, + ) + keyframe = _packet( + keyframe_image, + seq=0, + key=True, + ) + delta = _packet( + delta_image, + seq=1, + key=False, + ) + InMemoryPubSubBase.publish(transport, topic, keyframe.to_bytes()) + InMemoryPubSubBase.publish(transport, topic, delta.to_bytes()) + + assert received_a == [0, 1] + assert received_b == [0, 1] + + def test_h264_lcm_late_subscriber_waits_for_keyframe() -> None: transport = InMemoryH264PubSub() topic = StubTopic("/color", Image) @@ -200,20 +260,20 @@ def test_h264_lcm_late_subscriber_waits_for_keyframe() -> None: transport._decoder = decoder # type: ignore[assignment] transport.subscribe(topic, lambda image, _topic: received.append(image)) - delta = _encoded( + delta = _packet( Image(data=np.zeros((2, 3, 3), dtype=np.uint8), format=ImageFormat.RGB, frame_id="cam"), seq=1, key=False, ) - InMemoryPubSubBase.publish(transport, topic, delta.lcm_encode()) + InMemoryPubSubBase.publish(transport, topic, delta.to_bytes()) decoder.fail = False - keyframe = _encoded( + keyframe = _packet( Image(data=np.zeros((2, 3, 3), dtype=np.uint8), format=ImageFormat.RGB, frame_id="cam"), seq=2, key=True, ) - InMemoryPubSubBase.publish(transport, topic, keyframe.lcm_encode()) + InMemoryPubSubBase.publish(transport, topic, keyframe.to_bytes()) assert len(received) == 1 assert received[0].frame_id == "cam" diff --git a/dimos/protocol/video/demo_h264_video_e2e.py b/dimos/protocol/video/demo_h264_video_e2e.py index 14f0285024..9cade62a14 100644 --- a/dimos/protocol/video/demo_h264_video_e2e.py +++ b/dimos/protocol/video/demo_h264_video_e2e.py @@ -22,7 +22,6 @@ import tempfile import threading import time -from typing import ClassVar, cast import cv2 import numpy as np @@ -36,11 +35,9 @@ from dimos.hardware.sensors.camera.webcam import Webcam from dimos.memory2.module import OnExisting, Recorder from dimos.memory2.store.sqlite import SqliteStore -from dimos.memory2.stream import Stream from dimos.msgs.sensor_msgs.Image import Image, ImageFormat from dimos.protocol.pubsub.impl.h264_lcm import H264LCM -from dimos.protocol.video.h264 import H264Config, H264Decoder, VideoDecodeGapError -from dimos.utils.data import backup_file +from dimos.protocol.video.h264 import H264Config from dimos.utils.logging_config import setup_logger from dimos.visualization.vis_module import vis_module @@ -116,63 +113,15 @@ def _make_frame(self, seq: int) -> Image: ) -class _H264RecorderMixin: - """Mixin that stores selected Image inputs with the H.264 codec.""" - - h264_streams: ClassVar[frozenset[str]] = frozenset() - - @rpc - def start(self) -> None: - recorder = cast("Recorder", self) - Module.start(recorder) - - if recorder.config.g.replay: - logger.info( - "Replay mode active — Recorder disabled, leaving %s untouched", - recorder.config.db_path, - ) - return - - db_path = Path(recorder.config.db_path) - if db_path.exists(): - if recorder.config.on_existing is OnExisting.OVERWRITE: - db_path.unlink() - logger.info("Deleted existing recording %s", db_path) - elif recorder.config.on_existing is OnExisting.BACKUP: - backup = backup_file(db_path, keep_last=recorder.config.backup_keep_last) - if backup is None: - logger.info("Removed existing recording %s (backup_keep_last=0)", db_path) - else: - logger.info("Backed up existing recording %s -> %s", db_path, backup) - else: - raise FileExistsError(f"Recording already exists: {db_path}") - - if not recorder.inputs: - logger.warning("Recorder has no In ports — nothing to record, subclass the Recorder") - return - - for name, port in recorder.inputs.items(): - stream: Stream[Image] - h264_streams: frozenset[str] = getattr(self, "h264_streams", frozenset()) - if name in h264_streams: - stream = recorder.store.stream(name, port.type, codec="h264") - else: - stream = recorder.store.stream(name, port.type) - recorder._port_to_stream(name, port, stream) - logger.info("Recording %s (%s)", name, port.type.__name__) - - -class H264E2ERecorder(_H264RecorderMixin, Recorder): +class H264E2ERecorder(Recorder): """Recorder with a typed image input for the synthetic H.264 demo.""" - h264_streams: ClassVar[frozenset[str]] = frozenset({"color_image"}) color_image: In[Image] -class H264WebcamRecorder(_H264RecorderMixin, Recorder): +class H264WebcamRecorder(Recorder): """Recorder with a typed image input for webcam H.264 QA.""" - h264_streams: ClassVar[frozenset[str]] = frozenset({"color_image"}) color_image: In[Image] @@ -182,10 +131,9 @@ class JpegBenchmarkRecorder(Recorder): jpeg_image: In[Image] -class H264BenchmarkRecorder(_H264RecorderMixin, Recorder): +class H264BenchmarkRecorder(Recorder): """Recorder for the H.264 side of the storage-size benchmark.""" - h264_streams: ClassVar[frozenset[str]] = frozenset({"h264_image"}) h264_image: In[Image] @@ -468,15 +416,9 @@ def start(self) -> None: duration=self.config.duration, loop=self.config.loop, ) - decoder = H264Decoder(_webcam_h264_config) def publish_decoded(image: Image) -> None: - try: - self.color_image.publish(decoder.decode(image)) - except VideoDecodeGapError: - # V1 best effort: seek/replay can begin mid-GOP. Suppress deltas - # until the next keyframe restores decoder state. - return + self.color_image.publish(image) def on_error(error: Exception) -> None: logger.error("H.264 replay pipeline error: %s", error, exc_info=True) @@ -530,9 +472,8 @@ def _on_image(self, image: Image) -> None: if self._received % 10 == 0: logger.info( - "H.264 video probe received %s %s frames", + "H.264 video probe received %s decoded frames", self._received, - image.encoding, ) @rpc @@ -562,6 +503,7 @@ def _webcam() -> Webcam: H264E2ERecorder.blueprint( db_path="h264_video_e2e.db", on_existing=OnExisting.OVERWRITE, + stream_codecs={"color_image": "h264"}, ), H264VideoProbe.blueprint(), ).transports( @@ -570,7 +512,6 @@ def _webcam() -> Webcam: "/demo_h264_video_e2e/color_image", Image, config=_h264_config, - decode_images=False, ) } ) @@ -585,6 +526,7 @@ def _webcam() -> Webcam: H264BenchmarkRecorder.blueprint( db_path="benchmark_h264.db", on_existing=OnExisting.OVERWRITE, + stream_codecs={"h264_image": "h264"}, ), H264StorageBenchmarkReporter.blueprint( jpeg_db_path="benchmark_jpeg.db", @@ -596,7 +538,6 @@ def _webcam() -> Webcam: "/demo_h264_storage_benchmark/h264_image", Image, config=_benchmark_h264_config, - decode_images=False, ) } ) @@ -607,6 +548,7 @@ def _webcam() -> Webcam: H264WebcamRecorder.blueprint( db_path="webcam_h264.db", on_existing=OnExisting.OVERWRITE, + stream_codecs={"color_image": "h264"}, ), ).transports( { @@ -614,7 +556,6 @@ def _webcam() -> Webcam: "/demo_h264_webcam_record/color_image", Image, config=_webcam_h264_config, - decode_images=False, ) } ) diff --git a/dimos/protocol/video/h264.py b/dimos/protocol/video/h264.py index 69548f8c57..313cfdcb85 100644 --- a/dimos/protocol/video/h264.py +++ b/dimos/protocol/video/h264.py @@ -14,14 +14,16 @@ from __future__ import annotations -from collections.abc import Callable, Iterator, Sequence +from collections.abc import Callable, Iterator, Mapping, Sequence from dataclasses import dataclass, field from fractions import Fraction +import json +import struct from typing import TYPE_CHECKING, Any, Protocol, cast import numpy as np -from dimos.msgs.sensor_msgs.Image import H264_IMAGE_ENCODING, Image, ImageFormat +from dimos.msgs.sensor_msgs.Image import Image, ImageFormat if TYPE_CHECKING: import av @@ -29,6 +31,7 @@ H264_CODEC = "h264" H264_BITSTREAM = "annex_b" +_H264_PACKET_MAGIC = b"DIMH2641" class MissingVideoDependencyError(ImportError): @@ -75,7 +78,157 @@ class H264CodecAdapter(Protocol): def encode_image(self, image: Image, *, force_keyframe: bool) -> tuple[bytes, int]: ... - def decode_image(self, image: Image) -> Image: ... + def decode_packet(self, packet: H264Packet) -> Image: ... + + +@dataclass(frozen=True) +class H264Packet: + """Internal encoded H.264 access unit for one source ``Image``. + + This is deliberately not a public module-facing message type. It is used by + H.264 transports/storage backends as a physical representation while public + stream APIs continue to expose decoded :class:`Image` values. + """ + + data: bytes + format: ImageFormat + frame_id: str + ts: float + seq: int + pts: int + is_keyframe: bool + keyframe_seq: int + width: int + height: int + channels: int + dtype: str + codec: str = H264_CODEC + bitstream: str = H264_BITSTREAM + + def metadata(self) -> dict[str, Any]: + return { + "codec": self.codec, + "bitstream": self.bitstream, + "format": self.format.value, + "frame_id": self.frame_id, + "ts": self.ts, + "seq": self.seq, + "pts": self.pts, + "is_keyframe": self.is_keyframe, + "keyframe_seq": self.keyframe_seq, + "width": self.width, + "height": self.height, + "channels": self.channels, + "dtype": self.dtype, + } + + def to_bytes(self) -> bytes: + header = json.dumps(self.metadata(), sort_keys=True, separators=(",", ":")).encode("utf-8") + return _H264_PACKET_MAGIC + struct.pack(">I", len(header)) + header + self.data + + @classmethod + def from_bytes(cls, payload: bytes) -> H264Packet: + if not payload.startswith(_H264_PACKET_MAGIC): + raise ValueError("H.264 packet is missing DimOS packet envelope") + offset = len(_H264_PACKET_MAGIC) + if len(payload) < offset + 4: + raise ValueError("H.264 packet envelope is truncated") + header_len = struct.unpack(">I", payload[offset : offset + 4])[0] + header_start = offset + 4 + header_end = header_start + header_len + if header_end > len(payload): + raise ValueError("H.264 packet metadata header is truncated") + try: + metadata = json.loads(payload[header_start:header_end].decode("utf-8")) + except (UnicodeDecodeError, json.JSONDecodeError) as exc: + raise ValueError("H.264 packet metadata header is invalid") from exc + if not isinstance(metadata, dict): + raise ValueError("H.264 packet metadata header must be a JSON object") + data = payload[header_end:] + return cls.from_parts(data=data, metadata=metadata) + + @classmethod + def from_parts(cls, *, data: bytes, metadata: Mapping[str, Any]) -> H264Packet: + if not isinstance(data, bytes): + raise ValueError("H.264 packet payload must be bytes") + codec = _metadata_str(metadata, "codec", H264_CODEC) + bitstream = _metadata_str(metadata, "bitstream", H264_BITSTREAM) + if codec != H264_CODEC: + raise ValueError(f"Expected codec={H264_CODEC!r}, got {metadata.get('codec')!r}") + if bitstream != H264_BITSTREAM: + raise ValueError( + f"Expected bitstream={H264_BITSTREAM!r}, got {metadata.get('bitstream')!r}" + ) + for key in ("seq", "is_keyframe", "keyframe_seq", "pts", "width", "height"): + if key not in metadata: + raise ValueError(f"H.264 packet missing metadata field {key!r}") + if not data: + raise ValueError("H.264 packet payload cannot be empty") + seq = _metadata_int(metadata, "seq", minimum=0) + pts = _metadata_int(metadata, "pts", minimum=0) + keyframe_seq = _metadata_int(metadata, "keyframe_seq", minimum=0) + width = _metadata_int(metadata, "width", minimum=1) + height = _metadata_int(metadata, "height", minimum=1) + channels = _metadata_int(metadata, "channels", default=3, minimum=1) + is_keyframe = _metadata_bool(metadata, "is_keyframe") + try: + image_format = ImageFormat(_metadata_str(metadata, "format", ImageFormat.RGB.value)) + except ValueError as exc: + raise ValueError( + f"Invalid H.264 packet image format: {metadata.get('format')!r}" + ) from exc + return cls( + data=data, + format=image_format, + frame_id=_metadata_str(metadata, "frame_id", ""), + ts=_metadata_float(metadata, "ts", default=0.0), + seq=seq, + pts=pts, + is_keyframe=is_keyframe, + keyframe_seq=keyframe_seq, + width=width, + height=height, + channels=channels, + dtype=_metadata_str(metadata, "dtype", "uint8"), + codec=codec, + bitstream=bitstream, + ) + + +def _metadata_bool(metadata: Mapping[str, Any], key: str) -> bool: + value = metadata[key] + if not isinstance(value, bool): + raise ValueError(f"H.264 packet metadata field {key!r} must be a boolean") + return value + + +def _metadata_int( + metadata: Mapping[str, Any], + key: str, + *, + default: int | None = None, + minimum: int | None = None, +) -> int: + value = metadata.get(key, default) + if not isinstance(value, int) or isinstance(value, bool): + raise ValueError(f"H.264 packet metadata field {key!r} must be an integer") + if minimum is not None and value < minimum: + raise ValueError(f"H.264 packet metadata field {key!r} must be >= {minimum}") + return value + + +def _metadata_float(metadata: Mapping[str, Any], key: str, *, default: float) -> float: + value = metadata.get(key, default) + if not isinstance(value, (int, float)) or isinstance(value, bool): + raise ValueError(f"H.264 packet metadata field {key!r} must be numeric") + return float(value) + + +def _metadata_str(metadata: Mapping[str, Any], key: str, default: str) -> str: + value = metadata.get(key, default) + if not isinstance(value, str): + raise ValueError(f"H.264 packet metadata field {key!r} must be a string") + return value @dataclass(frozen=True) @@ -103,10 +256,6 @@ def from_rtp_payloads( def ensure_supported_image(image: Image, config: H264Config) -> None: """Validate the first-version H.264 image input contract.""" - if image.encoding != "raw": - raise UnsupportedVideoImageError( - f"H.264 encoding expects raw Image data; got encoding={image.encoding!r}" - ) if image.format not in config.supported_formats: supported = ", ".join(fmt.value for fmt in config.supported_formats) raise UnsupportedVideoImageError( @@ -122,23 +271,11 @@ def ensure_supported_image(image: Image, config: H264Config) -> None: ) -def h264_metadata(image: Image) -> dict[str, Any]: - """Return validated H.264 metadata from an encoded Image.""" +def h264_metadata(packet: H264Packet) -> dict[str, Any]: + """Return validated metadata from an internal H.264 packet.""" - if image.encoding != H264_IMAGE_ENCODING: - raise ValueError(f"Expected H.264 encoded Image, got encoding={image.encoding!r}") - metadata = image.codec_metadata - if metadata.get("codec", H264_CODEC) != H264_CODEC: - raise ValueError(f"Expected codec={H264_CODEC!r}, got {metadata.get('codec')!r}") - if metadata.get("bitstream", H264_BITSTREAM) != H264_BITSTREAM: - raise ValueError( - f"Expected bitstream={H264_BITSTREAM!r}, got {metadata.get('bitstream')!r}" - ) - for key in ("seq", "is_keyframe", "keyframe_seq", "pts", "width", "height"): - if key not in metadata: - raise ValueError(f"H.264 encoded Image missing metadata field {key!r}") - if not isinstance(image.data, bytes): - raise ValueError("H.264 encoded Image payload must be bytes") + metadata = packet.metadata() + H264Packet.from_parts(data=packet.data, metadata=metadata) return metadata @@ -223,34 +360,31 @@ def encode_image(self, image: Image, *, force_keyframe: bool) -> tuple[bytes, in access_unit = H264AccessUnit.from_rtp_payloads(payloads, self._depayload) return access_unit.data, int(pts) - def decode_image(self, image: Image) -> Image: - metadata = h264_metadata(image) - assert isinstance(image.data, bytes) - frame = self._jitter_frame_type(data=image.data, timestamp=int(metadata["pts"])) + def decode_packet(self, packet: H264Packet) -> Image: + metadata = h264_metadata(packet) + frame = self._jitter_frame_type(data=packet.data, timestamp=int(metadata["pts"])) decoded_frames = self._decoder.decode(frame) if not decoded_frames: raise VideoDecodeGapError("H.264 decoder produced no frame") - return self._from_video_frame(cast("av.VideoFrame", decoded_frames[0]), image) + return self._from_video_frame(cast("av.VideoFrame", decoded_frames[0]), packet) def _to_video_frame(self, image: Image) -> av.VideoFrame: fmt = _av_input_format(image.format) - frame = self._av.VideoFrame.from_ndarray( - np.ascontiguousarray(image.require_raw("h264 encode")), format=fmt - ) + frame = self._av.VideoFrame.from_ndarray(np.ascontiguousarray(image.data), format=fmt) frame.pts = self._frame_index frame.time_base = self._time_base self._frame_index += 1 return frame @staticmethod - def _from_video_frame(frame: av.VideoFrame, image: Image) -> Image: - image_format = image.format + def _from_video_frame(frame: av.VideoFrame, packet: H264Packet) -> Image: + image_format = packet.format arr = frame.to_ndarray(format=_av_input_format(image_format)) - return Image(data=arr, format=image_format, frame_id=image.frame_id, ts=image.ts) + return Image(data=arr, format=image_format, frame_id=packet.frame_id, ts=packet.ts) class H264Encoder: - """Encode a normal DimOS Image stream into per-frame H.264 Images.""" + """Encode a normal DimOS Image stream into internal H.264 packets.""" def __init__( self, @@ -262,57 +396,67 @@ def __init__( self._codec = codec or AiortcH264Codec(self.config) self._seq = 0 self._keyframe_seq = -1 + self._last_source_signature: tuple[ImageFormat, int, int, int, str] | None = None - def encode(self, image: Image, *, force_keyframe: bool = False) -> Image: + def encode(self, image: Image, *, force_keyframe: bool = False) -> H264Packet: ensure_supported_image(image, self.config) - is_keyframe = self._should_force_keyframe(force_keyframe) + source_signature = self._source_signature(image) + is_keyframe = self._should_force_keyframe(source_signature, force_keyframe) access_unit, pts = self._codec.encode_image(image, force_keyframe=is_keyframe) if is_keyframe: self._keyframe_seq = self._seq - metadata: dict[str, Any] = { - "seq": self._seq, - "codec": H264_CODEC, - "bitstream": H264_BITSTREAM, - "is_keyframe": is_keyframe, - "keyframe_seq": self._keyframe_seq, - "pts": pts, - "width": image.width, - "height": image.height, - "channels": image.channels, - "dtype": str(image.dtype), - } - self._seq += 1 - return Image.encoded( + packet = H264Packet( data=access_unit, - encoding=H264_IMAGE_ENCODING, format=image.format, frame_id=image.frame_id, ts=image.ts, - codec_metadata=metadata, + seq=self._seq, + pts=pts, + is_keyframe=is_keyframe, + keyframe_seq=self._keyframe_seq, + width=image.width, + height=image.height, + channels=image.channels, + dtype=str(image.dtype), ) + self._last_source_signature = source_signature + self._seq += 1 + return packet - def _should_force_keyframe(self, requested: bool) -> bool: + @staticmethod + def _source_signature(image: Image) -> tuple[ImageFormat, int, int, int, str]: + return (image.format, image.width, image.height, image.channels, str(image.dtype)) + + def _should_force_keyframe( + self, + source_signature: tuple[ImageFormat, int, int, int, str], + requested: bool, + ) -> bool: if requested or self._seq == 0 or self._keyframe_seq < 0: return True + if ( + self._last_source_signature is not None + and source_signature != self._last_source_signature + ): + return True since_keyframe = self._seq - self._keyframe_seq return since_keyframe >= min(self.config.keyframe_interval, self.config.max_gop_frames) class GopBuffer: - """Track H.264 GOP validity across an encoded Image stream.""" + """Track H.264 GOP validity across an encoded packet stream.""" def __init__(self) -> None: self.expected_seq: int | None = None self.keyframe_seq: int | None = None self.valid = False - def accept(self, image: Image) -> bool: - """Return True when the encoded Image can be safely decoded.""" + def accept(self, packet: H264Packet) -> bool: + """Return True when the encoded packet can be safely decoded.""" - metadata = h264_metadata(image) - seq = int(metadata["seq"]) - keyframe_seq = int(metadata["keyframe_seq"]) - is_keyframe = bool(metadata["is_keyframe"]) + seq = packet.seq + keyframe_seq = packet.keyframe_seq + is_keyframe = packet.is_keyframe if self.expected_seq is not None and seq != self.expected_seq: self.valid = False @@ -332,7 +476,7 @@ def accept(self, image: Image) -> bool: class H264Decoder: - """Decode H.264 encoded Images into normal raw DimOS Images.""" + """Decode internal H.264 packets into normal raw DimOS Images.""" def __init__( self, @@ -345,13 +489,12 @@ def __init__( self._codec = codec or AiortcH264Codec(self.config) self._gop_buffer = gop_buffer or GopBuffer() - def decode(self, image: Image) -> Image: - metadata = h264_metadata(image) - if not self._gop_buffer.accept(image): + def decode(self, packet: H264Packet) -> Image: + if not self._gop_buffer.accept(packet): raise VideoDecodeGapError( - f"Cannot decode H.264 image seq={metadata['seq']}; waiting for next keyframe" + f"Cannot decode H.264 packet seq={packet.seq}; waiting for next keyframe" ) - return self._codec.decode_image(image) + return self._codec.decode_packet(packet) def _av_input_format(format: ImageFormat) -> str: @@ -388,6 +531,7 @@ def _av_h264_profile(profile: str) -> str: "H264Config", "H264Decoder", "H264Encoder", + "H264Packet", "MissingVideoDependencyError", "UnsupportedVideoImageError", "VideoDecodeGapError", diff --git a/dimos/protocol/video/test_h264.py b/dimos/protocol/video/test_h264.py index 71aba194bf..f1d892e982 100644 --- a/dimos/protocol/video/test_h264.py +++ b/dimos/protocol/video/test_h264.py @@ -16,11 +16,14 @@ import builtins from dataclasses import dataclass +import json +import struct import numpy as np import pytest -from dimos.msgs.sensor_msgs.Image import H264_IMAGE_ENCODING, Image, ImageFormat +from dimos.msgs.sensor_msgs.Image import Image, ImageFormat +from dimos.protocol.video import h264 as h264_module from dimos.protocol.video.h264 import ( AiortcH264Codec, GopBuffer, @@ -28,6 +31,7 @@ H264Config, H264Decoder, H264Encoder, + H264Packet, MissingVideoDependencyError, UnsupportedVideoImageError, VideoDecodeGapError, @@ -46,14 +50,14 @@ def encode_image(self, image: Image, *, force_keyframe: bool) -> tuple[bytes, in return b"\x00\x00\x00\x01\x67sps\x00\x00\x00\x01\x68pps\x00\x00\x00\x01\x65idr", 90 return b"\x00\x00\x00\x01\x41delta", 180 - def decode_image(self, image: Image) -> Image: - metadata = h264_metadata(image) + def decode_packet(self, packet: H264Packet) -> Image: + metadata = h264_metadata(packet) self.decoded_sequences.append(int(metadata["seq"])) return Image( - data=np.zeros((image.height, image.width, 3), dtype=np.uint8), - format=image.format, - frame_id=image.frame_id, - ts=image.ts, + data=np.zeros((packet.height, packet.width, 3), dtype=np.uint8), + format=packet.format, + frame_id=packet.frame_id, + ts=packet.ts, ) @@ -66,41 +70,75 @@ def _image(format: ImageFormat = ImageFormat.RGB, dtype: np.dtype = np.dtype(np. ) -def _encoded(seq: int, *, key: bool, keyframe_seq: int | None = None) -> Image: - return Image.encoded( +def _packet(seq: int, *, key: bool, keyframe_seq: int | None = None) -> H264Packet: + return H264Packet( data=b"\x00\x00\x00\x01\x65" if key else b"\x00\x00\x00\x01\x41", - encoding=H264_IMAGE_ENCODING, format=ImageFormat.RGB, frame_id="cam", ts=123.0 + seq, - codec_metadata={ - "seq": seq, - "codec": "h264", - "bitstream": "annex_b", - "is_keyframe": key, - "keyframe_seq": seq if key else (0 if keyframe_seq is None else keyframe_seq), - "pts": seq * 90, - "width": 6, - "height": 4, - "channels": 3, - "dtype": "uint8", - }, + seq=seq, + is_keyframe=key, + keyframe_seq=seq if key else (0 if keyframe_seq is None else keyframe_seq), + pts=seq * 90, + width=6, + height=4, + channels=3, + dtype="uint8", ) -def test_encoded_h264_image_lcm_roundtrips_metadata_and_access_unit() -> None: - image = _encoded(0, key=True) +def test_h264_packet_roundtrips_metadata_and_access_unit() -> None: + packet = _packet(0, key=True) - decoded = Image.lcm_decode(image.lcm_encode()) + decoded = H264Packet.from_bytes(packet.to_bytes()) - assert decoded == image - assert decoded.encoding == H264_IMAGE_ENCODING - assert decoded.codec_metadata["codec"] == "h264" - assert decoded.codec_metadata["bitstream"] == "annex_b" + assert decoded == packet + assert decoded.codec == "h264" + assert decoded.bitstream == "annex_b" assert isinstance(decoded.data, bytes) assert decoded.data.startswith(b"\x00\x00\x00\x01") +def test_h264_packet_rejects_non_object_metadata() -> None: + header = json.dumps(["not", "an", "object"]).encode("utf-8") + payload = h264_module._H264_PACKET_MAGIC + struct.pack(">I", len(header)) + header + b"data" + + with pytest.raises(ValueError, match="must be a JSON object"): + H264Packet.from_bytes(payload) + + +def test_h264_packet_rejects_non_boolean_keyframe_metadata() -> None: + metadata = _packet(0, key=True).metadata() + metadata["is_keyframe"] = "false" + + with pytest.raises(ValueError, match="is_keyframe.*boolean"): + H264Packet.from_parts(data=b"\x00\x00\x00\x01\x65", metadata=metadata) + + +def test_h264_packet_rejects_invalid_dimensions() -> None: + metadata = _packet(0, key=True).metadata() + metadata["width"] = 0 + + with pytest.raises(ValueError, match="width.*>= 1"): + H264Packet.from_parts(data=b"\x00\x00\x00\x01\x65", metadata=metadata) + + +def test_h264_packet_rejects_non_bytes_payload() -> None: + metadata = _packet(0, key=True).metadata() + + with pytest.raises(ValueError, match="payload must be bytes"): + H264Packet.from_parts(data="not-bytes", metadata=metadata) # type: ignore[arg-type] + + +def test_image_remains_raw_only_public_type() -> None: + image = _image() + + assert isinstance(image.data, np.ndarray) + assert not hasattr(image, "encoding") + assert not hasattr(image, "codec_metadata") + assert not hasattr(Image, "encoded") + + def test_access_unit_assembles_depayloaded_annex_b_fragments() -> None: unit = H264AccessUnit.from_rtp_payloads( [b"payload-a", b"payload-b"], @@ -110,7 +148,7 @@ def test_access_unit_assembles_depayloaded_annex_b_fragments() -> None: assert unit.data == b"\x00\x00\x00\x01payload-a\x00\x00\x00\x01payload-b" -def test_encoder_emits_encoded_image_metadata_and_periodic_keyframes() -> None: +def test_encoder_emits_packet_metadata_and_periodic_keyframes() -> None: codec = FakeCodec(encoded_force_keyframes=[], decoded_sequences=[]) encoder = H264Encoder(H264Config(keyframe_interval=2, max_gop_frames=2), codec=codec) @@ -118,39 +156,67 @@ def test_encoder_emits_encoded_image_metadata_and_periodic_keyframes() -> None: p1 = encoder.encode(_image()) p2 = encoder.encode(_image()) - assert [p0.codec_metadata["seq"], p1.codec_metadata["seq"], p2.codec_metadata["seq"]] == [ + assert [p0.seq, p1.seq, p2.seq] == [ 0, 1, 2, ] - assert [ - p0.codec_metadata["is_keyframe"], - p1.codec_metadata["is_keyframe"], - p2.codec_metadata["is_keyframe"], - ] == [True, False, True] - assert [ - p0.codec_metadata["keyframe_seq"], - p1.codec_metadata["keyframe_seq"], - p2.codec_metadata["keyframe_seq"], - ] == [0, 0, 2] + assert [p0.is_keyframe, p1.is_keyframe, p2.is_keyframe] == [True, False, True] + assert [p0.keyframe_seq, p1.keyframe_seq, p2.keyframe_seq] == [0, 0, 2] assert codec.encoded_force_keyframes == [True, False, True] assert isinstance(p0.data, bytes) assert b"\x67" in p0.data and b"\x68" in p0.data +def test_encoder_forces_keyframe_when_source_shape_changes() -> None: + codec = FakeCodec(encoded_force_keyframes=[], decoded_sequences=[]) + encoder = H264Encoder(H264Config(keyframe_interval=30, max_gop_frames=30), codec=codec) + changed_shape = Image( + data=np.zeros((8, 6, 3), dtype=np.uint8), + format=ImageFormat.RGB, + frame_id="cam", + ts=124.0, + ) + + p0 = encoder.encode(_image()) + p1 = encoder.encode(_image()) + p2 = encoder.encode(changed_shape) + + assert [p0.is_keyframe, p1.is_keyframe, p2.is_keyframe] == [True, False, True] + assert codec.encoded_force_keyframes == [True, False, True] + + +def test_encoder_forces_keyframe_when_source_format_changes() -> None: + codec = FakeCodec(encoded_force_keyframes=[], decoded_sequences=[]) + encoder = H264Encoder(H264Config(keyframe_interval=30, max_gop_frames=30), codec=codec) + changed_format = Image( + data=np.zeros((4, 6, 3), dtype=np.uint8), + format=ImageFormat.BGR, + frame_id="cam", + ts=124.0, + ) + + p0 = encoder.encode(_image()) + p1 = encoder.encode(_image()) + p2 = encoder.encode(changed_format) + + assert [p0.is_keyframe, p1.is_keyframe, p2.is_keyframe] == [True, False, True] + assert codec.encoded_force_keyframes == [True, False, True] + + def test_gop_buffer_suppresses_delta_after_sequence_gap_until_keyframe() -> None: codec = FakeCodec(encoded_force_keyframes=[], decoded_sequences=[]) decoder = H264Decoder(codec=codec, gop_buffer=GopBuffer()) - assert decoder.decode(_encoded(0, key=True)).frame_id == "cam" - assert decoder.decode(_encoded(1, key=False, keyframe_seq=0)).frame_id == "cam" + assert decoder.decode(_packet(0, key=True)).frame_id == "cam" + assert decoder.decode(_packet(1, key=False, keyframe_seq=0)).frame_id == "cam" with pytest.raises(VideoDecodeGapError): - decoder.decode(_encoded(3, key=False, keyframe_seq=0)) + decoder.decode(_packet(3, key=False, keyframe_seq=0)) with pytest.raises(VideoDecodeGapError): - decoder.decode(_encoded(4, key=False, keyframe_seq=0)) + decoder.decode(_packet(4, key=False, keyframe_seq=0)) - assert decoder.decode(_encoded(5, key=True)).frame_id == "cam" + assert decoder.decode(_packet(5, key=True)).frame_id == "cam" assert codec.decoded_sequences == [0, 1, 5] diff --git a/dimos/robot/drone/camera_module.py b/dimos/robot/drone/camera_module.py index 72c77fe2ee..b77c597980 100644 --- a/dimos/robot/drone/camera_module.py +++ b/dimos/robot/drone/camera_module.py @@ -138,7 +138,7 @@ def _processing_loop(self) -> None: self._latest_frame = None # Get numpy array from Image - img_array = frame.require_raw("DroneCameraModule._process_frames") + img_array = frame.data # Create header header = Header(self.camera_frame_id) diff --git a/dimos/robot/drone/drone_tracking_module.py b/dimos/robot/drone/drone_tracking_module.py index 846faf26fe..16efd6df82 100644 --- a/dimos/robot/drone/drone_tracking_module.py +++ b/dimos/robot/drone/drone_tracking_module.py @@ -115,7 +115,7 @@ def _get_latest_frame(self) -> np.ndarray[Any, np.dtype[Any]] | None: if self._latest_frame is None: return None # Convert Image to numpy array - data = self._latest_frame.require_raw("DroneTrackingModule._get_latest_frame") + data = self._latest_frame.data return data @rpc diff --git a/dimos/robot/unitree/go2/blueprints/smart/unitree_go2_h264_video.py b/dimos/robot/unitree/go2/blueprints/smart/unitree_go2_h264_video.py index d304d3d7cc..06ee559147 100644 --- a/dimos/robot/unitree/go2/blueprints/smart/unitree_go2_h264_video.py +++ b/dimos/robot/unitree/go2/blueprints/smart/unitree_go2_h264_video.py @@ -30,7 +30,7 @@ from dimos.navigation.patrolling.module import PatrollingModule from dimos.navigation.replanning_a_star.module import ReplanningAStarPlanner from dimos.protocol.video.demo_h264_video_e2e import H264VideoProbe -from dimos.protocol.video.h264 import H264Config, H264Decoder, VideoDecodeGapError +from dimos.protocol.video.h264 import H264Config from dimos.robot.unitree.go2.blueprints.basic.unitree_go2_basic import rerun_config from dimos.robot.unitree.go2.connection import GO2Connection from dimos.visualization.vis_module import vis_module @@ -40,22 +40,10 @@ target_fps=15, keyframe_interval=30, ) -_go2_rerun_decoder: H264Decoder | None = None def _convert_h264_color_image(image: Image) -> Any: - """Decode H.264 color frames before logging them in Rerun.""" - global _go2_rerun_decoder - - if image.encoding == "h264": - if _go2_rerun_decoder is None: - _go2_rerun_decoder = H264Decoder(_go2_h264_config) - try: - image = _go2_rerun_decoder.decode(image) - except (VideoDecodeGapError, ValueError): - # Replay/subscription can start mid-GOP. Suppress deltas until the - # next keyframe restores decoder state. - return None + """Convert decoded color frames before logging them in Rerun.""" return image.to_rerun() diff --git a/dimos/teleop/quest_hosted/video_track.py b/dimos/teleop/quest_hosted/video_track.py index 07ad3a2846..2a17c3c39a 100644 --- a/dimos/teleop/quest_hosted/video_track.py +++ b/dimos/teleop/quest_hosted/video_track.py @@ -104,9 +104,7 @@ async def recv(self) -> av.VideoFrame: self._first_mono = now pts = int((now - self._first_mono) * VIDEO_CLOCK_RATE) - frame = av.VideoFrame.from_ndarray( - img.require_raw("CameraVideoTrack.recv"), format=_AV_FORMAT_MAP.get(img.format, "bgr24") - ) + frame = av.VideoFrame.from_ndarray(img.data, format=_AV_FORMAT_MAP.get(img.format, "bgr24")) frame.pts = pts frame.time_base = VIDEO_TIME_BASE return frame diff --git a/docs/capabilities/memory/h264_storage_benchmark_report.md b/docs/capabilities/memory/h264_storage_benchmark_report.md index 39e9c888ca..751de60442 100644 --- a/docs/capabilities/memory/h264_storage_benchmark_report.md +++ b/docs/capabilities/memory/h264_storage_benchmark_report.md @@ -9,7 +9,7 @@ Blueprint: `demo-h264-storage-benchmark` The benchmark source publishes identical `Image` frames to two recorder streams: - `jpeg_image` uses the default memory2 `Image` codec (`JpegCodec`). -- `h264_image` uses `codec="h264"` and receives encoded H.264 images through `H264LcmTransport(decode_images=False)`. +- `h264_image` uses `codec="h264"`; the logical stream is still `Image`, while memory2 writes internal H.264 packet blobs. The reporter measures compact SQLite snapshot sizes with SQLite backup, so active WAL/SHM sidecars do not skew the comparison. @@ -83,7 +83,7 @@ ffmpeg -y -v error \ In this run, memory2 H.264 storage was smaller than the direct ffmpeg elementary stream. That means this benchmark does not show a storage-efficiency penalty from the per-frame Annex B access-unit layout. It mostly shows that the current aiortc/libx264 path and the direct ffmpeg command did not produce identical rate-control output, even with similar nominal settings. -The storage overhead within memory2 was measurable: the H.264 DB was 118,045 bytes larger than its stored blob payloads, or 11.7% over the blob bytes. That overhead includes observation metadata, SQLite page overhead, and one encoded-image envelope per frame. +The storage overhead within memory2 was measurable: the H.264 DB was 118,045 bytes larger than its stored blob payloads, or 11.7% over the blob bytes. That overhead includes observation metadata, SQLite page overhead, and one internal H.264 packet envelope per frame. ## Notes diff --git a/docs/capabilities/memory/index.md b/docs/capabilities/memory/index.md index 3641b45c33..a3b870e7e5 100644 --- a/docs/capabilities/memory/index.md +++ b/docs/capabilities/memory/index.md @@ -225,33 +225,33 @@ color = store.stream( ) ``` -Recorder modules that need H.264 storage should create their target stream with -the same codec override: +Recorder modules that need H.264 storage can opt in per input stream with +`RecorderConfig.stream_codecs`: ```python skip from dimos.core.stream import In -from dimos.memory2.module import Recorder +from dimos.memory2.module import Recorder, RecorderConfig from dimos.msgs.sensor_msgs.Image import Image class H264Recorder(Recorder): color_image: In[Image] - def start(self) -> None: - stream = self.store.stream("color_image", Image, codec="h264") - self._port_to_stream("color_image", self.color_image, stream) +recorder = H264Recorder.blueprint( + config=RecorderConfig(stream_codecs={"color_image": "h264"}) +) ``` H.264 storage keeps the normal memory2 shape: one observation row per source -frame. The blob for that observation stores one encoded `Image` whose data is a -complete H.264 Annex B access unit, not individual RTP fragments. H.264 frame -metadata lives in `Image.codec_metadata`. +frame. The logical stream type is still `Image`, while the physical blob stores +one internal H.264 packet envelope containing a complete Annex B access unit, not +individual RTP fragments. H.264 frame metadata lives in observation tags. -Metadata queries do not decode pixels. You can inspect timestamps, poses, tags, -frame ids, `Image.encoding`, and H.264 codec metadata without paying decode -cost. Accessing `obs.data` returns an encoded `Image` for H.264 streams. Use an -explicit H.264 decode session to convert replayed encoded images to raw pixel -images; that decoder suppresses deltas until the first keyframe at or after the -replay start point. +Metadata queries do not decode pixels. You can inspect timestamps, poses, and +tags without paying decode cost. Accessing `obs.data` returns a decoded raw +`Image`; random lazy reads seek to the previous stored keyframe by durable +observation id and decode forward to the requested observation. Whole-stream +id-ordered iteration uses a narrow sequential decode path; replay remains correct +and yields decoded images while scheduling by timestamp. H.264 storage currently supports uint8 RGB, BGR, and grayscale images. It raises an explicit error for depth images, 16-bit images, alpha formats, and other @@ -275,6 +275,6 @@ codec or storage changes to inspect: - logs from the source, recorder, and probe; - memory2 metadata queries that do not touch `obs.data`; -- lazy `obs.data` decode after a valid keyframe, with best-effort suppression of undecodable deltas; +- lazy `obs.data` decode by seeking from the previous durable keyframe; - replay of the recorded stream; and - sequence-gap behavior, if you inject packet loss in the transport tests. diff --git a/docs/coding-agents/style.md b/docs/coding-agents/style.md index 657399b40c..0551ee45a2 100644 --- a/docs/coding-agents/style.md +++ b/docs/coding-agents/style.md @@ -50,15 +50,15 @@ from dimos.memory2.store.base import Store from dimos.memory2.stream import Stream ``` -## H.264 encoded Image shape +## H.264 Image transport and storage shape When editing H.264 image transport or memory2 storage, keep the public module contract as `Out[Image]` and `In[Image]`. Do not expose RTP fragments to module authors or memory2 observations. -For LCM, DDS, and memory2 storage, each encoded `Image` must contain all H.264 -NAL units for exactly one source frame as one Annex B access unit, with H.264 -frame state in `Image.codec_metadata`. Store one memory2 observation per source -frame. P-frames still depend on earlier GOP state, so decode from a valid -keyframe and suppress output after sequence gaps, late join, or replay seek until -the next keyframe. +`Image` is always decoded, pixel-addressable raster data; `Image.data` must stay +a NumPy array. For LCM transport and memory2 storage, H.264 bytes are an internal +physical representation. Each internal H.264 packet contains all NAL units for +exactly one source frame as one Annex B access unit. Store one memory2 +observation per source frame, attach codec/keyframe metadata as internal tags, +and decode from a valid keyframe after sequence gaps, late join, or random seek. diff --git a/docs/development/testing.md b/docs/development/testing.md index 306a6403b7..0055b83e68 100644 --- a/docs/development/testing.md +++ b/docs/development/testing.md @@ -67,7 +67,7 @@ pytest -m self_hosted dimos/path/to/test_something.py The H.264 unit tests use fake codec adapters where possible, so they run in the default suite without requiring FFmpeg/libx264. Run the focused tests after -changing encoded `Image` shape, eager/raw `Image` compatibility, H.264 transport, +changing raw `Image` compatibility, H.264 transport, memory2 storage, or the demo blueprint: ```bash diff --git a/docs/usage/transports/index.md b/docs/usage/transports/index.md index 062096716e..928e899239 100644 --- a/docs/usage/transports/index.md +++ b/docs/usage/transports/index.md @@ -120,8 +120,7 @@ Use `H264LcmTransport` when a high-rate `Image` stream needs video compression over LCM. The module API stays the same: publishers still call `Out[Image].publish(image)`, and subscribers still receive `Image` values. The transport encodes each source frame as one H.264 Annex B access unit on the wire -and decodes it at the subscriber by default. Set `decode_images=False` when a -subscriber, such as a recorder, should receive encoded `Image` values instead. +and always decodes it back to a raw `Image` before delivering it to subscribers. ```python skip from dimos.core.transport import H264LcmTransport @@ -143,10 +142,11 @@ blueprint = blueprint.transports( ) ``` -Encoded delivery uses the same public `Image` type but sets -`image.encoding == "h264"`, stores the Annex B payload in `image.data`, and stores -sequence/keyframe metadata in `image.codec_metadata`. Raw-pixel methods raise for -encoded images; decode them through a H.264 decode session first. +Encoded H.264 packets are an internal transport representation, not a public +`Image` variant. `Image.data` remains pixel-addressable NumPy data at module +boundaries. If you need opt-in H.264 persistence, configure memory2 storage for +that logical `Image` stream instead of asking the transport to expose encoded +packets. H.264 transport is opt-in. The default image paths remain unchanged: normal LCM uses the `Image` LCM encoding, and memory2 still stores images with the default From c332455e6d74f92fe84f5f6ba8c909d63dd96029 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Tue, 16 Jun 2026 04:08:05 +0000 Subject: [PATCH 2/3] [autofix.ci] apply automated fixes --- dimos/models/vl/florence.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/dimos/models/vl/florence.py b/dimos/models/vl/florence.py index 8adda7a7af..8e964bb85f 100644 --- a/dimos/models/vl/florence.py +++ b/dimos/models/vl/florence.py @@ -137,10 +137,7 @@ def caption_batch(self, *images: Image) -> list[str]: task_prompt = self._task_prompt # Convert all to PIL - pil_images = [ - PILImage.fromarray(img.to_rgb().data) - for img in images - ] + pil_images = [PILImage.fromarray(img.to_rgb().data) for img in images] # Process batch inputs = self._processor( From a27d1c98e72e78fbed78a2957dccd4475972af69 Mon Sep 17 00:00:00 2001 From: cc Date: Mon, 15 Jun 2026 21:54:09 -0700 Subject: [PATCH 3/3] chore: rewire demog --- dimos/protocol/video/demo_h264_video_e2e.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dimos/protocol/video/demo_h264_video_e2e.py b/dimos/protocol/video/demo_h264_video_e2e.py index 9cade62a14..6f98352ec9 100644 --- a/dimos/protocol/video/demo_h264_video_e2e.py +++ b/dimos/protocol/video/demo_h264_video_e2e.py @@ -26,11 +26,12 @@ import cv2 import numpy as np +from dimos.constants import DEFAULT_CAPACITY_COLOR_IMAGE from dimos.core.coordination.blueprints import autoconnect from dimos.core.core import rpc from dimos.core.module import Module, ModuleConfig from dimos.core.stream import In, Out -from dimos.core.transport import H264LcmTransport +from dimos.core.transport import H264LcmTransport, pSHMTransport from dimos.hardware.sensors.camera.module import CameraModule from dimos.hardware.sensors.camera.webcam import Webcam from dimos.memory2.module import OnExisting, Recorder @@ -552,10 +553,9 @@ def _webcam() -> Webcam: ), ).transports( { - ("color_image", Image): H264LcmTransport( + ("color_image", Image): pSHMTransport( "/demo_h264_webcam_record/color_image", - Image, - config=_webcam_h264_config, + default_capacity=DEFAULT_CAPACITY_COLOR_IMAGE, ) } )