diff --git a/default.env b/default.env index a43ee53305..0f56044b7e 100644 --- a/default.env +++ b/default.env @@ -7,10 +7,13 @@ HUGGINGFACE_PRV_ENDPOINT= ROBOT_IP= CONN_TYPE=webrtc -# Hosted teleop (teleop-hosted-* blueprints) -TELEOP_API_KEY= -TELEOP_ROBOT_ID= -TELEOP_ROBOT_NAME= +# Hosted teleop — transport blueprints (teleop-hosted-go2-transport / -livekit / -multicam). +# Names map to the transports.broker.* config (-o transports.broker.api_key=...). +# (The deprecated HostedTeleopModule blueprints take -o hosted-teleop.broker_api_key=...) +TRANSPORTS__BROKER__BROKER_URL=https://teleop.dimensionalos.com +TRANSPORTS__BROKER__API_KEY= +TRANSPORTS__BROKER__ROBOT_ID= +TRANSPORTS__BROKER__ROBOT_NAME= WEBRTC_SERVER_HOST=0.0.0.0 WEBRTC_SERVER_PORT=9991 DISPLAY=:0 diff --git a/dimos/core/transport.py b/dimos/core/transport.py index 5db5dba2ab..183c5a878c 100644 --- a/dimos/core/transport.py +++ b/dimos/core/transport.py @@ -36,6 +36,7 @@ from dimos.protocol.pubsub.impl.rospubsub import DimosROS, ROSTopic from dimos.protocol.pubsub.impl.shmpubsub import BytesSharedMemory, PickleSharedMemory from dimos.protocol.pubsub.impl.webrtc.providers.broker import BrokerConfig +from dimos.protocol.pubsub.impl.webrtc.providers.livekit_broker import LiveKitBrokerConfig from dimos.protocol.pubsub.impl.webrtc.providers.spec import ProviderConfig from dimos.protocol.pubsub.impl.webrtc.webrtcpubsub import WebRTCPubSub from dimos.utils.logging_config import setup_logger @@ -456,6 +457,22 @@ class CloudflareTransport(WebRTCTransport[M]): _config_cls = BrokerConfig +class LiveKitTransport(WebRTCTransport[M]): + """WebRTC DataChannels via the hosted teleop broker + LiveKit SFU. + + Drop-in alternative to :class:`CloudflareTransport`; config kwargs flow into + :class:`LiveKitBrokerConfig`, populated from ``transports.broker.*`` + (``TRANSPORTS__BROKER__*`` env / ``-o`` overrides). 
+ + unitree_go2_livekit = unitree_go2_basic.transports({ + ("cmd_vel", Twist): LiveKitTransport("cmd_unreliable", TwistStamped), + ("color_image", Image): LiveKitVideoTransport(), + }) + """ + + _config_cls = LiveKitBrokerConfig + + class WebRTCVideoTransport(Transport[Any]): """Robot camera → remote viewer as a WebRTC video track (provider-agnostic). @@ -515,4 +532,10 @@ class CloudflareVideoTransport(WebRTCVideoTransport): _config_cls = BrokerConfig +class LiveKitVideoTransport(WebRTCVideoTransport): + """Camera → teleop web client via the hosted broker + LiveKit (see WebRTCVideoTransport).""" + + _config_cls = LiveKitBrokerConfig + + class ZenohTransport(PubSubTransport[T]): ... diff --git a/dimos/hardware/sensors/camera/realsense/camera.py b/dimos/hardware/sensors/camera/realsense/camera.py index 6257720608..70d7b6ea96 100644 --- a/dimos/hardware/sensors/camera/realsense/camera.py +++ b/dimos/hardware/sensors/camera/realsense/camera.py @@ -358,17 +358,29 @@ def _publish_tf(self, ts: float) -> None: ) transforms.append(depth_to_depth_optical) - color_tf = self._extrinsics_to_transform( - self._color_to_depth_extrinsics, - self._camera_link, - self._color_frame, - ts, - ) - # Invert the transform since extrinsics are color->depth - color_tf = color_tf.inverse() - color_tf.frame_id = self._camera_link - color_tf.child_frame_id = self._color_frame - color_tf.ts = ts + # camera_link -> camera_color_frame. With depth disabled there are no + # color->depth extrinsics, so fall back to identity (color at the + # camera_link origin) instead of dereferencing None. 
+ if self._color_to_depth_extrinsics is not None: + color_tf = self._extrinsics_to_transform( + self._color_to_depth_extrinsics, + self._camera_link, + self._color_frame, + ts, + ) + # Invert the transform since extrinsics are color->depth + color_tf = color_tf.inverse() + color_tf.frame_id = self._camera_link + color_tf.child_frame_id = self._color_frame + color_tf.ts = ts + else: + color_tf = Transform( + translation=Vector3(0.0, 0.0, 0.0), + rotation=Quaternion(0.0, 0.0, 0.0, 1.0), + frame_id=self._camera_link, + child_frame_id=self._color_frame, + ts=ts, + ) transforms.append(color_tf) # camera_color_frame -> camera_color_optical_frame diff --git a/dimos/protocol/pubsub/impl/webrtc/providers/broker.py b/dimos/protocol/pubsub/impl/webrtc/providers/broker.py index dd540a24b9..fb9ca91a02 100644 --- a/dimos/protocol/pubsub/impl/webrtc/providers/broker.py +++ b/dimos/protocol/pubsub/impl/webrtc/providers/broker.py @@ -43,6 +43,8 @@ from collections import defaultdict from collections.abc import Callable import contextlib +import json +import time from typing import TYPE_CHECKING, Any from dimos.protocol.pubsub.impl.webrtc.providers.sdp import propagate_bundle_candidates @@ -312,6 +314,8 @@ def _open_channel(self, name: str, sctp_id: int) -> None: def _on_msg(payload: Any) -> None: if isinstance(payload, str): payload = payload.encode() + if name == "state_reliable": + self._maybe_answer_ping(payload) with self._lock: callbacks = list(self._callbacks.get(name, ())) for cb in callbacks: @@ -330,6 +334,33 @@ def _close_channel(self, name: str) -> None: with contextlib.suppress(Exception): ch.close() + def _maybe_answer_ping(self, payload: bytes) -> None: + """Answer the web client's clock-sync ping inline on the loop thread. + + The operator measures RTT/offset from ping→pong timing, so the reply + must not ride a module hop (stream dispatch latency would inflate + every sample, and keep-latest mailboxes could drop pings outright). 
+ The ping still fans out to subscribers afterwards — the provider stays + a transparent relay with this one reflex attached. + """ + if not payload.startswith(b"{"): + return # LCM binary or other non-JSON — not ours + try: + msg = json.loads(payload) + except ValueError: + return + if msg.get("type") != "ping" or msg.get("client_ts") is None: + return + pong = json.dumps({"type": "pong", "client_ts": msg["client_ts"], "robot_ts": time.time()}) + with self._lock: + ch = self._dcs.get("state_reliable_back") + # Pong MUST go on state_reliable_back — CF bridges one direction only; + # a robot send on state_reliable would be silently dropped. + if ch is not None and ch.readyState == "open": + ch.send(pong) + else: + logger.warning("ping received but state_reliable_back not open — pong dropped") + # ─── Public API (Provider) ─────────────────────────────────────── def publish(self, topic: str, data: bytes) -> None: diff --git a/dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py b/dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py new file mode 100644 index 0000000000..4aefd66bc2 --- /dev/null +++ b/dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py @@ -0,0 +1,347 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Broker-mediated LiveKit provider (hosted teleop). 
+ +The robot asks the ``dimensional-teleop`` broker for a LiveKit room + JWT +(``POST /api/v1/sessions {transport:"livekit"}`` → ``{url, token, room}``), +then connects straight to the LiveKit SFU. Unlike the Cloudflare ``broker.py`` +path there is no SDP relay, no SCTP-id juggling, and no heartbeat-driven +channel lifecycle: LiveKit data is bidirectional and topic-addressed, so a +single room carries every channel. + +Topics (kept identical to the Cloudflare path so the typed-fingerprint demux at +the transport layer is unchanged): + cmd_unreliable operator → robot commands (lossy) + state_reliable operator → robot control plane (reliable) + state_reliable_back robot → operator telemetry (reliable) + +Video: ``set_video_frame()`` pushes camera frames into a sendonly LiveKit track +(published lazily on the first frame) — typically via ``LiveKitVideoTransport`` +bound to a blueprint's Image stream. + +Config comes from the blueprint's ``transports.broker.*`` flow (env form +``TRANSPORTS__BROKER__<FIELD>``, or ``-o transports.broker.<field>=...``), the +same scheme as the Cloudflare ``BrokerConfig``: + TRANSPORTS__BROKER__BROKER_URL — default https://teleop.dimensionalos.com + TRANSPORTS__BROKER__API_KEY — robot API key (dtk_live_*); derives identity + TRANSPORTS__BROKER__ROBOT_ID — optional robot identifier override + TRANSPORTS__BROKER__ROBOT_NAME — human-readable robot name +""" + +from __future__ import annotations + +import asyncio +from collections import defaultdict +from collections.abc import Callable +import contextlib +import importlib.util +from typing import TYPE_CHECKING, Any + +from dimos.protocol.pubsub.impl.webrtc.providers.spec import ( + AsyncProviderBase, + ProviderConfig, +) +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() + +# find_spec instead of importing: the livekit rtc SDK pulls native libs and +# core.transport imports this module everywhere. Imported lazily on start(). 
+LIVEKIT_AVAILABLE = ( + importlib.util.find_spec("livekit") is not None + and importlib.util.find_spec("httpx") is not None +) + +if TYPE_CHECKING: + import httpx + from livekit import rtc + + from dimos.msgs.sensor_msgs.Image import Image + + +class LiveKitBrokerConfig(ProviderConfig): + """Hosted teleop over LiveKit. Config from transports.broker.* (TRANSPORTS__BROKER__*).""" + + broker_url: str | None = None + api_key: str | None = None + robot_id: str | None = None + robot_name: str | None = None + heartbeat_hz: float = 1.0 + + def _create(self) -> LiveKitBrokerProvider: + return LiveKitBrokerProvider(self) + + +def _image_to_rgba(img: Image) -> tuple[int, int, bytes]: + """Pack a dimos Image into (width, height, RGBA bytes) for a LiveKit frame.""" + import numpy as np + + from dimos.msgs.sensor_msgs.Image import ImageFormat + + arr = img.data + if arr.dtype == np.uint16: + arr = (arr >> 8).astype(np.uint8) # scale 16-bit (e.g. GRAY16) to 8-bit, not truncate + elif arr.dtype != np.uint8: + arr = arr.astype(np.uint8) + h, w = arr.shape[:2] + fmt = img.format + if fmt == ImageFormat.RGBA: + rgba = arr + elif fmt == ImageFormat.BGRA: + rgba = arr[..., [2, 1, 0, 3]] + elif fmt == ImageFormat.RGB: + rgba = np.dstack([arr, np.full((h, w), 255, np.uint8)]) + elif fmt in (ImageFormat.GRAY, ImageFormat.GRAY16): + g = arr if arr.ndim == 2 else arr[..., 0] + rgba = np.dstack([g, g, g, np.full((h, w), 255, np.uint8)]) + else: # BGR and anything else: treat as BGR + rgba = np.dstack([arr[..., 2], arr[..., 1], arr[..., 0], np.full((h, w), 255, np.uint8)]) + return w, h, np.ascontiguousarray(rgba).tobytes() + + +class _VideoPublisher: + """Lazily-published sendonly LiveKit video track fed from an Image stream. + + Frames arrive from the producer thread; the source/track are created and the + track published on the first frame (so dimensions come from real data), all + marshalled onto the provider's loop thread where the room lives. 
+ """ + + def __init__(self) -> None: + self._room: rtc.Room | None = None + self._loop: asyncio.AbstractEventLoop | None = None + self._source: rtc.VideoSource | None = None + self._publish_task: asyncio.Task[None] | None = None + + def bind(self, room: rtc.Room, loop: asyncio.AbstractEventLoop) -> None: + self._room = room + self._loop = loop + + def reset(self) -> None: + """Drop per-session state so a later bind() (reconnect) re-publishes the + track on the new room. Called from the provider's _disconnect().""" + self._room = None + self._loop = None + self._source = None + self._publish_task = None + + def set_latest(self, img: Image) -> None: + loop = self._loop + if loop is None or not loop.is_running(): + return # not connected yet; pre-connect frames are dropped + try: + w, h, buf = _image_to_rgba(img) + except Exception: + logger.debug("video: frame conversion failed", exc_info=True) + return + loop.call_soon_threadsafe(self._capture, w, h, buf) + + def _capture(self, w: int, h: int, buf: bytes) -> None: + from livekit import rtc + + if self._source is None: + self._source = rtc.VideoSource(w, h) + self._publish_task = asyncio.ensure_future(self._publish()) + frame = rtc.VideoFrame(w, h, rtc.VideoBufferType.RGBA, buf) + self._source.capture_frame(frame) + + async def _publish(self) -> None: + from livekit import rtc + + assert self._room is not None and self._source is not None + try: + track = rtc.LocalVideoTrack.create_video_track("camera", self._source) + opts = rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_CAMERA) + await self._room.local_participant.publish_track(track, opts) + except Exception: + # Clear _source so the next captured frame retries publish, instead + # of feeding frames forever into a never-published source. 
+ logger.warning("LiveKit video track publish failed; will retry", exc_info=True) + self._source = None + self._publish_task = None + return + logger.info("LiveKit video track published") + + +class LiveKitBrokerProvider(AsyncProviderBase): + """Bidirectional broker-mediated LiveKit provider. + + Inbound (operator → robot): ``cmd_unreliable`` + ``state_reliable``, + delivered to subscribers by topic. Outbound (robot → operator): + ``publish()`` on any topic (LiveKit is bidirectional); ``cmd_unreliable`` + rides the lossy channel, everything else reliable. Together with + ``LiveKitTransport`` / ``LiveKitVideoTransport`` this is the LiveKit analog + of ``BrokerProvider``. + """ + + LOSSY_TOPICS = ("cmd_unreliable",) + + def __init__(self, config: LiveKitBrokerConfig | None = None) -> None: + if not LIVEKIT_AVAILABLE: + raise RuntimeError("livekit and httpx required: pip install dimos[livekit]") + super().__init__() + config = config or LiveKitBrokerConfig() + # Config is populated from transports.broker.* (= TRANSPORTS__BROKER__* + # env / -o overrides), same scheme as the Cloudflare BrokerConfig. + self._broker_url = (config.broker_url or "https://teleop.dimensionalos.com").rstrip("/") + self._api_key = config.api_key or "" + self._robot_id = config.robot_id or "" + self._robot_name = config.robot_name or "robot" + if not self._api_key: + raise RuntimeError( + "transports.broker.api_key required " + "(TRANSPORTS__BROKER__API_KEY=dtk_live_...; create one in the " + "teleop dashboard: New Key)" + ) + self._config = config + + self._http: httpx.AsyncClient | None = None + self._room: rtc.Room | None = None + self.session_id: str | None = None + self.room: str | None = None + self._hb_task: asyncio.Task[None] | None = None + self._video = _VideoPublisher() + # topic → subscriber callbacks. Guarded by self._lock (from the base). 
+ self._callbacks: dict[str, list[Callable[[bytes, str], None]]] = defaultdict(list) + + @property + def _headers(self) -> dict[str, str]: + return {"X-Robot-API-Key": self._api_key, "Content-Type": "application/json"} + + # ─── Connect / Disconnect (loop thread) ────────────────────────── + + async def _connect(self) -> None: + import httpx + from livekit import rtc + + self._http = httpx.AsyncClient(timeout=30.0) + r = await self._http.post( + f"{self._broker_url}/api/v1/sessions", + headers=self._headers, + json={ + "transport": "livekit", + "robot_name": self._robot_name, + **({"robot_id": self._robot_id} if self._robot_id else {}), + }, + ) + if r.status_code not in (200, 201): + raise RuntimeError(f"Broker session create failed: {r.status_code} {r.text[:500]}") + data = r.json() + self.session_id = data["session_id"] + self.room = data.get("room") + url, token = data["url"], data["token"] + + self._room = rtc.Room() + + @self._room.on("data_received") # type: ignore[untyped-decorator] + def _on_data(packet: Any) -> None: + self._dispatch(packet) + + await self._room.connect(url, token) + self._video.bind(self._room, asyncio.get_running_loop()) + logger.info( + "LiveKit broker provider connected: session=%s room=%s robot=%s", + self.session_id, + data.get("room"), + self._robot_id or "(derived from API key)", + ) + self._hb_task = asyncio.get_running_loop().create_task(self._heartbeat_loop()) + + async def _disconnect(self) -> None: + if self._hb_task is not None: + self._hb_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await self._hb_task + self._hb_task = None + if self._http and self.session_id: + with contextlib.suppress(Exception): # best-effort deregistration + await self._http.delete( + f"{self._broker_url}/api/v1/sessions/{self.session_id}", + headers=self._headers, + ) + if self._room is not None: + with contextlib.suppress(Exception): + await self._room.disconnect() + self._room = None + self._video.reset() # clear per-session 
video state so a restart re-publishes + if self._http: + await self._http.aclose() + self._http = None + self.session_id = None + + # ─── Heartbeat (loop thread; metrics/liveness only) ────────────── + + async def _heartbeat_loop(self) -> None: + interval = 1.0 / max(self._config.heartbeat_hz, 0.1) + while True: + try: + if self._http is not None and self.session_id is not None: + await self._http.post( + f"{self._broker_url}/api/v1/sessions/{self.session_id}/heartbeat", + headers=self._headers, + json={}, + ) + except Exception: + logger.warning("LiveKit heartbeat failed", exc_info=True) + await asyncio.sleep(interval) + + # ─── Dispatch (loop thread) ────────────────────────────────────── + + def _dispatch(self, packet: Any) -> None: + topic = getattr(packet, "topic", "") or "" + payload = getattr(packet, "data", b"") + if isinstance(payload, (bytearray, memoryview)): + payload = bytes(payload) + with self._lock: + callbacks = list(self._callbacks.get(topic, ())) + for cb in callbacks: + try: + cb(payload, topic) + except Exception: + logger.exception("LiveKit subscriber callback error") + + # ─── Public API (Provider) ─────────────────────────────────────── + + def publish(self, topic: str, data: bytes) -> None: + """Robot → operator on any topic (LiveKit is bidirectional). 
Messages + drop while no room/operator is connected — normal pubsub behaviour.""" + if isinstance(data, (bytearray, memoryview)): + data = bytes(data) + reliable = topic not in self.LOSSY_TOPICS + with self._lock: + if not self._started or self._loop is None or self._room is None: + return + coro = self._room.local_participant.publish_data(data, reliable=reliable, topic=topic) + asyncio.run_coroutine_threadsafe(coro, self._loop) + + def set_video_frame(self, img: Image) -> None: + """Robot → operator video: publish the latest camera frame (thread-safe).""" + self._video.set_latest(img) + + def subscribe(self, topic: str, callback: Callable[[bytes, str], None]) -> Callable[[], None]: + if not self.is_connected: + self.start() + with self._lock: + self._callbacks[topic].append(callback) + + def _unsub() -> None: + with self._lock: + with contextlib.suppress(ValueError): + self._callbacks[topic].remove(callback) + + return _unsub + + +__all__ = ["LiveKitBrokerConfig", "LiveKitBrokerProvider"] diff --git a/dimos/protocol/pubsub/impl/webrtc/test_broker_ping.py b/dimos/protocol/pubsub/impl/webrtc/test_broker_ping.py new file mode 100644 index 0000000000..3c1894f2b1 --- /dev/null +++ b/dimos/protocol/pubsub/impl/webrtc/test_broker_ping.py @@ -0,0 +1,97 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for BrokerProvider's inline clock-sync ping responder. 
+ +No network — a mocked ``state_reliable_back`` channel is injected and we +assert on what ``_maybe_answer_ping`` sends. The ping protocol itself +(ping → pong with echoed client_ts + fresh robot_ts) is what the web +client's RTT/offset estimator depends on. +""" + +from __future__ import annotations + +import json +import time +from typing import Any +from unittest.mock import MagicMock + +import pytest + +from dimos.protocol.pubsub.impl.webrtc.providers.broker import BrokerConfig, BrokerProvider +from dimos.protocol.pubsub.impl.webrtc.providers.spec import WEBRTC_AVAILABLE + +# broker.py imports aiortc lazily, but BrokerProvider.__init__ pulls in the +# video track (aiortc/av) — so constructing one needs the extras installed. +pytestmark = pytest.mark.skipif(not WEBRTC_AVAILABLE, reason="aiortc not installed") + + +@pytest.fixture +def provider() -> BrokerProvider: + """BrokerProvider with a mocked open state_reliable_back channel.""" + p = BrokerProvider(BrokerConfig(api_key="dtk_test_key")) + back = MagicMock() + back.readyState = "open" + p._dcs["state_reliable_back"] = back + return p + + +def _sent_payload(provider: BrokerProvider) -> dict[str, Any]: + ch: Any = provider._dcs["state_reliable_back"] + ch.send.assert_called_once() + return json.loads(ch.send.call_args[0][0]) # type: ignore[no-any-return] + + +def test_ping_echoes_pong_with_robot_ts(provider: BrokerProvider) -> None: + before = time.time() + provider._maybe_answer_ping(json.dumps({"type": "ping", "client_ts": 100.5}).encode()) + after = time.time() + + sent = _sent_payload(provider) + assert sent["type"] == "pong" + assert sent["client_ts"] == 100.5 + assert before <= sent["robot_ts"] <= after + + +def test_ping_missing_client_ts_dropped(provider: BrokerProvider) -> None: + provider._maybe_answer_ping(b'{"type":"ping"}') + provider._dcs["state_reliable_back"].send.assert_not_called() # type: ignore[attr-defined] + + +def test_non_json_binary_ignored(provider: BrokerProvider) -> None: + """LCM 
binary on the channel (future telemetry) must not be parsed.""" + provider._maybe_answer_ping(b"\x00\x01\x02\x03lcm-ish") + provider._dcs["state_reliable_back"].send.assert_not_called() # type: ignore[attr-defined] + + +def test_malformed_json_dropped(provider: BrokerProvider) -> None: + provider._maybe_answer_ping(b"{not json") + provider._dcs["state_reliable_back"].send.assert_not_called() # type: ignore[attr-defined] + + +def test_other_json_types_ignored(provider: BrokerProvider) -> None: + provider._maybe_answer_ping(b'{"type":"video_stats","fps":30}') + provider._dcs["state_reliable_back"].send.assert_not_called() # type: ignore[attr-defined] + + +def test_pong_dropped_when_back_channel_closed(provider: BrokerProvider) -> None: + provider._dcs["state_reliable_back"].readyState = "closed" # type: ignore[misc] + provider._maybe_answer_ping(b'{"type":"ping","client_ts":1.0}') + provider._dcs["state_reliable_back"].send.assert_not_called() # type: ignore[attr-defined] + + +def test_pong_dropped_when_back_channel_absent(provider: BrokerProvider) -> None: + del provider._dcs["state_reliable_back"] + # No channel at all — must not raise. 
+ provider._maybe_answer_ping(b'{"type":"ping","client_ts":1.0}') diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index 5dcb481f60..c1a6e12206 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -73,7 +73,9 @@ "openarm-planner-coordinator": "dimos.robot.manipulators.openarm.blueprints:openarm_planner_coordinator", "path-planner-eval": "dimos.navigation.nav_3d.evaluator.blueprints:path_planner_eval", "teleop-hosted-go2": "dimos.teleop.quest_hosted.blueprints:teleop_hosted_go2", + "teleop-hosted-go2-livekit": "dimos.teleop.quest_hosted.blueprints:teleop_hosted_go2_livekit", "teleop-hosted-go2-transport": "dimos.teleop.quest_hosted.blueprints:teleop_hosted_go2_transport", + "teleop-hosted-go2-multicam": "dimos.teleop.quest_hosted.blueprints:teleop_hosted_go2_multicam", "teleop-hosted-xarm7": "dimos.teleop.quest_hosted.blueprints:teleop_hosted_xarm7", "teleop-phone": "dimos.teleop.phone.blueprints:teleop_phone", "teleop-phone-go2": "dimos.teleop.phone.blueprints:teleop_phone_go2", @@ -171,7 +173,6 @@ "gstreamer-camera-module": "dimos.hardware.sensors.camera.gstreamer.gstreamer_camera.GstreamerCameraModule", "hosted-arm-teleop-module": "dimos.teleop.quest_hosted.hosted_extensions.HostedArmTeleopModule", "hosted-teleop-module": "dimos.teleop.quest_hosted.hosted_teleop_module.HostedTeleopModule", - "hosted-teleop-recorder": "dimos.teleop.quest_hosted.blueprints.HostedTeleopRecorder", "hosted-twist-teleop-module": "dimos.teleop.quest_hosted.hosted_extensions.HostedTwistTeleopModule", "joint-trajectory-controller": "dimos.manipulation.control.trajectory_controller.joint_trajectory_controller.JointTrajectoryController", "joystick-module": "dimos.robot.unitree.b1.joystick_module.JoystickModule", diff --git a/dimos/robot/test_all_blueprints.py b/dimos/robot/test_all_blueprints.py index a2c44a31bb..760be3e335 100644 --- a/dimos/robot/test_all_blueprints.py +++ b/dimos/robot/test_all_blueprints.py @@ -51,6 +51,7 @@ 
"coordinator-xarm7", "dual-xarm6-planner", "teleop-hosted-go2", + "teleop-hosted-go2-livekit", "teleop-hosted-go2-transport", "teleop-hosted-xarm7", "teleop-quest-dual", diff --git a/dimos/robot/unitree/connection.py b/dimos/robot/unitree/connection.py index 44101cc19d..4975c6d60c 100644 --- a/dimos/robot/unitree/connection.py +++ b/dimos/robot/unitree/connection.py @@ -49,8 +49,11 @@ from dimos.robot.unitree.type.odometry import Odometry from dimos.types.timestamped import Timestamped from dimos.utils.decorators.decorators import simple_mcache +from dimos.utils.logging_config import setup_logger from dimos.utils.reactive import backpressure, callback_to_observable +logger = setup_logger() + VideoMessage: TypeAlias = NDArray[np.uint8] # Shape: (height, width, 3) @@ -314,6 +317,10 @@ def balance_stand(self) -> bool: self.publish_request(RTC_TOPIC["SPORT_MOD"], {"api_id": SPORT_CMD["BalanceStand"]}) ) + def sport_command(self, api_id: int) -> bool: + """Send a parameterless SPORT_MOD command by api_id (Hello, Stretch, ...).""" + return bool(self.publish_request(RTC_TOPIC["SPORT_MOD"], {"api_id": api_id})) + def set_obstacle_avoidance(self, enabled: bool = True) -> None: self.publish_request( RTC_TOPIC["OBSTACLES_AVOID"], @@ -324,28 +331,35 @@ def free_walk(self) -> bool: """Activate FreeWalk locomotion mode — enables walking and velocity commands.""" return bool(self.publish_request(RTC_TOPIC["SPORT_MOD"], {"api_id": SPORT_CMD["FreeWalk"]})) - def enable_rage_mode(self) -> bool: - """Enable Rage Mode on the Go2 via WebRTC. - Assumes the robot is already in BalanceStand. + def set_rage_mode(self, enable: bool) -> bool: + """Toggle Rage Mode (api 2059) over WebRTC, both directions. + + BalanceStand → 2059 {data:enable} → SwitchJoystick(enable). When on, + normal move() twists drive at the ~2.5 m/s rage envelope. """ + # Re-establish BalanceStand before toggling (notes: always BalanceStand + # before flipping Rage). 
+ if not self.balance_stand(): + logger.warning("balance_stand() failed before rage toggle — proceeding") + time.sleep(0.3) + rage_ok = bool( self.publish_request( RTC_TOPIC["SPORT_MOD"], - {"api_id": self._SPORT_API_ID_RAGEMODE, "parameter": {"data": True}}, + {"api_id": self._SPORT_API_ID_RAGEMODE, "parameter": {"data": enable}}, ) ) - time.sleep(2.0) + if not rage_ok: + return False - joystick_ok = bool( + if enable: + time.sleep(2.0) # let FsmRageMode transition settle + return bool( self.publish_request( RTC_TOPIC["SPORT_MOD"], - { - "api_id": SPORT_CMD["SwitchJoystick"], - "parameter": {"data": True}, - }, + {"api_id": SPORT_CMD["SwitchJoystick"], "parameter": {"data": enable}}, ) ) - return rage_ok and joystick_ok def liedown(self) -> bool: return bool( diff --git a/dimos/robot/unitree/dimsim_connection.py b/dimos/robot/unitree/dimsim_connection.py index 5afcd1fda7..1dd2ffd764 100644 --- a/dimos/robot/unitree/dimsim_connection.py +++ b/dimos/robot/unitree/dimsim_connection.py @@ -79,6 +79,10 @@ def odom_stream(self) -> Observable[PoseStamped]: def video_stream(self) -> Observable[Image]: return Subject() + @functools.cache + def lowstate_stream(self) -> Observable[Any]: + return Subject() + def move(self, twist: Twist, duration: float = 0.0) -> bool: return True @@ -91,10 +95,13 @@ def liedown(self) -> bool: def balance_stand(self) -> bool: return True + def sport_command(self, api_id: int) -> bool: + return True + def set_obstacle_avoidance(self, enabled: bool = True) -> None: pass - def enable_rage_mode(self) -> bool: + def set_rage_mode(self, enable: bool) -> bool: return True def publish_request(self, topic: str, data: dict[str, Any]) -> dict[Any, Any]: diff --git a/dimos/robot/unitree/go2/connection.py b/dimos/robot/unitree/go2/connection.py index 5568a473ef..59dc2af177 100644 --- a/dimos/robot/unitree/go2/connection.py +++ b/dimos/robot/unitree/go2/connection.py @@ -20,6 +20,7 @@ from typing import TYPE_CHECKING, Any, Protocol from pydantic import 
Field +from reactivex import empty from reactivex.disposable import Disposable from reactivex.observable import Observable import rerun.blueprint as rrb @@ -49,6 +50,7 @@ from dimos.msgs.sensor_msgs.Image import Image from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2 from dimos.robot.unitree.connection import UnitreeWebRTCConnection +from dimos.robot.unitree.type.lowstate import LowStateMsg from dimos.utils.decorators.decorators import cached_property, simple_mcache if sys.version_info < (3, 13): @@ -77,12 +79,14 @@ def stop(self) -> None: ... def lidar_stream(self) -> Observable: ... # type: ignore[type-arg] def odom_stream(self) -> Observable: ... # type: ignore[type-arg] def video_stream(self) -> Observable: ... # type: ignore[type-arg] + def lowstate_stream(self) -> Observable: ... # type: ignore[type-arg] def move(self, twist: Twist, duration: float = 0.0) -> bool: ... def standup(self) -> bool: ... def liedown(self) -> bool: ... def balance_stand(self) -> bool: ... + def sport_command(self, api_id: int) -> bool: ... def set_obstacle_avoidance(self, enabled: bool = True) -> None: ... - def enable_rage_mode(self) -> bool: ... + def set_rage_mode(self, enable: bool) -> bool: ... def publish_request(self, topic: str, data: dict) -> dict: ... # type: ignore[type-arg] @@ -168,10 +172,13 @@ def liedown(self) -> bool: def balance_stand(self) -> bool: return True + def sport_command(self, api_id: int) -> bool: + return True + def set_obstacle_avoidance(self, enabled: bool = True) -> None: pass - def enable_rage_mode(self) -> bool: + def set_rage_mode(self, enable: bool) -> bool: return True @simple_mcache @@ -186,6 +193,11 @@ def odom_stream(self) -> Observable[PoseStamped]: def video_stream(self) -> Observable[Image]: return self.replay.streams.color_image.observable() + @simple_mcache + def lowstate_stream(self) -> Observable: # type: ignore[type-arg] + # Replay datasets carry no low-level state (battery/IMU) — emit nothing. 
+ return empty() + def move(self, twist: Twist, duration: float = 0.0) -> bool: return True @@ -212,6 +224,7 @@ class GO2Connection(Module, Camera, Pointcloud): camera_info_static: CameraInfo = _camera_info_static() _camera_info_thread: Thread | None = None _latest_video_frame: Image | None = None + _latest_lowstate: LowStateMsg | None = None @classmethod def rerun_views(cls): # type: ignore[no-untyped-def] @@ -245,6 +258,7 @@ def onimage(image: Image) -> None: self.register_disposable(self.connection.odom_stream().subscribe(self._publish_tf)) self.register_disposable(self.connection.video_stream().subscribe(onimage)) self.register_disposable(Disposable(self.cmd_vel.subscribe(self.move))) + self.register_disposable(self.connection.lowstate_stream().subscribe(self._on_lowstate)) self._camera_info_thread = Thread( target=self.publish_camera_info, @@ -257,7 +271,7 @@ def onimage(image: Image) -> None: self.connection.balance_stand() if self.config.mode == Go2Mode.RAGE: - self.connection.enable_rage_mode() + self.connection.set_rage_mode(True) self.connection.set_obstacle_avoidance(self.config.g.obstacle_avoidance) @@ -331,16 +345,31 @@ def balance_stand(self) -> bool: return self.connection.balance_stand() @rpc - def enable_rage_mode(self) -> bool: - """Enable Rage Mode (~2.5 m/s forward velocity envelope). - Ensures BalanceStand precondition regardless of current FSM state. + def set_rage_mode(self, enable: bool) -> bool: + """Toggle Rage Mode on/off (~2.5 m/s envelope when on). + On the WebRTC backend this re-establishes the BalanceStand + precondition before toggling; sim backends are no-ops. 
""" - self.connection.balance_stand() - time.sleep(0.3) - result = self.connection.enable_rage_mode() - logger.info("Rage Mode enabled") + result = self.connection.set_rage_mode(enable) + logger.info("Rage Mode %s", "enabled" if enable else "disabled") return result + def _on_lowstate(self, msg: LowStateMsg) -> None: + """Cache the latest low-level state push (battery, IMU, motors, etc.).""" + self._latest_lowstate = msg + + @skill + def get_battery_soc(self) -> int | None: + """Returns the robot's battery state-of-charge as a percentage (0-100). + + Use this skill to answer battery / power / charge questions. Returns + None if no low-level state has been received yet. + """ + try: + return int(self._latest_lowstate["data"]["bms_state"]["soc"]) # type: ignore[index] + except (KeyError, TypeError, ValueError): + return None + @rpc def publish_request(self, topic: str, data: dict[str, Any]) -> dict[Any, Any]: """Publish a request to the WebRTC connection. diff --git a/dimos/robot/unitree/go2/hosted_connection.py b/dimos/robot/unitree/go2/hosted_connection.py new file mode 100644 index 0000000000..0e8080d719 --- /dev/null +++ b/dimos/robot/unitree/go2/hosted_connection.py @@ -0,0 +1,326 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Go2 driver + hosted-teleop control plane in ONE module. 
+ +The broker provider is a per-process singleton, and ``GO2Connection`` is +``dedicated_worker=True`` (its own process), so all hosted broker transports +(cmd, video, state, state_back) must live on this one module to share a single +CF session — a separate bridge module lands in another worker = a 2nd session +the operator can't see. Opt-in subclass; plain ``GO2Connection`` is unchanged. +""" + +from __future__ import annotations + +import json +import threading +import time +from typing import Any + +import numpy as np +from reactivex.disposable import Disposable + +from dimos.core.core import rpc +from dimos.core.stream import In, Out +from dimos.msgs.sensor_msgs.Image import Image +from dimos.robot.unitree.go2.connection import ConnectionConfig, GO2Connection +from dimos.teleop.utils.stream_stats import LiveStreamStats +from dimos.teleop.utils.video_stats import VideoStats +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() + +# Operator-allowed sport commands → SPORT_CMD api_id (robot-side allow-list). 
+ALLOWED_SPORT_CMDS: dict[str, int] = { + "StandDown": 1005, + "RecoveryStand": 1006, + "Sit": 1009, + "Hello": 1016, + "Stretch": 1017, + "Damp": 1001, + "FrontPounce": 1032, # acrobatic — leaps + "FrontJump": 1031, # acrobatic — leaps +} + + +class Go2HostedConnectionConfig(ConnectionConfig): + telemetry_hz: float = 3.0 # robot → operator HUD telemetry push rate + + +class Go2HostedConnection(GO2Connection): + """GO2Connection + the hosted-teleop state plane, colocated (one session).""" + + config: Go2HostedConnectionConfig + + state_json: In[bytes] # operator → robot control JSON (state_reliable) + cmd_raw: In[bytes] # operator → robot command bytes (stats tap) + video_stats: Out[VideoStats] # operator video health, for recorders + telemetry_out: Out[bytes] # robot → operator telemetry + acks (state_reliable_back) + cam2_in: In[Image] # extra camera (RealSense) for the mux + mux_image: Out[Image] # composited cam1(Go2)+cam2 → video transport + + def __init__(self, **kwargs: Any) -> None: + super().__init__(**kwargs) + self._cmd_stats = LiveStreamStats() + self._telemetry_thread: threading.Thread | None = None + self._stop_event = threading.Event() + self._rage_active = False # tracks firmware Rage Mode (speed bar) + self._cam_lock = threading.Lock() + self._cam_frames: dict[str, Image] = {} # "cam1"/"cam2" → latest frame + self._cam_selected = ["cam1"] # operator tab selection + + @rpc + def start(self) -> None: + super().start() + self._stop_event.clear() + # Sync subscribes (not async handle_*): keep-latest would drop bursts. + for stream, cb in ( + (self.state_json, self._on_state_json), + (self.cmd_raw, self._on_cmd_raw), + ): + self.register_disposable(Disposable(stream.subscribe(cb))) + # Mux: tap the base's color_image as cam1, RealSense as cam2 → mux_image. 
+ self.register_disposable( + Disposable(self.color_image.subscribe(lambda i: self._on_cam("cam1", i))) + ) + self.register_disposable( + Disposable(self.cam2_in.subscribe(lambda i: self._on_cam("cam2", i))) + ) + self._start_telemetry() + + # ─── Camera mux ────────────────────────────────────────────────── + def _on_cam(self, cam: str, img: Image) -> None: + with self._cam_lock: + self._cam_frames[cam] = img + shown = cam in self._cam_selected + if shown: + out = self._composite() + if out is not None: + self.mux_image.publish(out) + + def _composite(self) -> Image | None: + with self._cam_lock: + order = [c for c in ("cam1", "cam2") if c in self._cam_selected] + imgs = [self._cam_frames[c] for c in order if c in self._cam_frames] + if not imgs: + return None + if len(imgs) == 1: + return imgs[0] + import cv2 + + target_h = min(im.data.shape[0] for im in imgs) + tiles = [] + for im in imgs: + h, w = im.data.shape[:2] + tiles.append( + cv2.resize(im.data, (int(w * target_h / h), target_h)) if h != target_h else im.data + ) + return Image(data=np.hstack(tiles), format=imgs[0].format, frame_id="camera_mux") + + def _set_cam_selection(self, cams: list[str]) -> None: + sel = [c for c in cams if c in ("cam1", "cam2")] or ["cam1"] + with self._cam_lock: + self._cam_selected = sel + logger.info("camera selection → %s", sel) + out = self._composite() + if out is not None: + self.mux_image.publish(out) + + @rpc + def stop(self) -> None: + self._stop_event.set() + if self._telemetry_thread is not None: + self._telemetry_thread.join(timeout=2.0) + self._telemetry_thread = None + super().stop() + + # ─── Inbound state plane (operator → robot) ────────────────────── + + def _on_state_json(self, data: Any) -> None: + if isinstance(data, str): + data = data.encode() + if not data.startswith(b"{"): + return # not JSON + try: + msg = json.loads(data) + except ValueError: + logger.warning("state_reliable: malformed JSON: %r", data[:80]) + return + + kind = msg.get("type") + if kind 
== "sport_cmd": + self._handle_sport_cmd(msg) + elif kind == "set_mode": + self._handle_set_mode(msg) + elif kind == "camera_select": + self._set_cam_selection(msg.get("cams", [])) + elif kind == "obstacle_avoidance": + self._handle_obstacle_avoidance(msg) + elif kind == "video_stats": + self.video_stats.publish(VideoStats.from_dict(msg)) + elif kind == "clock_report": + logger.info( + "clock-sync: operator rtt=%s offset=%s", + msg.get("rtt_ms"), + msg.get("offset_ms"), + ) + # ping answered by BrokerProvider; unknown types ignored. + + def _handle_sport_cmd(self, msg: dict[str, Any]) -> None: + """Operator button → allow-listed SPORT_MOD request, ack on cmd_ack.""" + name = msg.get("name") + nonce = msg.get("nonce") + + # StandReady is the standup+balance combo, never the two separately. + if name == "StandReady": + self._stand_ready(nonce) + return + + api_id = ALLOWED_SPORT_CMDS.get(name) if isinstance(name, str) else None + if api_id is None: + logger.warning("sport_cmd: disallowed/unknown name %r", name) + self._send_ack(nonce, False) + return + + # sport_command blocks the WebRTC loop (= the video loop) — run off the + # callback so a gesture like Hello doesn't stall video. + def runner() -> None: + ok = False + try: + ok = bool(self.connection.sport_command(api_id)) + except Exception: + logger.exception("sport_cmd %s failed", name) + self._send_ack(nonce, ok) + + threading.Thread(target=runner, daemon=True, name=f"Go2SportCmd-{name}").start() + + def _stand_ready(self, nonce: Any) -> None: + """Standup → settle → BalanceStand (drive-ready). 
Acks when balanced.""" + + def runner() -> None: + ok = False + try: + self.connection.standup() + time.sleep(3.0) # standup must finish before balance_stand + self.connection.balance_stand() + ok = True + except Exception: + logger.exception("StandReady failed") + self._send_ack(nonce, ok) + + threading.Thread(target=runner, daemon=True, name="Go2StandReady").start() + + def _handle_set_mode(self, msg: dict[str, Any]) -> None: + """Speed-mode select. normal/high differ only by browser-side scale; + only the rage on/off boundary toggles the firmware (set_rage_mode).""" + mode = msg.get("mode") + nonce = msg.get("nonce") + if mode not in ("normal", "high", "rage"): + logger.warning("set_mode: unknown mode %r", mode) + self._send_ack(nonce, False) + return + want_rage = mode == "rage" + if want_rage == self._rage_active: + self._send_ack(nonce, True) # already in the right FSM + return + + def runner() -> None: + ok = False + try: + ok = bool(self.connection.set_rage_mode(want_rage)) + if ok: + self._rage_active = want_rage + logger.info("set_mode: rage=%s ok=%s", want_rage, ok) + except Exception: + logger.exception("set_mode rage=%s failed", want_rage) + self._send_ack(nonce, ok) + + threading.Thread(target=runner, daemon=True, name="Go2SetMode").start() + + def _handle_obstacle_avoidance(self, msg: dict[str, Any]) -> None: + """Toggle the Go2's onboard obstacle avoidance on/off.""" + enabled = bool(msg.get("enabled")) + nonce = msg.get("nonce") + + def runner() -> None: + ok = False + try: + self.connection.set_obstacle_avoidance(enabled) + ok = True + logger.info("obstacle_avoidance: enabled=%s", enabled) + except Exception: + logger.exception("obstacle_avoidance enabled=%s failed", enabled) + self._send_ack(nonce, ok) + + threading.Thread(target=runner, daemon=True, name="Go2ObstacleAvoid").start() + + def _send_ack(self, nonce: Any, ok: bool) -> None: + try: + self.telemetry_out.publish( + json.dumps({"type": "cmd_ack", "nonce": nonce, "ok": ok}).encode() + ) + 
except Exception: + logger.debug("cmd_ack publish failed", exc_info=True) + + # ─── Command-plane health (robot → operator) ───────────────────── + + def _on_cmd_raw(self, data: Any) -> None: + """Read the send-stamp off the header for one-way latency stats.""" + if isinstance(data, str): + data = data.encode() + try: + from dimos_lcm.geometry_msgs import TwistStamped as LCMTwistStamped + + lcm = LCMTwistStamped.lcm_decode(data) + ts = lcm.header.stamp.sec + lcm.header.stamp.nsec / 1_000_000_000 + except Exception: + return # foreign / undecodable frame — skip + self._cmd_stats.record(ts, nbytes=len(data)) + + def _battery_soc(self) -> int | None: + """Battery SOC from the cached lowstate, without invoking the logged + ``get_battery_soc`` skill (which the 3 Hz telemetry loop would spam).""" + try: + return int(self._latest_lowstate["data"]["bms_state"]["soc"]) # type: ignore[index] + except (KeyError, TypeError, ValueError): + return None + + def _start_telemetry(self) -> None: + def runner() -> None: + interval = 1.0 / max(self.config.telemetry_hz, 0.1) + while not self._stop_event.is_set(): + snap = self._cmd_stats.snapshot() + soc = self._battery_soc() + if snap is not None or soc is not None: + payload = json.dumps( + { + "type": "robot_telemetry", + "cmd": snap, + "soc": soc, + "robot_ts": time.time(), + } + ) + try: + self.telemetry_out.publish(payload.encode()) + except Exception: + logger.debug("telemetry publish failed", exc_info=True) + self._stop_event.wait(interval) + + self._telemetry_thread = threading.Thread( + target=runner, daemon=True, name="Go2HostedTelemetry" + ) + self._telemetry_thread.start() + + +__all__ = ["Go2HostedConnection", "Go2HostedConnectionConfig"] diff --git a/dimos/robot/unitree/mujoco_connection.py b/dimos/robot/unitree/mujoco_connection.py index 4c455899e8..ebb9a282c8 100644 --- a/dimos/robot/unitree/mujoco_connection.py +++ b/dimos/robot/unitree/mujoco_connection.py @@ -33,7 +33,7 @@ import numpy as np from numpy.typing import 
NDArray -from reactivex import Observable +from reactivex import Observable, empty from reactivex.abc import ObserverBase, SchedulerBase from reactivex.disposable import Disposable @@ -234,10 +234,13 @@ def liedown(self) -> bool: def balance_stand(self) -> bool: return True + def sport_command(self, api_id: int) -> bool: + return True + def set_obstacle_avoidance(self, enabled: bool = True) -> None: pass - def enable_rage_mode(self) -> bool: + def set_rage_mode(self, enable: bool) -> bool: return True def get_video_frame(self) -> NDArray[Any] | None: @@ -337,6 +340,11 @@ def get_video_as_image() -> Image | None: return self._create_stream(get_video_as_image, VIDEO_FPS, "Video") + @functools.cache + def lowstate_stream(self) -> Observable[Any]: + # Sim has no low-level state (battery/IMU) stream — emit nothing. + return empty() + def move(self, twist: Twist, duration: float = 0.0) -> bool: if self._is_cleaned_up or self.shm_data is None: return True diff --git a/dimos/teleop/quest_hosted/blueprints.py b/dimos/teleop/quest_hosted/blueprints.py index adb4817755..78198a77e5 100644 --- a/dimos/teleop/quest_hosted/blueprints.py +++ b/dimos/teleop/quest_hosted/blueprints.py @@ -14,26 +14,30 @@ """Hosted teleop blueprints (WebRTC transport).""" -from pathlib import Path - -from dimos.constants import STATE_DIR from dimos.control.blueprints.teleop import coordinator_teleop_xarm7 from dimos.core.coordination.blueprints import autoconnect -from dimos.core.transport import CloudflareTransport, CloudflareVideoTransport, LCMTransport +from dimos.core.transport import ( + CloudflareTransport, + CloudflareVideoTransport, + LCMTransport, + LiveKitTransport, + LiveKitVideoTransport, +) +from dimos.hardware.sensors.camera.realsense.camera import RealSenseCamera from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped from dimos.msgs.geometry_msgs.Twist import Twist from dimos.msgs.geometry_msgs.TwistStamped import TwistStamped from dimos.msgs.sensor_msgs.Image import Image from 
dimos.robot.unitree.go2.blueprints.basic.unitree_go2_basic import unitree_go2_basic +from dimos.robot.unitree.go2.connection import GO2Connection +from dimos.robot.unitree.go2.hosted_connection import Go2HostedConnection from dimos.teleop.quest.quest_types import Buttons from dimos.teleop.quest_hosted.hosted_extensions import ( HostedArmTeleopModule, HostedTwistTeleopModule, ) -from dimos.teleop.utils.recorder import TeleopRecorder, TeleopRecorderConfig -# Single XArm7 teleop via the hosted (WebRTC) client. Pass `--simulation` to -# run the coordinator inside MuJoCo, omit it for real hardware. +# Single XArm7 hosted teleop. Pass `--simulation` to run in MuJoCo. teleop_hosted_xarm7 = ( autoconnect( HostedArmTeleopModule.blueprint(task_names={"right": "teleop_xarm"}), @@ -51,59 +55,87 @@ ) -# viewer="none" drops the rerun window (operator gets video over WebRTC, so the -# robot-side rerun view is unwanted here). +# Hosted teleop via the legacy module wrapper (the transport swap is preferred). teleop_hosted_go2 = autoconnect( HostedTwistTeleopModule.blueprint(), unitree_go2_basic, ).global_config(n_workers=8, viewer="none") +# Hosted teleop over CF Realtime. Run with -o transports.broker.api_key=dtk_live_... +teleop_hosted_go2_transport = ( + autoconnect( + unitree_go2_basic.disabled_modules(GO2Connection), + Go2HostedConnection.blueprint(), + ) + .transports( + { + ("cmd_vel", Twist): CloudflareTransport.spec("cmd_unreliable", TwistStamped), + ("color_image", Image): CloudflareVideoTransport.spec(), + ("state_json", bytes): CloudflareTransport.spec("state_reliable"), + ("telemetry_out", bytes): CloudflareTransport.spec("state_reliable_back"), + ("cmd_raw", bytes): CloudflareTransport.spec("cmd_unreliable"), # stats tap + ("cmd_vel_stamped", TwistStamped): CloudflareTransport.spec( + "cmd_unreliable", TwistStamped + ), # recorder tap + } + ) + .global_config(viewer="none") +) -# Hosted teleop as a pure transport swap — no teleop module wrapper. 
The -# browser's keyboard/VR view sends LCM TwistStamped on cmd_unreliable; the -# transport decodes it straight onto the go2 cmd_vel stream (commands arrive -# as sent: normalized [-1, 1], no speed rescaling). The camera stream feeds -# the session's WebRTC video track via CloudflareVideoTransport (same -# provider/PeerConnection), and robot → operator telemetry can ride -# CloudflareTransport.spec("state_reliable_back", ...) the same way. -# -# Run: dimos run teleop-hosted-go2-transport -o transports.broker.api_key=dtk_live_... -# (or TRANSPORTS__BROKER__API_KEY=dtk_live_... in env; robot identity is -# derived from the key, override with transports.broker.robot_id if needed) -# then connect from https://teleop.dimensionalos.com (keyboard view). -teleop_hosted_go2_transport = unitree_go2_basic.transports( - { - ("cmd_vel", Twist): CloudflareTransport.spec("cmd_unreliable", TwistStamped), - ("color_image", Image): CloudflareVideoTransport.spec(), - } -).global_config(viewer="none") - - -HOSTED_RECORDINGS_DIR = STATE_DIR / "hosted_teleop" / "recordings" - - -class HostedTeleopRecorderConfig(TeleopRecorderConfig): - # Same generic recorder, just defaulting recordings into the hosted dir. - db_path: str | Path = HOSTED_RECORDINGS_DIR / "recording_hosted.db" - - -class HostedTeleopRecorder(TeleopRecorder): - """Generic ``TeleopRecorder`` defaulting to the hosted recordings dir. - Ports + per-run timestamping are inherited; this only changes the default - output path. Compose at the CLI:: +# LiveKit twin of teleop_hosted_go2_transport — same channels, LiveKit SFU. +# Run with -o transports.broker.api_key=dtk_live_... 
+teleop_hosted_go2_livekit = ( + autoconnect( + unitree_go2_basic.disabled_modules(GO2Connection), + Go2HostedConnection.blueprint(), + ) + .transports( + { + ("cmd_vel", Twist): LiveKitTransport.spec("cmd_unreliable", TwistStamped), + ("color_image", Image): LiveKitVideoTransport.spec(), + ("state_json", bytes): LiveKitTransport.spec("state_reliable"), + ("telemetry_out", bytes): LiveKitTransport.spec("state_reliable_back"), + ("cmd_raw", bytes): LiveKitTransport.spec("cmd_unreliable"), # stats tap + ("cmd_vel_stamped", TwistStamped): LiveKitTransport.spec( + "cmd_unreliable", TwistStamped + ), + } + ) + .global_config(viewer="none") +) - dimos run teleop-hosted-xarm7 hosted-teleop-recorder - dimos run teleop-hosted-go2 hosted-teleop-recorder - """ - config: HostedTeleopRecorderConfig +# Adds a RealSense as cam2 (mux'd into the video track by Go2HostedConnection). +# Needs the RealSense wired in; use teleop-hosted-go2-transport otherwise. +teleop_hosted_go2_multicam = ( + autoconnect( + unitree_go2_basic.disabled_modules(GO2Connection), + Go2HostedConnection.blueprint(), + RealSenseCamera.blueprint(enable_depth=False, enable_pointcloud=False), + ) + .remappings([(RealSenseCamera, "color_image", "cam2_in")]) + .transports( + { + ("cmd_vel", Twist): CloudflareTransport.spec("cmd_unreliable", TwistStamped), + ("mux_image", Image): CloudflareVideoTransport.spec(), + ("cam2_in", Image): LCMTransport.spec("cam2_in", Image), + ("state_json", bytes): CloudflareTransport.spec("state_reliable"), + ("telemetry_out", bytes): CloudflareTransport.spec("state_reliable_back"), + ("cmd_raw", bytes): CloudflareTransport.spec("cmd_unreliable"), + ("cmd_vel_stamped", TwistStamped): CloudflareTransport.spec( + "cmd_unreliable", TwistStamped + ), + } + ) + .global_config(viewer="none") +) __all__ = [ - "HostedTeleopRecorder", - "HostedTeleopRecorderConfig", "teleop_hosted_go2", + "teleop_hosted_go2_livekit", + "teleop_hosted_go2_multicam", "teleop_hosted_go2_transport", 
"teleop_hosted_xarm7", ] diff --git a/dimos/teleop/quest_hosted/hosted_teleop_module.py b/dimos/teleop/quest_hosted/hosted_teleop_module.py index 5604dc2c47..df92668751 100644 --- a/dimos/teleop/quest_hosted/hosted_teleop_module.py +++ b/dimos/teleop/quest_hosted/hosted_teleop_module.py @@ -28,7 +28,6 @@ import asyncio from enum import IntEnum import json -import os import threading import time from typing import Any @@ -75,7 +74,7 @@ class HostedTeleopConfig(ModuleConfig): control_loop_hz: float = 50.0 broker_url: str = "https://teleop.dimensionalos.com" - # Empty defaults; resolved from TELEOP_* env vars at start() if unset. + # Set via the module-config flow (-o hosted-teleop.broker_api_key=... or env). broker_api_key: str = "" robot_id: str = "" robot_name: str = "" @@ -241,8 +240,8 @@ def _on_gathering() -> None: url = f"{self.config.broker_url.rstrip('/')}/api/v1/sessions" body = { - "robot_id": self.config.robot_id or os.getenv("TELEOP_ROBOT_ID", ""), - "robot_name": self.config.robot_name or os.getenv("TELEOP_ROBOT_NAME", ""), + "robot_id": self.config.robot_id, + "robot_name": self.config.robot_name, "sdp_offer": self._pc.localDescription.sdp, } resp = await self._http.post(url, json=body, headers=self._auth_headers()) @@ -287,9 +286,8 @@ async def _disconnect(self) -> None: self._session_id = None def _auth_headers(self) -> dict[str, str]: - api_key = self.config.broker_api_key or os.getenv("TELEOP_API_KEY") - if api_key: - return {"X-Robot-API-Key": api_key} + if self.config.broker_api_key: + return {"X-Robot-API-Key": self.config.broker_api_key} return {} def _start_heartbeat(self) -> None: diff --git a/dimos/teleop/utils/stream_stats.py b/dimos/teleop/utils/stream_stats.py index c10b1bb403..e1688dd825 100644 --- a/dimos/teleop/utils/stream_stats.py +++ b/dimos/teleop/utils/stream_stats.py @@ -14,17 +14,10 @@ """Stat helpers for teleop streams (latency / jitter / rate). 
-Two flavors live here: - -* **`pcts`** — a pure percentile helper shared by the post-hoc report writer - (``teleop/utils/report.py``) and any live stats consumer. -* **`LiveStreamStats`** — a rolling-window class for always-on consumers that - only need a recent snapshot (e.g. the operator HUD's command-plane telemetry). - -Packet loss / reorder are transport-layer concerns and are intentionally not -computed here from an application sequence number. TODO: surface command-plane -loss from datachannel/SCTP stats (same source as VideoStats.loss_pct), not a -per-message seq. +* **`pcts`** — percentile helper shared with the post-hoc report writer. +* **`LiveStreamStats`** — rolling window the robot measures over the inbound + command wire, then ships each ``snapshot()`` to the operator HUD (the robot + doesn't consume the stats locally — it's compute-and-forward). """ from __future__ import annotations @@ -51,41 +44,72 @@ def pcts(values: Sequence[float]) -> dict[str, float] | None: } +# Loss / reorder helpers — kept for when command loss gets wired (needs a +# send-count). Not used by snapshot() currently. +def loss_pct(seqs: Sequence[int]) -> float | None: + """Loss % from gaps in a monotonic sequence; None if fewer than 2 seqs. + + ``loss = 1 - distinct_received / (max_seq - min_seq + 1)``. Reorders and + duplicates don't inflate it — only genuinely missing seq values count. + Tail loss (packets after the last one seen) is invisible: we can only + measure gaps inside the observed ``[min, max]`` range. 
+ """ + valid = [s for s in seqs if s is not None] + if len(valid) < 2: + return None + expected = max(valid) - min(valid) + 1 + received = len(set(valid)) + return max(0.0, (1.0 - received / expected) * 100.0) + + +def reorder_count(seqs: Sequence[int]) -> int: + """Count messages that arrived with a seq below an already-seen maximum.""" + count = 0 + running_max = -1 + for s in seqs: + if s is None: + continue + if s < running_max: + count += 1 + else: + running_max = s + return count + + class LiveStreamStats: - """Rolling-window health for an always-on stream consumer. + """Rolling-window health of an inbound stream, for forwarding to a remote HUD. - Records ``(wall, ts)`` per arrival in a bounded deque so old samples - fall off automatically; ``snapshot()`` returns the current window's median - E2E latency, median inter-arrival jitter, and arrival rate. - Thread-safe — ``record()`` runs on the transport callback, - ``snapshot()`` on a separate reader. + ``record()`` notes each arrival in a bounded deque; ``snapshot()`` returns + the window's median E2E latency, jitter, arrival rate, and throughput — + which the robot ships to the operator (it doesn't use them locally). + Thread-safe: ``record()`` on the transport callback, ``snapshot()`` on a + separate reader. """ def __init__(self, window: int = 120) -> None: self._lock = threading.Lock() - # (wall_arrival, ts); ts is None when the stream is unstamped. - self._samples: deque[tuple[float, float | None]] = deque(maxlen=window) + # (wall_arrival, ts, seq, nbytes); ts/seq/nbytes are None when absent. 
+ self._samples: deque[tuple[float, float | None, int | None, int | None]] = deque( + maxlen=window + ) - def record(self, ts: float | None) -> None: - """Note an inbound message's send-stamp (None if unstamped).""" + def record(self, ts: float | None, seq: int | None = None, nbytes: int | None = None) -> None: + """Note an inbound message's send-stamp, seq, and wire size (any None).""" with self._lock: - self._samples.append((time.time(), ts)) + self._samples.append((time.time(), ts, seq, nbytes)) def snapshot(self) -> dict[str, float | None] | None: - """Median latency/jitter (ms), rate (Hz), or None. - - Returns ``None`` until at least two samples have landed (one inter-arrival - interval is needed). Uses the module's shared ``pcts`` so the math matches - the report writer. - """ + """Median latency/jitter (ms), rate (Hz), throughput. None until 2 samples.""" with self._lock: samples = list(self._samples) if len(samples) < 2: return None - arrivals = [w for w, _ in samples] + arrivals = [w for w, _, _, _ in samples] intervals_ms = [(b - a) * 1000.0 for a, b in pairwise(arrivals)] - e2e_ms = [(w - ts) * 1000.0 for w, ts in samples if ts is not None] + # `is not None` — ts=0.0 is a real value, only None means absent. 
+ e2e_ms = [(w - ts) * 1000.0 for w, ts, _, _ in samples if ts is not None] + sizes = [n for _, _, _, n in samples if n is not None] e2e = pcts(e2e_ms) jit = pcts(intervals_ms) @@ -94,7 +118,8 @@ def snapshot(self) -> dict[str, float | None] | None: "latency_ms": e2e["p50"] if e2e else None, "jitter_ms": jit["p50"] if jit else None, "rate_hz": (len(samples) - 1) / span if span > 0 else None, + "throughput_bps": (sum(sizes) / span) if (sizes and span > 0) else None, } -__all__ = ["LiveStreamStats", "pcts"] +__all__ = ["LiveStreamStats", "loss_pct", "pcts", "reorder_count"] diff --git a/pyproject.toml b/pyproject.toml index 2749469a2f..48c83e1d56 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -329,6 +329,12 @@ webrtc = [ "httpx>=0.27.0", ] +livekit = [ + # WebRTC pubsub over the hosted broker's LiveKit backend (robot rtc client). + "livekit>=1.0.0", + "httpx>=0.27.0", +] + base = [ "dimos[agents,web,perception,visualization]", ] @@ -533,6 +539,8 @@ module = [ "faster_whisper", "geometry_msgs.*", "lazy_loader", + "livekit", + "livekit.*", "mcap", "mcap.*", "mujoco", diff --git a/uv.lock b/uv.lock index 16adff1c99..07d11a7c47 100644 --- a/uv.lock +++ b/uv.lock @@ -2040,6 +2040,10 @@ dds = [ drone = [ { name = "pymavlink" }, ] +livekit = [ + { name = "httpx" }, + { name = "livekit" }, +] manipulation = [ { name = "a750-control", marker = "platform_machine == 'x86_64' and sys_platform == 'linux'" }, { name = "drake", version = "1.45.0", source = { registry = "https://pypi.org/simple" }, marker = "platform_machine != 'aarch64' and sys_platform == 'darwin'" }, @@ -2371,6 +2375,7 @@ requires-dist = [ { name = "gdown", marker = "extra == 'misc'", specifier = ">=5.2.2" }, { name = "googlemaps", marker = "extra == 'misc'", specifier = ">=4.10.0" }, { name = "gtsam-extended", marker = "extra == 'mapping'", specifier = ">=4.3a1.post1" }, + { name = "httpx", marker = "extra == 'livekit'", specifier = ">=0.27.0" }, { name = "httpx", marker = "extra == 'webrtc'", specifier = 
">=0.27.0" }, { name = "hydra-core", marker = "extra == 'perception'", specifier = ">=1.3.0" }, { name = "ipykernel", marker = "extra == 'misc'" }, @@ -2386,6 +2391,7 @@ requires-dist = [ { name = "lap", marker = "extra == 'perception'", specifier = ">=0.5.12" }, { name = "lark", marker = "extra == 'misc'" }, { name = "lazy-loader" }, + { name = "livekit", marker = "extra == 'livekit'", specifier = ">=1.0.0" }, { name = "llvmlite", specifier = ">=0.42.0" }, { name = "lz4", specifier = ">=4.4.5" }, { name = "matplotlib", marker = "extra == 'manipulation'", specifier = ">=3.7.1" }, @@ -2463,7 +2469,7 @@ requires-dist = [ { name = "xformers", marker = "platform_machine == 'x86_64' and extra == 'cuda'", specifier = ">=0.0.20" }, { name = "yapf", marker = "extra == 'misc'", specifier = "==0.40.2" }, ] -provides-extras = ["misc", "visualization", "agents", "web", "perception", "unitree", "unitree-dds", "manipulation", "cpu", "cuda", "psql", "sim", "mapping", "drone", "dds", "webrtc", "base", "apriltag", "all"] +provides-extras = ["misc", "visualization", "agents", "web", "perception", "unitree", "unitree-dds", "manipulation", "cpu", "cuda", "psql", "sim", "mapping", "drone", "dds", "webrtc", "livekit", "base", "apriltag", "all"] [package.metadata.requires-dev] autofix = [{ name = "ruff", specifier = "==0.14.3" }] @@ -4925,6 +4931,26 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/04/1e/b832de447dee8b582cac175871d2f6c3d5077cc56d5575cadba1fd1cccfa/linkify_it_py-2.0.3-py3-none-any.whl", hash = "sha256:6bcbc417b0ac14323382aef5c5192c0075bf8a9d6b41820a2b66371eac6b6d79", size = 19820, upload-time = "2024-02-04T14:48:02.496Z" }, ] +[[package]] +name = "livekit" +version = "1.1.10" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "aiofiles" }, + { name = "numpy", version = "2.2.6", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.11'" }, + { name = "numpy", version = "2.3.5", source = { registry = 
"https://pypi.org/simple" }, marker = "python_full_version >= '3.11'" }, + { name = "protobuf" }, + { name = "types-protobuf" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/53/11/a8f7af0d9a0a1e705c98a16942f8ec70865a8a08280e3ad53a3026388d36/livekit-1.1.10.tar.gz", hash = "sha256:202101c49a1fbc1d771d5dfb884c77f42c01dafc411d12224b992fac78a843cb", size = 355789, upload-time = "2026-06-04T20:23:19.723Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/d0/b0/51b2dc800ff2201da35ea87be1e25d370f4973e214e96ddf09563cd977a8/livekit-1.1.10-py3-none-macosx_10_15_x86_64.whl", hash = "sha256:495b23988673bf6571fd0faaf621416e973fa1d302b1acccd479e0461cac924d", size = 10103039, upload-time = "2026-06-04T20:23:09.411Z" }, + { url = "https://files.pythonhosted.org/packages/f3/7b/d7e1af04915402f55d6aa2d063273d053f9da0799ea9ec0a691fc96003cf/livekit-1.1.10-py3-none-macosx_11_0_arm64.whl", hash = "sha256:7bd85dda5c8b11458b3447cbb02630af7bb267d424c65c89d2e6d736905147d1", size = 8933264, upload-time = "2026-06-04T20:23:11.527Z" }, + { url = "https://files.pythonhosted.org/packages/86/78/5ee7513df2ef124e5f75659bbe477253dd214a3089003e34644a37f80462/livekit-1.1.10-py3-none-manylinux_2_28_aarch64.whl", hash = "sha256:b05299fa0a4c98d8d57f0013367adad70a84dd5fc9c46e4a7f3df5352c81b6c1", size = 9938610, upload-time = "2026-06-04T20:23:13.495Z" }, + { url = "https://files.pythonhosted.org/packages/86/86/16364e82b7363ad43e6651e46dfccf88f18fe46897c2123fe4badbf1f067/livekit-1.1.10-py3-none-manylinux_2_28_x86_64.whl", hash = "sha256:29f58fe30dc181c45b0a9c929beaacacde0b2253fa6e597547508d5269e9012b", size = 11321977, upload-time = "2026-06-04T20:23:15.632Z" }, + { url = "https://files.pythonhosted.org/packages/80/ca/f50036fffbff113f8de6bd05194dc6df0a5f2028f058dcf69073f774a464/livekit-1.1.10-py3-none-win_amd64.whl", hash = "sha256:13ac0c8498e0e5bc41292526966fd6ad86baa66e1915778b128cf7e953b107fc", size = 10675561, upload-time = "2026-06-04T20:23:17.857Z" }, +] + [[package]] 
name = "llvmlite" version = "0.46.0" @@ -10723,6 +10749,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d5/91/9b286ab899c008c2cb05e8be99814807e7fbbd33f0c0c960470826e5ac82/typer-0.23.1-py3-none-any.whl", hash = "sha256:3291ad0d3c701cbf522012faccfbb29352ff16ad262db2139e6b01f15781f14e", size = 56813, upload-time = "2026-02-13T10:04:32.008Z" }, ] +[[package]] +name = "types-protobuf" +version = "7.34.1.20260518" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/29/59/e2b13b499d15e6720150c4b1a8d91e31fcacf716b432397475b3151ff7e4/types_protobuf-7.34.1.20260518.tar.gz", hash = "sha256:28cfaded25889cb83ebfb63cfb0a43628f0b6f3785767bec17287dc6468795f2", size = 68936, upload-time = "2026-05-18T06:01:47.332Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/2a/1f/ec5caf72c2e3b688ca3927e0979a04ddad19e1afc4bf1c199bd743e0f419/types_protobuf-7.34.1.20260518-py3-none-any.whl", hash = "sha256:a0a5337413347166439c0e07cbc26c6164d091401c6f01b1dfd8cdb966c4dd8f", size = 85992, upload-time = "2026-05-18T06:01:45.696Z" }, +] + [[package]] name = "types-psycopg2" version = "2.9.21.20251012"