From 71c0d844f7b946522c697cda49d0fe2dbc8707f8 Mon Sep 17 00:00:00 2001 From: ruthwikdasyam Date: Fri, 12 Jun 2026 21:12:05 -0700 Subject: [PATCH 01/17] feat(teleop): restore pong + command-plane telemetry on WebRTC transport path --- .../pubsub/impl/webrtc/providers/broker.py | 33 ++++ .../pubsub/impl/webrtc/test_broker_ping.py | 98 ++++++++++ dimos/robot/all_blueprints.py | 1 + dimos/teleop/quest_hosted/blueprints.py | 39 +++- dimos/teleop/quest_hosted/state_bridge.py | 177 ++++++++++++++++++ .../teleop/quest_hosted/test_state_bridge.py | 155 +++++++++++++++ dimos/teleop/utils/stream_stats.py | 78 ++++++-- 7 files changed, 558 insertions(+), 23 deletions(-) create mode 100644 dimos/protocol/pubsub/impl/webrtc/test_broker_ping.py create mode 100644 dimos/teleop/quest_hosted/state_bridge.py create mode 100644 dimos/teleop/quest_hosted/test_state_bridge.py diff --git a/dimos/protocol/pubsub/impl/webrtc/providers/broker.py b/dimos/protocol/pubsub/impl/webrtc/providers/broker.py index 86ec647b70..6938615f7b 100644 --- a/dimos/protocol/pubsub/impl/webrtc/providers/broker.py +++ b/dimos/protocol/pubsub/impl/webrtc/providers/broker.py @@ -47,7 +47,9 @@ from collections.abc import Callable import contextlib from dataclasses import dataclass +import json import os +import time from typing import TYPE_CHECKING, Any from dimos.protocol.pubsub.impl.webrtc.providers.sdp import propagate_bundle_candidates @@ -286,6 +288,8 @@ def _open_channel(self, name: str, sctp_id: int) -> None: def _on_msg(payload: Any) -> None: if isinstance(payload, str): payload = payload.encode() + if name == "state_reliable": + self._maybe_answer_ping(payload) with self._lock: callbacks = list(self._callbacks.get(name, ())) for cb in callbacks: @@ -304,6 +308,35 @@ def _close_channel(self, name: str) -> None: with contextlib.suppress(Exception): ch.close() + def _maybe_answer_ping(self, payload: bytes) -> None: + """Answer the web client's clock-sync ping inline on the loop thread. + + The operator measures RTT/offset from ping→pong timing, so the reply + must not ride a module hop (stream dispatch latency would inflate + every sample, and keep-latest mailboxes could drop pings outright). + The ping still fans out to subscribers afterwards — the provider stays + a transparent relay with this one reflex attached. + """ + if not payload.startswith(b"{"): + return # LCM binary or other non-JSON — not ours + try: + msg = json.loads(payload) + except ValueError: + return + if msg.get("type") != "ping" or msg.get("client_ts") is None: + return + pong = json.dumps( + {"type": "pong", "client_ts": msg["client_ts"], "robot_ts": time.time()} + ) + with self._lock: + ch = self._dcs.get("state_reliable_back") + # Pong MUST go on state_reliable_back — CF bridges one direction only; + # a robot send on state_reliable would be silently dropped. + if ch is not None and ch.readyState == "open": + ch.send(pong) + else: + logger.warning("ping received but state_reliable_back not open — pong dropped") + # ─── Public API (Provider) ─────────────────────────────────────── def publish(self, topic: str, data: bytes) -> None: diff --git a/dimos/protocol/pubsub/impl/webrtc/test_broker_ping.py b/dimos/protocol/pubsub/impl/webrtc/test_broker_ping.py new file mode 100644 index 0000000000..6741334275 --- /dev/null +++ b/dimos/protocol/pubsub/impl/webrtc/test_broker_ping.py @@ -0,0 +1,98 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for BrokerProvider's inline clock-sync ping responder. + +No network — a mocked ``state_reliable_back`` channel is injected and we +assert on what ``_maybe_answer_ping`` sends. The ping protocol itself +(ping → pong with echoed client_ts + fresh robot_ts) is what the web +client's RTT/offset estimator depends on. +""" + +from __future__ import annotations + +import json +import time +from typing import Any +from unittest.mock import MagicMock + +import pytest + +from dimos.protocol.pubsub.impl.webrtc.providers.broker import BrokerConfig, BrokerProvider +from dimos.protocol.pubsub.impl.webrtc.providers.spec import WEBRTC_AVAILABLE + +# broker.py imports aiortc lazily, but BrokerProvider.__init__ pulls in the +# video track (aiortc/av) — so constructing one needs the extras installed. +pytestmark = pytest.mark.skipif(not WEBRTC_AVAILABLE, reason="aiortc not installed") + + +@pytest.fixture +def provider(monkeypatch: pytest.MonkeyPatch) -> BrokerProvider: + """BrokerProvider with a mocked open state_reliable_back channel.""" + monkeypatch.setenv("TELEOP_API_KEY", "dtk_test_key") + p = BrokerProvider(BrokerConfig()) + back = MagicMock() + back.readyState = "open" + p._dcs["state_reliable_back"] = back + return p + + +def _sent_payload(provider: BrokerProvider) -> dict[str, Any]: + ch: Any = provider._dcs["state_reliable_back"] + ch.send.assert_called_once() + return json.loads(ch.send.call_args[0][0]) # type: ignore[no-any-return] + + +def test_ping_echoes_pong_with_robot_ts(provider: BrokerProvider) -> None: + before = time.time() + provider._maybe_answer_ping(json.dumps({"type": "ping", "client_ts": 100.5}).encode()) + after = time.time() + + sent = _sent_payload(provider) + assert sent["type"] == "pong" + assert sent["client_ts"] == 100.5 + assert before <= sent["robot_ts"] <= after + + +def test_ping_missing_client_ts_dropped(provider: BrokerProvider) -> None: + provider._maybe_answer_ping(b'{"type":"ping"}') + provider._dcs["state_reliable_back"].send.assert_not_called() # type: ignore[attr-defined] + + +def test_non_json_binary_ignored(provider: BrokerProvider) -> None: + """LCM binary on the channel (future telemetry) must not be parsed.""" + provider._maybe_answer_ping(b"\x00\x01\x02\x03lcm-ish") + provider._dcs["state_reliable_back"].send.assert_not_called() # type: ignore[attr-defined] + + +def test_malformed_json_dropped(provider: BrokerProvider) -> None: + provider._maybe_answer_ping(b"{not json") + provider._dcs["state_reliable_back"].send.assert_not_called() # type: ignore[attr-defined] + + +def test_other_json_types_ignored(provider: BrokerProvider) -> None: + provider._maybe_answer_ping(b'{"type":"video_stats","fps":30}') + provider._dcs["state_reliable_back"].send.assert_not_called() # type: ignore[attr-defined] + + +def test_pong_dropped_when_back_channel_closed(provider: BrokerProvider) -> None: + provider._dcs["state_reliable_back"].readyState = "closed" # type: ignore[misc] + provider._maybe_answer_ping(b'{"type":"ping","client_ts":1.0}') + provider._dcs["state_reliable_back"].send.assert_not_called() # type: ignore[attr-defined] + + +def test_pong_dropped_when_back_channel_absent(provider: BrokerProvider) -> None: + del provider._dcs["state_reliable_back"] + # No channel at all — must not raise. + provider._maybe_answer_ping(b'{"type":"ping","client_ts":1.0}') diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index 5dcb481f60..007c1baca4 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -225,6 +225,7 @@ "speak-skill": "dimos.agents.skills.speak_skill.SpeakSkill", "tare-planner": "dimos.navigation.nav_stack.modules.tare_planner.tare_planner.TarePlanner", "teleop-recorder": "dimos.teleop.utils.recorder.TeleopRecorder", + "teleop-state-bridge": "dimos.teleop.quest_hosted.state_bridge.TeleopStateBridge", "temporal-memory": "dimos.perception.experimental.temporal_memory.temporal_memory.TemporalMemory", "terrain-analysis": "dimos.navigation.nav_stack.modules.terrain_analysis.terrain_analysis.TerrainAnalysis", "terrain-map-ext": "dimos.navigation.nav_stack.modules.terrain_map_ext.terrain_map_ext.TerrainMapExt", diff --git a/dimos/teleop/quest_hosted/blueprints.py b/dimos/teleop/quest_hosted/blueprints.py index 9efd7da001..a0ccfcf251 100644 --- a/dimos/teleop/quest_hosted/blueprints.py +++ b/dimos/teleop/quest_hosted/blueprints.py @@ -30,6 +30,7 @@ HostedArmTeleopModule, HostedTwistTeleopModule, ) +from dimos.teleop.quest_hosted.state_bridge import TeleopStateBridge from dimos.teleop.utils.recorder import TeleopRecorder, TeleopRecorderConfig # Single XArm7 teleop via the hosted (WebRTC) client. Pass `--simulation` to @@ -67,15 +68,41 @@ # provider/PeerConnection), and robot → operator telemetry can ride # CloudflareTransport("state_reliable_back", ...) the same way. # +# Clock-sync pings are answered inside BrokerProvider. TeleopStateBridge +# republishes operator video_stats as Out[VideoStats] (recorder picks it up) +# and pushes robot_telemetry (command-plane latency/jitter/loss) back to the +# operator HUD on state_reliable_back, measured from a raw tap on the command +# wire — no TwistStamped change. +# # Run: TELEOP_API_KEY=dtk_live_... dimos run teleop-hosted-go2-transport # (robot identity is derived from the key; TELEOP_ROBOT_ID optional) # then connect from https://teleop.dimensionalos.com (keyboard view). -teleop_hosted_go2_transport = unitree_go2_basic.transports( - { - ("cmd_vel", Twist): CloudflareTransport("cmd_unreliable", TwistStamped), - ("color_image", Image): CloudflareVideoTransport(), - } -).global_config(viewer="none") +teleop_hosted_go2_transport = ( + autoconnect( + unitree_go2_basic, + TeleopStateBridge.blueprint(), + ) + .transports( + { + ("cmd_vel", Twist): CloudflareTransport("cmd_unreliable", TwistStamped), + ("color_image", Image): CloudflareVideoTransport(), + ("state_json", bytes): CloudflareTransport("state_reliable"), + ("telemetry_out", bytes): CloudflareTransport("state_reliable_back"), + # Raw tap on the command wire for command-plane stats (reads the + # Header's stamp+seq off the bytes). Independent subscriber from + # cmd_vel above — same channel, separate decode. + ("cmd_raw", bytes): CloudflareTransport("cmd_unreliable"), + # Recorder tap on the command wire: when hosted-teleop-recorder is + # composed, its cmd_vel_stamped port subscribes to the SAME channel + # — independent typed decode, stamped with the browser's ts so the + # report's timing math works. Unused (harmless) when no recorder. + ("cmd_vel_stamped", TwistStamped): CloudflareTransport( + "cmd_unreliable", TwistStamped + ), + } + ) + .global_config(viewer="none") +) HOSTED_RECORDINGS_DIR = STATE_DIR / "hosted_teleop" / "recordings" diff --git a/dimos/teleop/quest_hosted/state_bridge.py b/dimos/teleop/quest_hosted/state_bridge.py new file mode 100644 index 0000000000..2f4a104091 --- /dev/null +++ b/dimos/teleop/quest_hosted/state_bridge.py @@ -0,0 +1,177 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Teleop control-plane bridge (transport-world). + +Two jobs, both translating between the operator's JSON control plane and DimOS: + +* **Inbound state plane** (``state_reliable`` JSON): ``video_stats`` → + typed ``Out[VideoStats]`` (recorders pick it up); ``clock_report`` logged. + Clock-sync ``ping`` is answered inline by ``BrokerProvider`` (lower latency + than a module hop), not here. + +* **Command-plane health** (robot → operator): a raw-bytes tap on + ``cmd_unreliable`` reads the wire ``Header`` (``stamp`` + ``seq`` + size — + no change to ``TwistStamped``) into a rolling ``LiveStreamStats`` window; + a timer publishes ``robot_telemetry`` JSON on ``state_reliable_back`` so the + operator HUD can show latency/jitter/loss the operator can't measure from + its own send side. + +Blueprint wiring:: + + autoconnect(..., TeleopStateBridge.blueprint()).transports({ + ("state_json", bytes): CloudflareTransport("state_reliable"), + ("cmd_raw", bytes): CloudflareTransport("cmd_unreliable"), + ("telemetry_out", bytes): CloudflareTransport("state_reliable_back"), + }) + +Together with the deprecated ``HostedTeleopModule``'s removal this restores +the full ``state_reliable``/``state_reliable_back`` parity in the transport +world. +""" + +from __future__ import annotations + +import json +import threading +import time +from typing import Any + +from reactivex.disposable import Disposable + +from dimos.core.core import rpc +from dimos.core.module import Module, ModuleConfig +from dimos.core.stream import In, Out +from dimos.teleop.utils.stream_stats import LiveStreamStats +from dimos.teleop.utils.video_stats import VideoStats +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() + + +class TeleopStateBridgeConfig(ModuleConfig): + telemetry_hz: float = 3.0 # robot → operator HUD command-plane stats + + +class TeleopStateBridge(Module): + """Operator JSON control plane ↔ typed DimOS streams + command-plane health.""" + + config: TeleopStateBridgeConfig + + # Inbound state plane (operator → robot), raw JSON bytes. + state_json: In[bytes] + # Raw command bytes (operator → robot) — stats tap only; the typed cmd_vel + # decode is a separate transport on the same channel. + cmd_raw: In[bytes] + # Republished operator video health (recorders subscribe). + video_stats: Out[VideoStats] + # robot → operator telemetry JSON (state_reliable_back). + telemetry_out: Out[bytes] + + def __init__(self, **kwargs: Any) -> None: + super().__init__(**kwargs) + self._cmd_stats = LiveStreamStats() + self._telemetry_thread: threading.Thread | None = None + self._stop_event = threading.Event() + + @rpc + def start(self) -> None: + super().start() + self._stop_event.clear() + # Manual sync subscribes (not async handle_*): the keep-latest mailbox + # would drop messages when several land close together, and parsing / + # header-peeking is cheap enough for the transport callback. + for stream, cb in ((self.state_json, self._on_state_json), (self.cmd_raw, self._on_cmd_raw)): + unsub = stream.subscribe(cb) + self.register_disposable(Disposable(unsub)) + self._start_telemetry() + + @rpc + def stop(self) -> None: + self._stop_event.set() + if self._telemetry_thread is not None: + self._telemetry_thread.join(timeout=2.0) + self._telemetry_thread = None + super().stop() + + # ─── Inbound state plane ───────────────────────────────────────── + + def _on_state_json(self, data: Any) -> None: + if isinstance(data, str): + data = data.encode() + if not data.startswith(b"{"): + return # not JSON (future LCM telemetry on this channel) + try: + msg = json.loads(data) + except ValueError: + logger.warning("state_reliable: malformed JSON: %r", data[:80]) + return + + kind = msg.get("type") + if kind == "video_stats": + self.video_stats.publish(VideoStats.from_dict(msg)) + elif kind == "clock_report": + rtt = msg.get("rtt_ms") + off = msg.get("offset_ms") + logger.info( + "clock-sync: operator rtt=%.1fms offset=%.1fms", + float(rtt) if rtt is not None else float("nan"), + float(off) if off is not None else float("nan"), + ) + # ping is answered by BrokerProvider; anything else is a future + # control-plane message this version doesn't know — ignore. + + # ─── Command-plane health ──────────────────────────────────────── + + def _on_cmd_raw(self, data: Any) -> None: + """Peek the wire Header (stamp + seq) for command-plane stats. + + Reads what the operator already puts on the wire — no TwistStamped + change. ``ts`` is operator-clock-corrected (so recv minus ts = one-way + latency); ``seq`` drives loss/reorder. + """ + if isinstance(data, str): + data = data.encode() + try: + from dimos_lcm.geometry_msgs import TwistStamped as LCMTwistStamped + + lcm = LCMTwistStamped.lcm_decode(data) + ts = lcm.header.stamp.sec + lcm.header.stamp.nsec / 1_000_000_000 + seq = lcm.header.seq + except Exception: + return # foreign / undecodable frame on the channel — skip + self._cmd_stats.record(ts, seq=seq, nbytes=len(data)) + + def _start_telemetry(self) -> None: + def runner() -> None: + interval = 1.0 / max(self.config.telemetry_hz, 0.1) + while not self._stop_event.is_set(): + snap = self._cmd_stats.snapshot() + if snap is not None: + payload = json.dumps( + {"type": "robot_telemetry", "cmd": snap, "robot_ts": time.time()} + ) + try: + self.telemetry_out.publish(payload.encode()) + except Exception: + logger.debug("telemetry publish failed", exc_info=True) + self._stop_event.wait(interval) + + self._telemetry_thread = threading.Thread( + target=runner, daemon=True, name="TeleopStateBridgeTelemetry" + ) + self._telemetry_thread.start() + + +__all__ = ["TeleopStateBridge", "TeleopStateBridgeConfig"] diff --git a/dimos/teleop/quest_hosted/test_state_bridge.py b/dimos/teleop/quest_hosted/test_state_bridge.py new file mode 100644 index 0000000000..0c470a7e13 --- /dev/null +++ b/dimos/teleop/quest_hosted/test_state_bridge.py @@ -0,0 +1,155 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for TeleopStateBridge's JSON → typed-stream dispatch.""" + +from __future__ import annotations + +import json +import time +from typing import cast +from unittest.mock import MagicMock + +import pytest + +from dimos.teleop.quest_hosted.state_bridge import TeleopStateBridge +from dimos.teleop.utils.stream_stats import LiveStreamStats +from dimos.teleop.utils.video_stats import VideoStats + + +@pytest.fixture +def bridge() -> TeleopStateBridge: + """Bare bridge with mocked Out streams + a real stats accumulator.""" + b = TeleopStateBridge.__new__(TeleopStateBridge) + b.video_stats = MagicMock() # type: ignore[assignment] + b.telemetry_out = MagicMock() # type: ignore[assignment] + b._cmd_stats = LiveStreamStats() + return b + + +def _publish_mock(bridge: TeleopStateBridge) -> MagicMock: + return cast("MagicMock", bridge.video_stats).publish # type: ignore[no-any-return] + + +def _lcm_twist_bytes(ts: float, seq: int) -> bytes: + """Encode a TwistStamped on the wire the way the operator does — with seq + in the Header (which the dimos TwistStamped doesn't surface).""" + from dimos_lcm.geometry_msgs import TwistStamped as LCMTwistStamped + + m = LCMTwistStamped() + m.header.stamp.sec = int(ts) + m.header.stamp.nsec = int((ts - int(ts)) * 1_000_000_000) + m.header.frame_id = "keyboard" + m.header.seq = seq + return m.lcm_encode() # type: ignore[no-any-return] + + +def test_video_stats_republished_typed(bridge: TeleopStateBridge) -> None: + payload = { + "type": "video_stats", + "ts": 123.0, + "fps": 29.5, + "kbps": 2100.0, + "width": 1280, + "height": 720, + } + bridge._on_state_json(json.dumps(payload).encode()) + + _publish_mock(bridge).assert_called_once() + stats = _publish_mock(bridge).call_args[0][0] + assert isinstance(stats, VideoStats) + assert stats.fps == 29.5 + assert stats.width == 1280 + + +def test_str_payload_accepted(bridge: TeleopStateBridge) -> None: + """aiortc may deliver str on a DataChannel; both must work.""" + bridge._on_state_json('{"type":"video_stats","fps":15.0}') + assert _publish_mock(bridge).call_args[0][0].fps == 15.0 + + +def test_clock_report_logged_not_published(bridge: TeleopStateBridge) -> None: + bridge._on_state_json(b'{"type":"clock_report","rtt_ms":42.0,"offset_ms":-3.0}') + _publish_mock(bridge).assert_not_called() + + +def test_ping_ignored(bridge: TeleopStateBridge) -> None: + """Pings are the provider's job — the bridge must not react.""" + bridge._on_state_json(b'{"type":"ping","client_ts":1.0}') + _publish_mock(bridge).assert_not_called() + + +def test_unknown_type_ignored(bridge: TeleopStateBridge) -> None: + bridge._on_state_json(b'{"type":"mode_switch","mode":"arm"}') + _publish_mock(bridge).assert_not_called() + + +def test_non_json_binary_ignored(bridge: TeleopStateBridge) -> None: + bridge._on_state_json(b"\x00\x01lcm-ish") + _publish_mock(bridge).assert_not_called() + + +def test_malformed_json_dropped(bridge: TeleopStateBridge) -> None: + bridge._on_state_json(b"{not json") + _publish_mock(bridge).assert_not_called() + + +# ─── command-plane stats (cmd_raw tap) ────────────────────────────── + + +def test_cmd_raw_reads_wire_header(bridge: TeleopStateBridge) -> None: + """Header seq+stamp are read off the wire — no TwistStamped change.""" + now = time.time() + for i in range(5): + bridge._on_cmd_raw(_lcm_twist_bytes(now + i * 0.05, seq=i)) + snap = bridge._cmd_stats.snapshot() + assert snap is not None + assert snap["loss_pct"] == 0.0 # contiguous seqs + assert snap["rate_hz"] is not None + assert snap["throughput_bps"] is not None + + +def test_cmd_raw_loss_from_seq_gap(bridge: TeleopStateBridge) -> None: + now = time.time() + for seq in (0, 1, 3, 4, 5): # seq 2 dropped → 1/6 missing in [0,5] + bridge._on_cmd_raw(_lcm_twist_bytes(now + seq * 0.05, seq=seq)) + snap = bridge._cmd_stats.snapshot() + assert snap is not None + assert snap["loss_pct"] == pytest.approx(100.0 / 6.0, abs=0.1) + + +def test_cmd_raw_str_payload_accepted(bridge: TeleopStateBridge) -> None: + bridge._on_cmd_raw(_lcm_twist_bytes(time.time(), seq=0)) + bridge._on_cmd_raw(_lcm_twist_bytes(time.time(), seq=1)) + assert bridge._cmd_stats.snapshot() is not None + + +def test_cmd_raw_garbage_ignored(bridge: TeleopStateBridge) -> None: + """Undecodable frame must not raise or pollute the window.""" + bridge._on_cmd_raw(b"\xff\xfe not lcm") + assert bridge._cmd_stats.snapshot() is None # nothing recorded + + +def test_telemetry_payload_shape(bridge: TeleopStateBridge) -> None: + """robot_telemetry JSON matches what the web HUD parses.""" + now = time.time() + for i in range(4): + bridge._on_cmd_raw(_lcm_twist_bytes(now + i * 0.05, seq=i)) + snap = bridge._cmd_stats.snapshot() + assert snap is not None + payload = json.dumps({"type": "robot_telemetry", "cmd": snap}) + parsed = json.loads(payload) + assert parsed["type"] == "robot_telemetry" + assert "latency_ms" in parsed["cmd"] + assert "loss_pct" in parsed["cmd"] diff --git a/dimos/teleop/utils/stream_stats.py b/dimos/teleop/utils/stream_stats.py index c10b1bb403..99e561b0af 100644 --- a/dimos/teleop/utils/stream_stats.py +++ b/dimos/teleop/utils/stream_stats.py @@ -51,41 +51,82 @@ def pcts(values: Sequence[float]) -> dict[str, float] | None: } +def loss_pct(seqs: Sequence[int]) -> float | None: + """Loss % from gaps in a monotonic sequence; None if fewer than 2 seqs. + + ``loss = 1 - distinct_received / (max_seq - min_seq + 1)``. Reorders and + duplicates don't inflate it — only genuinely missing seq values count. + Tail loss (packets after the last one seen) is invisible: we can only + measure gaps inside the observed ``[min, max]`` range. + """ + valid = [s for s in seqs if s is not None] + if len(valid) < 2: + return None + expected = max(valid) - min(valid) + 1 + received = len(set(valid)) + return max(0.0, (1.0 - received / expected) * 100.0) + + +def reorder_count(seqs: Sequence[int]) -> int: + """Count messages that arrived with a seq below an already-seen maximum.""" + count = 0 + running_max = -1 + for s in seqs: + if s is None: + continue + if s < running_max: + count += 1 + else: + running_max = s + return count + + class LiveStreamStats: """Rolling-window health for an always-on stream consumer. - Records ``(wall, ts)`` per arrival in a bounded deque so old samples - fall off automatically; ``snapshot()`` returns the current window's median - E2E latency, median inter-arrival jitter, and arrival rate. - Thread-safe — ``record()`` runs on the transport callback, - ``snapshot()`` on a separate reader. + Records ``(wall, ts, seq, nbytes)`` per arrival in a bounded deque so old + samples fall off automatically; ``snapshot()`` returns the window's median + E2E latency, median inter-arrival jitter, seq-gap loss, reorder count, + arrival rate, and throughput. Thread-safe — ``record()`` runs on the + transport callback, ``snapshot()`` on a separate reader. + + ``seq`` enables loss/reorder (the sender's monotonic counter, read off the + wire); ``nbytes`` enables throughput. Both optional — unstamped streams + still get rate + jitter. """ def __init__(self, window: int = 120) -> None: self._lock = threading.Lock() - # (wall_arrival, ts); ts is None when the stream is unstamped. - self._samples: deque[tuple[float, float | None]] = deque(maxlen=window) - - def record(self, ts: float | None) -> None: - """Note an inbound message's send-stamp (None if unstamped).""" + # (wall_arrival, ts, seq, nbytes); ts/seq/nbytes are None when absent. + self._samples: deque[tuple[float, float | None, int | None, int | None]] = deque( + maxlen=window + ) + + def record( + self, ts: float | None, seq: int | None = None, nbytes: int | None = None + ) -> None: + """Note an inbound message's send-stamp, seq, and wire size (any None).""" with self._lock: - self._samples.append((time.time(), ts)) + self._samples.append((time.time(), ts, seq, nbytes)) def snapshot(self) -> dict[str, float | None] | None: - """Median latency/jitter (ms), rate (Hz), or None. + """Median latency/jitter (ms), loss (%), reorder, rate (Hz), throughput. Returns ``None`` until at least two samples have landed (one inter-arrival - interval is needed). Uses the module's shared ``pcts`` so the math matches - the report writer. + interval is needed). Uses the module's shared ``pcts``/``loss_pct`` so the + math matches the report writer. """ with self._lock: samples = list(self._samples) if len(samples) < 2: return None - arrivals = [w for w, _ in samples] + arrivals = [w for w, _, _, _ in samples] intervals_ms = [(b - a) * 1000.0 for a, b in pairwise(arrivals)] - e2e_ms = [(w - ts) * 1000.0 for w, ts in samples if ts is not None] + # `is not None` — ts=0.0 / seq=0 are real values, only None means absent. + e2e_ms = [(w - ts) * 1000.0 for w, ts, _, _ in samples if ts is not None] + seqs = [s for _, _, s, _ in samples if s is not None] + sizes = [n for _, _, _, n in samples if n is not None] e2e = pcts(e2e_ms) jit = pcts(intervals_ms) @@ -93,8 +134,11 @@ def snapshot(self) -> dict[str, float | None] | None: return { "latency_ms": e2e["p50"] if e2e else None, "jitter_ms": jit["p50"] if jit else None, + "loss_pct": loss_pct(seqs), + "reorder": float(reorder_count(seqs)) if seqs else None, "rate_hz": (len(samples) - 1) / span if span > 0 else None, + "throughput_bps": (sum(sizes) / span) if (sizes and span > 0) else None, } -__all__ = ["LiveStreamStats", "pcts"] +__all__ = ["LiveStreamStats", "loss_pct", "pcts", "reorder_count"] From ed26e8ecbf14586e5a48d609717dedc66179bf41 Mon Sep 17 00:00:00 2001 From: ruthwikdasyam Date: Mon, 15 Jun 2026 17:15:07 -0700 Subject: [PATCH 02/17] initial commit: livekit support --- dimos/core/transport.py | 22 ++ .../impl/webrtc/providers/livekit_broker.py | 329 ++++++++++++++++++ dimos/robot/all_blueprints.py | 1 + dimos/robot/test_all_blueprints.py | 1 + dimos/teleop/quest_hosted/blueprints.py | 23 +- pyproject.toml | 6 + 6 files changed, 381 insertions(+), 1 deletion(-) create mode 100644 dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py diff --git a/dimos/core/transport.py b/dimos/core/transport.py index 2ecdb48645..a7c76db35f 100644 --- a/dimos/core/transport.py +++ b/dimos/core/transport.py @@ -36,6 +36,7 @@ from dimos.protocol.pubsub.impl.rospubsub import DimosROS, ROSTopic from dimos.protocol.pubsub.impl.shmpubsub import BytesSharedMemory, PickleSharedMemory from dimos.protocol.pubsub.impl.webrtc.providers.broker import BrokerConfig +from dimos.protocol.pubsub.impl.webrtc.providers.livekit_broker import LiveKitBrokerConfig from dimos.protocol.pubsub.impl.webrtc.providers.spec import ProviderConfig from dimos.protocol.pubsub.impl.webrtc.webrtcpubsub import WebRTCPubSub from dimos.utils.logging_config import setup_logger @@ -445,6 +446,21 @@ class CloudflareTransport(WebRTCTransport[M]): _config_cls = BrokerConfig +class LiveKitTransport(WebRTCTransport[M]): + """WebRTC DataChannels via the hosted teleop broker + LiveKit SFU. + + Drop-in alternative to :class:`CloudflareTransport`; config kwargs flow into + :class:`LiveKitBrokerConfig` (unset fields fall back to ``TELEOP_*`` env). + + unitree_go2_livekit = unitree_go2_basic.transports({ + ("cmd_vel", Twist): LiveKitTransport("cmd_unreliable", TwistStamped), + ("color_image", Image): LiveKitVideoTransport(), + }) + """ + + _config_cls = LiveKitBrokerConfig + + class WebRTCVideoTransport(Transport[Any]): """Robot camera → remote viewer as a WebRTC video track (provider-agnostic). @@ -496,4 +512,10 @@ class CloudflareVideoTransport(WebRTCVideoTransport): _config_cls = BrokerConfig +class LiveKitVideoTransport(WebRTCVideoTransport): + """Camera → teleop web client via the hosted broker + LiveKit (see WebRTCVideoTransport).""" + + _config_cls = LiveKitBrokerConfig + + class ZenohTransport(PubSubTransport[T]): ... diff --git a/dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py b/dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py new file mode 100644 index 0000000000..2773236566 --- /dev/null +++ b/dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py @@ -0,0 +1,329 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Broker-mediated LiveKit provider (hosted teleop). + +The robot asks the ``dimensional-teleop`` broker for a LiveKit room + JWT +(``POST /api/v1/sessions {transport:"livekit"}`` → ``{url, token, room}``), +then connects straight to the LiveKit SFU. Unlike the Cloudflare ``broker.py`` +path there is no SDP relay, no SCTP-id juggling, and no heartbeat-driven +channel lifecycle: LiveKit data is bidirectional and topic-addressed, so a +single room carries every channel. + +Topics (kept identical to the Cloudflare path so the typed-fingerprint demux at +the transport layer is unchanged): + cmd_unreliable operator → robot commands (lossy) + state_reliable operator → robot control plane (reliable) + state_reliable_back robot → operator telemetry (reliable) + +Video: ``set_video_frame()`` pushes camera frames into a sendonly LiveKit track +(published lazily on the first frame) — typically via ``LiveKitVideoTransport`` +bound to a blueprint's Image stream. + +Env vars (fallback when config fields are unset): + TELEOP_BROKER_URL — default https://teleop.dimensionalos.com + TELEOP_API_KEY — robot API key (dtk_live_*); broker derives identity + TELEOP_ROBOT_ID — optional robot identifier override + TELEOP_ROBOT_NAME — human-readable robot name +""" + +from __future__ import annotations + +import asyncio +from collections import defaultdict +from collections.abc import Callable +import contextlib +from dataclasses import dataclass +import importlib.util +import os +from typing import TYPE_CHECKING, Any + +from dimos.protocol.pubsub.impl.webrtc.providers.spec import ( + AsyncProviderBase, + ProviderConfig, +) +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() + +# find_spec instead of importing: the livekit rtc SDK pulls native libs and +# core.transport imports this module everywhere. Imported lazily on start(). +LIVEKIT_AVAILABLE = ( + importlib.util.find_spec("livekit") is not None + and importlib.util.find_spec("httpx") is not None +) + +if TYPE_CHECKING: + import httpx + from livekit import rtc + + from dimos.msgs.sensor_msgs.Image import Image + + +@dataclass(frozen=True) +class LiveKitBrokerConfig(ProviderConfig): + """Hosted teleop over LiveKit. Credentials default from TELEOP_* env.""" + + broker_url: str | None = None + api_key: str | None = None + robot_id: str | None = None + robot_name: str | None = None + heartbeat_hz: float = 1.0 + + def _create(self) -> LiveKitBrokerProvider: + return LiveKitBrokerProvider(self) + + +def _image_to_rgba(img: Image) -> tuple[int, int, bytes]: + """Pack a dimos Image into (width, height, RGBA bytes) for a LiveKit frame.""" + import numpy as np + + from dimos.msgs.sensor_msgs.Image import ImageFormat + + arr = img.data + if arr.dtype != np.uint8: + arr = arr.astype(np.uint8) + h, w = arr.shape[:2] + fmt = img.format + if fmt == ImageFormat.RGBA: + rgba = arr + elif fmt == ImageFormat.BGRA: + rgba = arr[..., [2, 1, 0, 3]] + elif fmt == ImageFormat.RGB: + rgba = np.dstack([arr, np.full((h, w), 255, np.uint8)]) + elif fmt in (ImageFormat.GRAY, ImageFormat.GRAY16): + g = arr if arr.ndim == 2 else arr[..., 0] + rgba = np.dstack([g, g, g, np.full((h, w), 255, np.uint8)]) + else: # BGR and anything else: treat as BGR + rgba = np.dstack([arr[..., 2], arr[..., 1], arr[..., 0], np.full((h, w), 255, np.uint8)]) + return w, h, np.ascontiguousarray(rgba).tobytes() + + +class _VideoPublisher: + """Lazily-published sendonly LiveKit video track fed from an Image stream. + + Frames arrive from the producer thread; the source/track are created and the + track published on the first frame (so dimensions come from real data), all + marshalled onto the provider's loop thread where the room lives. + """ + + def __init__(self) -> None: + self._room: rtc.Room | None = None + self._loop: asyncio.AbstractEventLoop | None = None + self._source: rtc.VideoSource | None = None + + def bind(self, room: rtc.Room, loop: asyncio.AbstractEventLoop) -> None: + self._room = room + self._loop = loop + + def set_latest(self, img: Image) -> None: + loop = self._loop + if loop is None or not loop.is_running(): + return # not connected yet; pre-connect frames are dropped + try: + w, h, buf = _image_to_rgba(img) + except Exception: + logger.debug("video: frame conversion failed", exc_info=True) + return + loop.call_soon_threadsafe(self._capture, w, h, buf) + + def _capture(self, w: int, h: int, buf: bytes) -> None: + from livekit import rtc + + if self._source is None: + self._source = rtc.VideoSource(w, h) + asyncio.ensure_future(self._publish()) # publish the track once + frame = rtc.VideoFrame(w, h, rtc.VideoBufferType.RGBA, buf) + self._source.capture_frame(frame) + + async def _publish(self) -> None: + from livekit import rtc + + assert self._room is not None and self._source is not None + track = rtc.LocalVideoTrack.create_video_track("camera", self._source) + opts = rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_CAMERA) + await self._room.local_participant.publish_track(track, opts) + logger.info("LiveKit video track published") + + +class LiveKitBrokerProvider(AsyncProviderBase): + """Bidirectional broker-mediated LiveKit provider. + + Inbound (operator → robot): ``cmd_unreliable`` + ``state_reliable``, + delivered to subscribers by topic. Outbound (robot → operator): + ``publish()`` on any topic (LiveKit is bidirectional); ``cmd_unreliable`` + rides the lossy channel, everything else reliable. Together with + ``LiveKitTransport`` / ``LiveKitVideoTransport`` this is the LiveKit analog + of ``BrokerProvider``. + """ + + LOSSY_TOPICS = ("cmd_unreliable",) + + def __init__(self, config: LiveKitBrokerConfig | None = None) -> None: + if not LIVEKIT_AVAILABLE: + raise RuntimeError("livekit and httpx required: pip install dimos[livekit]") + super().__init__() + config = config or LiveKitBrokerConfig() + self._broker_url = ( + config.broker_url + or os.environ.get("TELEOP_BROKER_URL", "https://teleop.dimensionalos.com") + ).rstrip("/") + self._api_key = config.api_key or os.environ.get("TELEOP_API_KEY", "") + self._robot_id = config.robot_id or os.environ.get("TELEOP_ROBOT_ID", "") + self._robot_name = config.robot_name or os.environ.get("TELEOP_ROBOT_NAME", "robot") + if not self._api_key: + raise RuntimeError( + "TELEOP_API_KEY or LiveKitBrokerConfig.api_key required " + "(create one in the teleop dashboard: New Key)" + ) + self._config = config + + self._http: httpx.AsyncClient | None = None + self._room: rtc.Room | None = None + self.session_id: str | None = None + self.room: str | None = None + self._hb_task: asyncio.Task[None] | None = None + self._video = _VideoPublisher() + # topic → subscriber callbacks. Guarded by self._lock (from the base). + self._callbacks: dict[str, list[Callable[[bytes, str], None]]] = defaultdict(list) + + @property + def _headers(self) -> dict[str, str]: + return {"X-Robot-API-Key": self._api_key, "Content-Type": "application/json"} + + # ─── Connect / Disconnect (loop thread) ────────────────────────── + + async def _connect(self) -> None: + from livekit import rtc + import httpx + + self._http = httpx.AsyncClient(timeout=30.0) + r = await self._http.post( + f"{self._broker_url}/api/v1/sessions", + headers=self._headers, + json={ + "transport": "livekit", + "robot_name": self._robot_name, + **({"robot_id": self._robot_id} if self._robot_id else {}), + }, + ) + if r.status_code not in (200, 201): + raise RuntimeError(f"Broker session create failed: {r.status_code} {r.text[:500]}") + data = r.json() + self.session_id = data["session_id"] + self.room = data.get("room") + url, token = data["url"], data["token"] + + self._room = rtc.Room() + + @self._room.on("data_received") + def _on_data(packet: Any) -> None: + self._dispatch(packet) + + await self._room.connect(url, token) + self._video.bind(self._room, asyncio.get_running_loop()) + logger.info( + "LiveKit broker provider connected: session=%s room=%s robot=%s", + self.session_id, + data.get("room"), + self._robot_id or "(derived from API key)", + ) + self._hb_task = asyncio.get_running_loop().create_task(self._heartbeat_loop()) + + async def _disconnect(self) -> None: + if self._hb_task is not None: + self._hb_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await self._hb_task + self._hb_task = None + if self._http and self.session_id: + with contextlib.suppress(Exception): # best-effort deregistration + await self._http.delete( + f"{self._broker_url}/api/v1/sessions/{self.session_id}", + headers=self._headers, + ) + if self._room is not None: + with contextlib.suppress(Exception): + await self._room.disconnect() + self._room = None + if self._http: + await self._http.aclose() + self._http = None + self.session_id = None + + # ─── Heartbeat (loop thread; metrics/liveness only) ────────────── + + async def _heartbeat_loop(self) -> None: + interval = 1.0 / max(self._config.heartbeat_hz, 0.1) + while True: + try: + if self._http is not None and self.session_id is not None: + await self._http.post( + f"{self._broker_url}/api/v1/sessions/{self.session_id}/heartbeat", + headers=self._headers, + json={}, + ) + except Exception: + logger.debug("LiveKit heartbeat failed", exc_info=True) + await asyncio.sleep(interval) + + # ─── Dispatch (loop thread) ────────────────────────────────────── + + def _dispatch(self, packet: Any) -> None: + topic = getattr(packet, "topic", "") or "" + payload = getattr(packet, "data", b"") + if isinstance(payload, (bytearray, memoryview)): + payload = bytes(payload) + with self._lock: + callbacks = list(self._callbacks.get(topic, ())) + for cb in callbacks: + try: + cb(payload, topic) + except Exception: + logger.exception("LiveKit subscriber callback error") + + # ─── Public API (Provider) ─────────────────────────────────────── + + def publish(self, topic: str, data: bytes) -> None: + """Robot → operator on any topic (LiveKit is bidirectional). Messages + drop while no room/operator is connected — normal pubsub behaviour.""" + if isinstance(data, (bytearray, memoryview)): + data = bytes(data) + with self._lock: + if not self._started or self._loop is None or self._room is None: + return + room, loop = self._room, self._loop + reliable = topic not in self.LOSSY_TOPICS + coro = room.local_participant.publish_data(data, reliable=reliable, topic=topic) + asyncio.run_coroutine_threadsafe(coro, loop) + + def set_video_frame(self, img: Image) -> None: + """Robot → operator video: publish the latest camera frame (thread-safe).""" + self._video.set_latest(img) + + def subscribe(self, topic: str, callback: Callable[[bytes, str], None]) -> Callable[[], None]: + if not self.is_connected: + self.start() + with self._lock: + self._callbacks[topic].append(callback) + + def _unsub() -> None: + with self._lock: + with contextlib.suppress(ValueError): + self._callbacks[topic].remove(callback) + + return _unsub + + +__all__ = ["LiveKitBrokerConfig", "LiveKitBrokerProvider"] diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index 5dcb481f60..23a1f949dc 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -73,6 +73,7 @@ "openarm-planner-coordinator": "dimos.robot.manipulators.openarm.blueprints:openarm_planner_coordinator", "path-planner-eval": "dimos.navigation.nav_3d.evaluator.blueprints:path_planner_eval", "teleop-hosted-go2": "dimos.teleop.quest_hosted.blueprints:teleop_hosted_go2", + "teleop-hosted-go2-livekit": "dimos.teleop.quest_hosted.blueprints:teleop_hosted_go2_livekit", "teleop-hosted-go2-transport": "dimos.teleop.quest_hosted.blueprints:teleop_hosted_go2_transport", "teleop-hosted-xarm7": "dimos.teleop.quest_hosted.blueprints:teleop_hosted_xarm7", "teleop-phone": "dimos.teleop.phone.blueprints:teleop_phone", diff --git a/dimos/robot/test_all_blueprints.py b/dimos/robot/test_all_blueprints.py index a2c44a31bb..760be3e335 100644 --- a/dimos/robot/test_all_blueprints.py +++ b/dimos/robot/test_all_blueprints.py @@ -51,6 +51,7 @@ "coordinator-xarm7", "dual-xarm6-planner", "teleop-hosted-go2", + "teleop-hosted-go2-livekit", "teleop-hosted-go2-transport", "teleop-hosted-xarm7", "teleop-quest-dual", diff --git a/dimos/teleop/quest_hosted/blueprints.py b/dimos/teleop/quest_hosted/blueprints.py index 9efd7da001..768b45da15 100644 --- a/dimos/teleop/quest_hosted/blueprints.py +++ b/dimos/teleop/quest_hosted/blueprints.py @@ -19,7 +19,13 @@ from dimos.constants import STATE_DIR from dimos.control.blueprints.teleop import coordinator_teleop_xarm7 from dimos.core.coordination.blueprints import autoconnect -from dimos.core.transport import CloudflareTransport, CloudflareVideoTransport, LCMTransport +from dimos.core.transport import ( + CloudflareTransport, + CloudflareVideoTransport, + LCMTransport, + LiveKitTransport, + LiveKitVideoTransport, +) from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped from dimos.msgs.geometry_msgs.Twist import Twist from dimos.msgs.geometry_msgs.TwistStamped import TwistStamped @@ -78,6 +84,20 @@ ).global_config(viewer="none") +# Same transport-swap as above, over the hosted broker's LiveKit backend instead +# of Cloudflare. The robot picks the backend purely by which transport class the +# blueprint wires; the broker mints a LiveKit room+token from the same dtk_live_* +# key (see dimensional-teleop docs/livekit-spec.md). +# +# Run: TELEOP_API_KEY=dtk_live_... dimos run teleop-hosted-go2-livekit +teleop_hosted_go2_livekit = unitree_go2_basic.transports( + { + ("cmd_vel", Twist): LiveKitTransport("cmd_unreliable", TwistStamped), + ("color_image", Image): LiveKitVideoTransport(), + } +).global_config(viewer="none") + + HOSTED_RECORDINGS_DIR = STATE_DIR / "hosted_teleop" / "recordings" @@ -103,6 +123,7 @@ class HostedTeleopRecorder(TeleopRecorder): "HostedTeleopRecorder", "HostedTeleopRecorderConfig", "teleop_hosted_go2", + "teleop_hosted_go2_livekit", "teleop_hosted_go2_transport", "teleop_hosted_xarm7", ] diff --git a/pyproject.toml b/pyproject.toml index 2749469a2f..cd210bd9dd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -329,6 +329,12 @@ webrtc = [ "httpx>=0.27.0", ] +livekit = [ + # WebRTC pubsub over the hosted broker's LiveKit backend (robot rtc client). + "livekit>=1.0.0", + "httpx>=0.27.0", +] + base = [ "dimos[agents,web,perception,visualization]", ] From 52ef8b4e94f269a3de23900f235b69e960b86ac9 Mon Sep 17 00:00:00 2001 From: ruthwikdasyam Date: Tue, 16 Jun 2026 16:28:17 -0700 Subject: [PATCH 03/17] fix: pre-commit fixes --- .../impl/webrtc/providers/livekit_broker.py | 5 ++- uv.lock | 37 ++++++++++++++++++- 2 files changed, 39 insertions(+), 3 deletions(-) diff --git a/dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py b/dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py index 2773236566..c81ccb527f 100644 --- a/dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py +++ b/dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py @@ -122,6 +122,7 @@ def __init__(self) -> None: self._room: rtc.Room | None = None self._loop: asyncio.AbstractEventLoop | None = None self._source: rtc.VideoSource | None = None + self._publish_task: asyncio.Task | None = None def bind(self, room: rtc.Room, loop: asyncio.AbstractEventLoop) -> None: self._room = room @@ -143,7 +144,7 @@ def _capture(self, w: int, h: int, buf: bytes) -> None: if self._source is None: self._source = rtc.VideoSource(w, h) - asyncio.ensure_future(self._publish()) # publish the track once + self._publish_task = asyncio.ensure_future(self._publish()) frame = rtc.VideoFrame(w, h, rtc.VideoBufferType.RGBA, buf) self._source.capture_frame(frame) @@ -205,8 +206,8 @@ def _headers(self) -> dict[str, str]: # ─── Connect / Disconnect (loop thread) ────────────────────────── async def _connect(self) -> None: - from livekit import rtc import httpx + from livekit import rtc self._http = httpx.AsyncClient(timeout=30.0) r = await self._http.post( diff --git a/uv.lock b/uv.lock index 16adff1c99..07d11a7c47 100644 --- a/uv.lock +++ b/uv.lock @@ -2040,6 +2040,10 @@ dds = [ drone = [ { name = "pymavlink" }, ] +livekit = [ + { name = "httpx" }, + { name = "livekit" }, +] manipulation = [ { name = "a750-control", marker = "platform_machine == 'x86_64' and sys_platform == 'linux'" }, { name = "drake", version = "1.45.0", source = { registry = "https://pypi.org/simple" }, marker = "platform_machine != 'aarch64' and sys_platform == 'darwin'" }, @@ -2371,6 +2375,7 @@ requires-dist = [ { name = "gdown", marker = "extra == 'misc'", specifier = ">=5.2.2" }, { name = "googlemaps", marker = "extra == 'misc'", specifier = ">=4.10.0" }, { name = "gtsam-extended", marker = "extra == 'mapping'", specifier = ">=4.3a1.post1" }, + { name = "httpx", marker = "extra == 'livekit'", specifier = ">=0.27.0" }, { name = "httpx", marker = "extra == 'webrtc'", specifier = ">=0.27.0" }, { name = "hydra-core", marker = "extra == 'perception'", specifier = ">=1.3.0" }, { name = "ipykernel", marker = "extra == 'misc'" }, @@ -2386,6 +2391,7 @@ requires-dist = [ { name = "lap", marker = "extra == 'perception'", specifier = ">=0.5.12" }, { name = "lark", marker = "extra == 'misc'" }, { name = "lazy-loader" }, + { name = "livekit", marker = "extra == 'livekit'", specifier = ">=1.0.0" }, { name = "llvmlite", specifier = ">=0.42.0" }, { name = "lz4", specifier = ">=4.4.5" }, { name = "matplotlib", marker = "extra == 'manipulation'", specifier = ">=3.7.1" }, @@ -2463,7 +2469,7 @@ requires-dist = [ { name = "xformers", marker = "platform_machine == 'x86_64' and extra == 'cuda'", specifier = ">=0.0.20" }, { name = "yapf", marker = "extra == 'misc'", specifier = "==0.40.2" }, ] -provides-extras = ["misc", "visualization", "agents", "web", "perception", "unitree", "unitree-dds", "manipulation", "cpu", "cuda", "psql", "sim", "mapping", "drone", "dds", "webrtc", "base", "apriltag", "all"] +provides-extras = ["misc", "visualization", "agents", "web", "perception", "unitree", "unitree-dds", "manipulation", "cpu", "cuda", "psql", "sim", "mapping", "drone", "dds", "webrtc", "livekit", "base", "apriltag", "all"] [package.metadata.requires-dev] autofix = [{ name = "ruff", specifier = "==0.14.3" }] @@ -4925,6 +4931,26 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/04/1e/b832de447dee8b582cac175871d2f6c3d5077cc56d5575cadba1fd1cccfa/linkify_it_py-2.0.3-py3-none-any.whl", hash = "sha256:6bcbc417b0ac14323382aef5c5192c0075bf8a9d6b41820a2b66371eac6b6d79", size = 19820, upload-time = "2024-02-04T14:48:02.496Z" }, ] +[[package]] +name = "livekit" +version = "1.1.10" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "aiofiles" }, + { name = "numpy", version = "2.2.6", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.11'" }, + { name = "numpy", version = "2.3.5", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.11'" }, + { name = "protobuf" }, + { name = "types-protobuf" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/53/11/a8f7af0d9a0a1e705c98a16942f8ec70865a8a08280e3ad53a3026388d36/livekit-1.1.10.tar.gz", hash = "sha256:202101c49a1fbc1d771d5dfb884c77f42c01dafc411d12224b992fac78a843cb", size = 355789, upload-time = "2026-06-04T20:23:19.723Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/d0/b0/51b2dc800ff2201da35ea87be1e25d370f4973e214e96ddf09563cd977a8/livekit-1.1.10-py3-none-macosx_10_15_x86_64.whl", hash = "sha256:495b23988673bf6571fd0faaf621416e973fa1d302b1acccd479e0461cac924d", size = 10103039, upload-time = "2026-06-04T20:23:09.411Z" }, + { url = "https://files.pythonhosted.org/packages/f3/7b/d7e1af04915402f55d6aa2d063273d053f9da0799ea9ec0a691fc96003cf/livekit-1.1.10-py3-none-macosx_11_0_arm64.whl", hash = "sha256:7bd85dda5c8b11458b3447cbb02630af7bb267d424c65c89d2e6d736905147d1", size = 8933264, upload-time = "2026-06-04T20:23:11.527Z" }, + { url = "https://files.pythonhosted.org/packages/86/78/5ee7513df2ef124e5f75659bbe477253dd214a3089003e34644a37f80462/livekit-1.1.10-py3-none-manylinux_2_28_aarch64.whl", hash = "sha256:b05299fa0a4c98d8d57f0013367adad70a84dd5fc9c46e4a7f3df5352c81b6c1", size = 9938610, upload-time = "2026-06-04T20:23:13.495Z" }, + { url = "https://files.pythonhosted.org/packages/86/86/16364e82b7363ad43e6651e46dfccf88f18fe46897c2123fe4badbf1f067/livekit-1.1.10-py3-none-manylinux_2_28_x86_64.whl", hash = "sha256:29f58fe30dc181c45b0a9c929beaacacde0b2253fa6e597547508d5269e9012b", size = 11321977, upload-time = "2026-06-04T20:23:15.632Z" }, + { url = "https://files.pythonhosted.org/packages/80/ca/f50036fffbff113f8de6bd05194dc6df0a5f2028f058dcf69073f774a464/livekit-1.1.10-py3-none-win_amd64.whl", hash = "sha256:13ac0c8498e0e5bc41292526966fd6ad86baa66e1915778b128cf7e953b107fc", size = 10675561, upload-time = "2026-06-04T20:23:17.857Z" }, +] + [[package]] name = "llvmlite" version = "0.46.0" @@ -10723,6 +10749,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d5/91/9b286ab899c008c2cb05e8be99814807e7fbbd33f0c0c960470826e5ac82/typer-0.23.1-py3-none-any.whl", hash = "sha256:3291ad0d3c701cbf522012faccfbb29352ff16ad262db2139e6b01f15781f14e", size = 56813, upload-time = "2026-02-13T10:04:32.008Z" }, ] +[[package]] +name = "types-protobuf" +version = "7.34.1.20260518" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/29/59/e2b13b499d15e6720150c4b1a8d91e31fcacf716b432397475b3151ff7e4/types_protobuf-7.34.1.20260518.tar.gz", hash = "sha256:28cfaded25889cb83ebfb63cfb0a43628f0b6f3785767bec17287dc6468795f2", size = 68936, upload-time = "2026-05-18T06:01:47.332Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/2a/1f/ec5caf72c2e3b688ca3927e0979a04ddad19e1afc4bf1c199bd743e0f419/types_protobuf-7.34.1.20260518-py3-none-any.whl", hash = "sha256:a0a5337413347166439c0e07cbc26c6164d091401c6f01b1dfd8cdb966c4dd8f", size = 85992, upload-time = "2026-05-18T06:01:45.696Z" }, +] + [[package]] name = "types-psycopg2" version = "2.9.21.20251012" From 76c4ab578a8cb894b59bca298526380073209f8b Mon Sep 17 00:00:00 2001 From: ruthwikdasyam Date: Tue, 16 Jun 2026 17:19:11 -0700 Subject: [PATCH 04/17] fix: img normalization --- .../pubsub/impl/webrtc/providers/livekit_broker.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py b/dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py index c81ccb527f..6ba4151e05 100644 --- a/dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py +++ b/dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py @@ -92,7 +92,9 @@ def _image_to_rgba(img: Image) -> tuple[int, int, bytes]: from dimos.msgs.sensor_msgs.Image import ImageFormat arr = img.data - if arr.dtype != np.uint8: + if arr.dtype == np.uint16: + arr = (arr >> 8).astype(np.uint8) # scale 16-bit (e.g. GRAY16) to 8-bit, not truncate + elif arr.dtype != np.uint8: arr = arr.astype(np.uint8) h, w = arr.shape[:2] fmt = img.format @@ -301,13 +303,14 @@ def publish(self, topic: str, data: bytes) -> None: drop while no room/operator is connected — normal pubsub behaviour.""" if isinstance(data, (bytearray, memoryview)): data = bytes(data) + reliable = topic not in self.LOSSY_TOPICS with self._lock: if not self._started or self._loop is None or self._room is None: return - room, loop = self._room, self._loop - reliable = topic not in self.LOSSY_TOPICS - coro = room.local_participant.publish_data(data, reliable=reliable, topic=topic) - asyncio.run_coroutine_threadsafe(coro, loop) + coro = self._room.local_participant.publish_data( + data, reliable=reliable, topic=topic + ) + asyncio.run_coroutine_threadsafe(coro, self._loop) def set_video_frame(self, img: Image) -> None: """Robot → operator video: publish the latest camera frame (thread-safe).""" From a32048b2582446c47a7f0093ac027847f3e2a38b Mon Sep 17 00:00:00 2001 From: ruthwikdasyam Date: Tue, 16 Jun 2026 17:19:35 -0700 Subject: [PATCH 05/17] fix: pre-commit --- dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py b/dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py index 6ba4151e05..a41ab053ed 100644 --- a/dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py +++ b/dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py @@ -307,9 +307,7 @@ def publish(self, topic: str, data: bytes) -> None: with self._lock: if not self._started or self._loop is None or self._room is None: return - coro = self._room.local_participant.publish_data( - data, reliable=reliable, topic=topic - ) + coro = self._room.local_participant.publish_data(data, reliable=reliable, topic=topic) asyncio.run_coroutine_threadsafe(coro, self._loop) def set_video_frame(self, img: Image) -> None: From 3a2edef8d3e8bd1e59a2d18801452f4586cbdee9 Mon Sep 17 00:00:00 2001 From: ruthwikdasyam Date: Tue, 16 Jun 2026 17:29:34 -0700 Subject: [PATCH 06/17] fix: mypy issues --- dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py b/dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py index a41ab053ed..ddd416177b 100644 --- a/dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py +++ b/dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py @@ -124,7 +124,7 @@ def __init__(self) -> None: self._room: rtc.Room | None = None self._loop: asyncio.AbstractEventLoop | None = None self._source: rtc.VideoSource | None = None - self._publish_task: asyncio.Task | None = None + self._publish_task: asyncio.Task[None] | None = None def bind(self, room: rtc.Room, loop: asyncio.AbstractEventLoop) -> None: self._room = room From a07d71ddb0cfd09c9cdab0b8981b89d219e67ffc Mon Sep 17 00:00:00 2001 From: ruthwikdasyam Date: Tue, 16 Jun 2026 17:37:48 -0700 Subject: [PATCH 07/17] fix: mypy errors --- dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py | 2 +- pyproject.toml | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py b/dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py index ddd416177b..8d86cff4cd 100644 --- a/dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py +++ b/dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py @@ -230,7 +230,7 @@ async def _connect(self) -> None: self._room = rtc.Room() - @self._room.on("data_received") + @self._room.on("data_received") # type: ignore[untyped-decorator] def _on_data(packet: Any) -> None: self._dispatch(packet) diff --git a/pyproject.toml b/pyproject.toml index cd210bd9dd..48c83e1d56 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -539,6 +539,8 @@ module = [ "faster_whisper", "geometry_msgs.*", "lazy_loader", + "livekit", + "livekit.*", "mcap", "mcap.*", "mujoco", From 4f751899d223e26055da76d84318013239883860 Mon Sep 17 00:00:00 2001 From: ruthwikdasyam Date: Tue, 16 Jun 2026 17:49:47 -0700 Subject: [PATCH 08/17] fix(webrtc): reset LiveKit video publisher on reconnect and publish failure; surface heartbeat errors --- .../impl/webrtc/providers/livekit_broker.py | 25 ++++++++++++++++--- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py b/dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py index 8d86cff4cd..5abfe8ce13 100644 --- a/dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py +++ b/dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py @@ -130,6 +130,14 @@ def bind(self, room: rtc.Room, loop: asyncio.AbstractEventLoop) -> None: self._room = room self._loop = loop + def reset(self) -> None: + """Drop per-session state so a later bind() (reconnect) re-publishes the + track on the new room. Called from the provider's _disconnect().""" + self._room = None + self._loop = None + self._source = None + self._publish_task = None + def set_latest(self, img: Image) -> None: loop = self._loop if loop is None or not loop.is_running(): @@ -154,9 +162,17 @@ async def _publish(self) -> None: from livekit import rtc assert self._room is not None and self._source is not None - track = rtc.LocalVideoTrack.create_video_track("camera", self._source) - opts = rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_CAMERA) - await self._room.local_participant.publish_track(track, opts) + try: + track = rtc.LocalVideoTrack.create_video_track("camera", self._source) + opts = rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_CAMERA) + await self._room.local_participant.publish_track(track, opts) + except Exception: + # Clear _source so the next captured frame retries publish, instead + # of feeding frames forever into a never-published source. + logger.warning("LiveKit video track publish failed; will retry", exc_info=True) + self._source = None + self._publish_task = None + return logger.info("LiveKit video track published") @@ -260,6 +276,7 @@ async def _disconnect(self) -> None: with contextlib.suppress(Exception): await self._room.disconnect() self._room = None + self._video.reset() # clear per-session video state so a restart re-publishes if self._http: await self._http.aclose() self._http = None @@ -278,7 +295,7 @@ async def _heartbeat_loop(self) -> None: json={}, ) except Exception: - logger.debug("LiveKit heartbeat failed", exc_info=True) + logger.warning("LiveKit heartbeat failed", exc_info=True) await asyncio.sleep(interval) # ─── Dispatch (loop thread) ────────────────────────────────────── From d2a074c76aa67e096e2c54988135821bd558ce0b Mon Sep 17 00:00:00 2001 From: Ruthwik Date: Mon, 22 Jun 2026 12:47:20 -0700 Subject: [PATCH 09/17] feat(teleop): hosted Go2 one-session module + robot commands, multicam, battery --- dimos/robot/all_blueprints.py | 3 +- dimos/robot/unitree/connection.py | 30 +- dimos/robot/unitree/go2/connection.py | 21 ++ dimos/teleop/quest_hosted/blueprints.py | 94 +++--- .../quest_hosted/go2_hosted_connection.py | 284 ++++++++++++++++++ dimos/teleop/quest_hosted/state_bridge.py | 177 ----------- .../teleop/quest_hosted/test_state_bridge.py | 155 ---------- dimos/teleop/utils/stream_stats.py | 45 +-- 8 files changed, 378 insertions(+), 431 deletions(-) create mode 100644 dimos/teleop/quest_hosted/go2_hosted_connection.py delete mode 100644 dimos/teleop/quest_hosted/state_bridge.py delete mode 100644 dimos/teleop/quest_hosted/test_state_bridge.py diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index 007c1baca4..6528cf079b 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -74,6 +74,7 @@ "path-planner-eval": "dimos.navigation.nav_3d.evaluator.blueprints:path_planner_eval", "teleop-hosted-go2": "dimos.teleop.quest_hosted.blueprints:teleop_hosted_go2", "teleop-hosted-go2-transport": "dimos.teleop.quest_hosted.blueprints:teleop_hosted_go2_transport", + "teleop-hosted-go2-multicam": "dimos.teleop.quest_hosted.blueprints:teleop_hosted_go2_multicam", "teleop-hosted-xarm7": "dimos.teleop.quest_hosted.blueprints:teleop_hosted_xarm7", "teleop-phone": "dimos.teleop.phone.blueprints:teleop_phone", "teleop-phone-go2": "dimos.teleop.phone.blueprints:teleop_phone_go2", @@ -171,7 +172,6 @@ "gstreamer-camera-module": "dimos.hardware.sensors.camera.gstreamer.gstreamer_camera.GstreamerCameraModule", "hosted-arm-teleop-module": "dimos.teleop.quest_hosted.hosted_extensions.HostedArmTeleopModule", "hosted-teleop-module": "dimos.teleop.quest_hosted.hosted_teleop_module.HostedTeleopModule", - "hosted-teleop-recorder": "dimos.teleop.quest_hosted.blueprints.HostedTeleopRecorder", "hosted-twist-teleop-module": "dimos.teleop.quest_hosted.hosted_extensions.HostedTwistTeleopModule", "joint-trajectory-controller": "dimos.manipulation.control.trajectory_controller.joint_trajectory_controller.JointTrajectoryController", "joystick-module": "dimos.robot.unitree.b1.joystick_module.JoystickModule", @@ -225,7 +225,6 @@ "speak-skill": "dimos.agents.skills.speak_skill.SpeakSkill", "tare-planner": "dimos.navigation.nav_stack.modules.tare_planner.tare_planner.TarePlanner", "teleop-recorder": "dimos.teleop.utils.recorder.TeleopRecorder", - "teleop-state-bridge": "dimos.teleop.quest_hosted.state_bridge.TeleopStateBridge", "temporal-memory": "dimos.perception.experimental.temporal_memory.temporal_memory.TemporalMemory", "terrain-analysis": "dimos.navigation.nav_stack.modules.terrain_analysis.terrain_analysis.TerrainAnalysis", "terrain-map-ext": "dimos.navigation.nav_stack.modules.terrain_map_ext.terrain_map_ext.TerrainMapExt", diff --git a/dimos/robot/unitree/connection.py b/dimos/robot/unitree/connection.py index 44101cc19d..cbc83ca3d7 100644 --- a/dimos/robot/unitree/connection.py +++ b/dimos/robot/unitree/connection.py @@ -314,6 +314,10 @@ def balance_stand(self) -> bool: self.publish_request(RTC_TOPIC["SPORT_MOD"], {"api_id": SPORT_CMD["BalanceStand"]}) ) + def sport_command(self, api_id: int) -> bool: + """Send a parameterless SPORT_MOD command by api_id (Hello, Stretch, ...).""" + return bool(self.publish_request(RTC_TOPIC["SPORT_MOD"], {"api_id": api_id})) + def set_obstacle_avoidance(self, enabled: bool = True) -> None: self.publish_request( RTC_TOPIC["OBSTACLES_AVOID"], @@ -328,21 +332,33 @@ def enable_rage_mode(self) -> bool: """Enable Rage Mode on the Go2 via WebRTC. Assumes the robot is already in BalanceStand. """ + return self.set_rage_mode(True) + + def set_rage_mode(self, enable: bool) -> bool: + """Toggle Rage Mode (api 2059) over WebRTC, both directions. + + Mirrors the DDS adapter recipe: BalanceStand → 2059 {data:enable} → + on enable, settle + SwitchJoystick(True); on disable, SwitchJoystick(False) + to return to the normal velocity envelope. After enable, normal move() + twists drive at the ~2.5 m/s rage envelope. + """ + # Re-establish BalanceStand before toggling (notes: always BalanceStand + # before flipping Rage). + self.balance_stand() + time.sleep(0.3) + rage_ok = bool( self.publish_request( RTC_TOPIC["SPORT_MOD"], - {"api_id": self._SPORT_API_ID_RAGEMODE, "parameter": {"data": True}}, + {"api_id": self._SPORT_API_ID_RAGEMODE, "parameter": {"data": enable}}, ) ) - time.sleep(2.0) - + if enable: + time.sleep(2.0) # let FsmRageMode transition settle joystick_ok = bool( self.publish_request( RTC_TOPIC["SPORT_MOD"], - { - "api_id": SPORT_CMD["SwitchJoystick"], - "parameter": {"data": True}, - }, + {"api_id": SPORT_CMD["SwitchJoystick"], "parameter": {"data": enable}}, ) ) return rage_ok and joystick_ok diff --git a/dimos/robot/unitree/go2/connection.py b/dimos/robot/unitree/go2/connection.py index 5568a473ef..009ff26440 100644 --- a/dimos/robot/unitree/go2/connection.py +++ b/dimos/robot/unitree/go2/connection.py @@ -81,6 +81,7 @@ def move(self, twist: Twist, duration: float = 0.0) -> bool: ... def standup(self) -> bool: ... def liedown(self) -> bool: ... def balance_stand(self) -> bool: ... + def sport_command(self, api_id: int) -> bool: ... def set_obstacle_avoidance(self, enabled: bool = True) -> None: ... def enable_rage_mode(self) -> bool: ... def publish_request(self, topic: str, data: dict) -> dict: ... # type: ignore[type-arg] @@ -168,6 +169,9 @@ def liedown(self) -> bool: def balance_stand(self) -> bool: return True + def sport_command(self, api_id: int) -> bool: + return True + def set_obstacle_avoidance(self, enabled: bool = True) -> None: pass @@ -245,6 +249,9 @@ def onimage(image: Image) -> None: self.register_disposable(self.connection.odom_stream().subscribe(self._publish_tf)) self.register_disposable(self.connection.video_stream().subscribe(onimage)) self.register_disposable(Disposable(self.cmd_vel.subscribe(self.move))) + self.register_disposable( + self.connection.lowstate_stream().subscribe(self._on_lowstate) + ) self._camera_info_thread = Thread( target=self.publish_camera_info, @@ -330,6 +337,20 @@ def balance_stand(self) -> bool: """Enter BalanceStand: neutral state for switching locomotion modes""" return self.connection.balance_stand() + def _on_lowstate(self, msg: Any) -> None: + """Cache battery SOC from the lowstate push stream (bms_state.soc, %).""" + try: + self._latest_soc = int(msg["data"]["bms_state"]["soc"]) + except (KeyError, TypeError, ValueError): + if not getattr(self, "_soc_parse_warned", False): + self._soc_parse_warned = True + logger.warning("lowstate: could not read bms_state.soc — battery unavailable") + + @rpc + def get_battery_soc(self) -> int | None: + """Latest battery state-of-charge (0-100%), or None until first lowstate.""" + return getattr(self, "_latest_soc", None) + @rpc def enable_rage_mode(self) -> bool: """Enable Rage Mode (~2.5 m/s forward velocity envelope). diff --git a/dimos/teleop/quest_hosted/blueprints.py b/dimos/teleop/quest_hosted/blueprints.py index 369234aef3..9fedc2c0e2 100644 --- a/dimos/teleop/quest_hosted/blueprints.py +++ b/dimos/teleop/quest_hosted/blueprints.py @@ -14,27 +14,24 @@ """Hosted teleop blueprints (WebRTC transport).""" -from pathlib import Path - -from dimos.constants import STATE_DIR from dimos.control.blueprints.teleop import coordinator_teleop_xarm7 from dimos.core.coordination.blueprints import autoconnect from dimos.core.transport import CloudflareTransport, CloudflareVideoTransport, LCMTransport +from dimos.hardware.sensors.camera.realsense.camera import RealSenseCamera from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped from dimos.msgs.geometry_msgs.Twist import Twist from dimos.msgs.geometry_msgs.TwistStamped import TwistStamped from dimos.msgs.sensor_msgs.Image import Image from dimos.robot.unitree.go2.blueprints.basic.unitree_go2_basic import unitree_go2_basic +from dimos.robot.unitree.go2.connection import GO2Connection from dimos.teleop.quest.quest_types import Buttons +from dimos.teleop.quest_hosted.go2_hosted_connection import Go2HostedConnection from dimos.teleop.quest_hosted.hosted_extensions import ( HostedArmTeleopModule, HostedTwistTeleopModule, ) -from dimos.teleop.quest_hosted.state_bridge import TeleopStateBridge -from dimos.teleop.utils.recorder import TeleopRecorder, TeleopRecorderConfig -# Single XArm7 teleop via the hosted (WebRTC) client. Pass `--simulation` to -# run the coordinator inside MuJoCo, omit it for real hardware. +# Single XArm7 hosted teleop. Pass `--simulation` to run in MuJoCo. teleop_hosted_xarm7 = ( autoconnect( HostedArmTeleopModule.blueprint(task_names={"right": "teleop_xarm"}), @@ -52,36 +49,18 @@ ) -# viewer="none" drops the rerun window (operator gets video over WebRTC, so the -# robot-side rerun view is unwanted here). +# Hosted teleop via the legacy module wrapper (the transport swap is preferred). teleop_hosted_go2 = autoconnect( HostedTwistTeleopModule.blueprint(), unitree_go2_basic, ).global_config(n_workers=8, viewer="none") -# Hosted teleop as a pure transport swap — no teleop module wrapper. The -# browser's keyboard/VR view sends LCM TwistStamped on cmd_unreliable; the -# transport decodes it straight onto the go2 cmd_vel stream (commands arrive -# as sent: normalized [-1, 1], no speed rescaling). The camera stream feeds -# the session's WebRTC video track via CloudflareVideoTransport (same -# provider/PeerConnection), and robot → operator telemetry can ride -# CloudflareTransport.spec("state_reliable_back", ...) the same way. -# -# Clock-sync pings are answered inside BrokerProvider. TeleopStateBridge -# republishes operator video_stats as Out[VideoStats] (recorder picks it up) -# and pushes robot_telemetry (command-plane latency/jitter/loss) back to the -# operator HUD on state_reliable_back, measured from a raw tap on the command -# wire — no TwistStamped change. -# -# Run: dimos run teleop-hosted-go2-transport -o transports.broker.api_key=dtk_live_... -# (or TRANSPORTS__BROKER__API_KEY=dtk_live_... in env; robot identity is -# derived from the key, override with transports.broker.robot_id if needed) -# then connect from https://teleop.dimensionalos.com (keyboard view). +# Hosted teleop over CF Realtime. Run with -o transports.broker.api_key=dtk_live_... teleop_hosted_go2_transport = ( autoconnect( - unitree_go2_basic, - TeleopStateBridge.blueprint(), + unitree_go2_basic.disabled_modules(GO2Connection), + Go2HostedConnection.blueprint(), ) .transports( { @@ -89,48 +68,45 @@ ("color_image", Image): CloudflareVideoTransport.spec(), ("state_json", bytes): CloudflareTransport.spec("state_reliable"), ("telemetry_out", bytes): CloudflareTransport.spec("state_reliable_back"), - # Raw tap on the command wire for command-plane stats (reads the - # Header's stamp+seq off the bytes). Independent subscriber from - # cmd_vel above — same channel, separate decode. - ("cmd_raw", bytes): CloudflareTransport.spec("cmd_unreliable"), - # Recorder tap on the command wire: when hosted-teleop-recorder is - # composed, its cmd_vel_stamped port subscribes to the SAME channel - # — independent typed decode, stamped with the browser's ts so the - # report's timing math works. Unused (harmless) when no recorder. + ("cmd_raw", bytes): CloudflareTransport.spec("cmd_unreliable"), # stats tap ("cmd_vel_stamped", TwistStamped): CloudflareTransport.spec( "cmd_unreliable", TwistStamped - ), + ), # recorder tap } ) .global_config(viewer="none") ) -HOSTED_RECORDINGS_DIR = STATE_DIR / "hosted_teleop" / "recordings" - - -class HostedTeleopRecorderConfig(TeleopRecorderConfig): - # Same generic recorder, just defaulting recordings into the hosted dir. - db_path: str | Path = HOSTED_RECORDINGS_DIR / "recording_hosted.db" - - -class HostedTeleopRecorder(TeleopRecorder): - """Generic ``TeleopRecorder`` defaulting to the hosted recordings dir. - - Ports + per-run timestamping are inherited; this only changes the default - output path. Compose at the CLI:: - - dimos run teleop-hosted-xarm7 hosted-teleop-recorder - dimos run teleop-hosted-go2 hosted-teleop-recorder - """ - - config: HostedTeleopRecorderConfig +# Adds a RealSense as cam2 (mux'd into the video track by Go2HostedConnection). +# Needs the RealSense wired in; use teleop-hosted-go2-transport otherwise. +teleop_hosted_go2_multicam = ( + autoconnect( + unitree_go2_basic.disabled_modules(GO2Connection), + Go2HostedConnection.blueprint(), + RealSenseCamera.blueprint(enable_depth=False, enable_pointcloud=False), + ) + .remappings([(RealSenseCamera, "color_image", "cam2_in")]) + .transports( + { + ("cmd_vel", Twist): CloudflareTransport.spec("cmd_unreliable", TwistStamped), + ("mux_image", Image): CloudflareVideoTransport.spec(), + ("cam2_in", Image): LCMTransport.spec("cam2_in", Image), + ("state_json", bytes): CloudflareTransport.spec("state_reliable"), + ("telemetry_out", bytes): CloudflareTransport.spec("state_reliable_back"), + ("cmd_raw", bytes): CloudflareTransport.spec("cmd_unreliable"), + ("cmd_vel_stamped", TwistStamped): CloudflareTransport.spec( + "cmd_unreliable", TwistStamped + ), + } + ) + .global_config(viewer="none") +) __all__ = [ - "HostedTeleopRecorder", - "HostedTeleopRecorderConfig", "teleop_hosted_go2", + "teleop_hosted_go2_multicam", "teleop_hosted_go2_transport", "teleop_hosted_xarm7", ] diff --git a/dimos/teleop/quest_hosted/go2_hosted_connection.py b/dimos/teleop/quest_hosted/go2_hosted_connection.py new file mode 100644 index 0000000000..e8b07a1bcf --- /dev/null +++ b/dimos/teleop/quest_hosted/go2_hosted_connection.py @@ -0,0 +1,284 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Go2 driver + hosted-teleop control plane in ONE module. + +The broker provider is a per-process singleton, and ``GO2Connection`` is +``dedicated_worker=True`` (its own process), so all hosted broker transports +(cmd, video, state, state_back) must live on this one module to share a single +CF session — a separate bridge module lands in another worker = a 2nd session +the operator can't see. Opt-in subclass; plain ``GO2Connection`` is unchanged. +""" + +from __future__ import annotations + +import json +import threading +import time +from typing import Any + +import numpy as np +from reactivex.disposable import Disposable + +from dimos.core.core import rpc +from dimos.core.stream import In, Out +from dimos.msgs.sensor_msgs.Image import Image +from dimos.robot.unitree.go2.connection import ConnectionConfig, GO2Connection +from dimos.teleop.utils.stream_stats import LiveStreamStats +from dimos.teleop.utils.video_stats import VideoStats +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() + +# Operator-allowed sport commands → SPORT_CMD api_id (robot-side allow-list). +ALLOWED_SPORT_CMDS: dict[str, int] = { + "StandDown": 1005, + "RecoveryStand": 1006, + "Sit": 1009, + "Hello": 1016, + "Stretch": 1017, + "Damp": 1001, + "FrontPounce": 1032, # acrobatic — leaps + "FrontJump": 1031, # acrobatic — leaps +} + + +class Go2HostedConnectionConfig(ConnectionConfig): + telemetry_hz: float = 3.0 # robot → operator HUD telemetry push rate + + +class Go2HostedConnection(GO2Connection): + """GO2Connection + the hosted-teleop state plane, colocated (one session).""" + + config: Go2HostedConnectionConfig + + state_json: In[bytes] # operator → robot control JSON (state_reliable) + cmd_raw: In[bytes] # operator → robot command bytes (stats tap) + video_stats: Out[VideoStats] # operator video health, for recorders + telemetry_out: Out[bytes] # robot → operator telemetry + acks (state_reliable_back) + cam2_in: In[Image] # extra camera (RealSense) for the mux + mux_image: Out[Image] # composited cam1(Go2)+cam2 → video transport + + def __init__(self, **kwargs: Any) -> None: + super().__init__(**kwargs) + self._cmd_stats = LiveStreamStats() + self._telemetry_thread: threading.Thread | None = None + self._stop_event = threading.Event() + self._rage_active = False # tracks firmware Rage Mode (speed bar) + self._cam_lock = threading.Lock() + self._cam_frames: dict[str, Image] = {} # "cam1"/"cam2" → latest frame + self._cam_selected = ["cam1"] # operator tab selection + + @rpc + def start(self) -> None: + super().start() + self._stop_event.clear() + # Sync subscribes (not async handle_*): keep-latest would drop bursts. + for stream, cb in ((self.state_json, self._on_state_json), (self.cmd_raw, self._on_cmd_raw)): + self.register_disposable(Disposable(stream.subscribe(cb))) + # Mux: tap the base's color_image as cam1, RealSense as cam2 → mux_image. + self.register_disposable(Disposable(self.color_image.subscribe(lambda i: self._on_cam("cam1", i)))) + self.register_disposable(Disposable(self.cam2_in.subscribe(lambda i: self._on_cam("cam2", i)))) + self._start_telemetry() + + # ─── Camera mux ────────────────────────────────────────────────── + def _on_cam(self, cam: str, img: Image) -> None: + with self._cam_lock: + self._cam_frames[cam] = img + shown = cam in self._cam_selected + if shown: + out = self._composite() + if out is not None: + self.mux_image.publish(out) + + def _composite(self) -> Image | None: + with self._cam_lock: + order = [c for c in ("cam1", "cam2") if c in self._cam_selected] + imgs = [self._cam_frames[c] for c in order if c in self._cam_frames] + if not imgs: + return None + if len(imgs) == 1: + return imgs[0] + import cv2 + + target_h = min(im.data.shape[0] for im in imgs) + tiles = [] + for im in imgs: + h, w = im.data.shape[:2] + tiles.append(cv2.resize(im.data, (int(w * target_h / h), target_h)) if h != target_h else im.data) + return Image(data=np.hstack(tiles), format=imgs[0].format, frame_id="camera_mux") + + def _set_cam_selection(self, cams: list[str]) -> None: + sel = [c for c in cams if c in ("cam1", "cam2")] or ["cam1"] + with self._cam_lock: + self._cam_selected = sel + logger.info("camera selection → %s", sel) + out = self._composite() + if out is not None: + self.mux_image.publish(out) + + @rpc + def stop(self) -> None: + self._stop_event.set() + if self._telemetry_thread is not None: + self._telemetry_thread.join(timeout=2.0) + self._telemetry_thread = None + super().stop() + + # ─── Inbound state plane (operator → robot) ────────────────────── + + def _on_state_json(self, data: Any) -> None: + if isinstance(data, str): + data = data.encode() + if not data.startswith(b"{"): + return # not JSON + try: + msg = json.loads(data) + except ValueError: + logger.warning("state_reliable: malformed JSON: %r", data[:80]) + return + + kind = msg.get("type") + if kind == "sport_cmd": + self._handle_sport_cmd(msg) + elif kind == "set_mode": + self._handle_set_mode(msg) + elif kind == "camera_select": + self._set_cam_selection(msg.get("cams", [])) + elif kind == "video_stats": + self.video_stats.publish(VideoStats.from_dict(msg)) + elif kind == "clock_report": + logger.info( + "clock-sync: operator rtt=%s offset=%s", + msg.get("rtt_ms"), msg.get("offset_ms"), + ) + # ping answered by BrokerProvider; unknown types ignored. + + def _handle_sport_cmd(self, msg: dict[str, Any]) -> None: + """Operator button → allow-listed SPORT_MOD request, ack on cmd_ack.""" + name = msg.get("name") + nonce = msg.get("nonce") + + # StandReady is the standup+balance combo, never the two separately. + if name == "StandReady": + self._stand_ready(nonce) + return + + api_id = ALLOWED_SPORT_CMDS.get(name) if isinstance(name, str) else None + if api_id is None: + logger.warning("sport_cmd: disallowed/unknown name %r", name) + self._send_ack(nonce, False) + return + + # sport_command blocks the WebRTC loop (= the video loop) — run off the + # callback so a gesture like Hello doesn't stall video. + def runner() -> None: + ok = False + try: + ok = bool(self.connection.sport_command(api_id)) + except Exception: + logger.exception("sport_cmd %s failed", name) + self._send_ack(nonce, ok) + + threading.Thread(target=runner, daemon=True, name=f"Go2SportCmd-{name}").start() + + def _stand_ready(self, nonce: Any) -> None: + """Standup → settle → BalanceStand (drive-ready). Acks when balanced.""" + + def runner() -> None: + ok = False + try: + self.connection.standup() + time.sleep(3.0) # standup must finish before balance_stand + self.connection.balance_stand() + ok = True + except Exception: + logger.exception("StandReady failed") + self._send_ack(nonce, ok) + + threading.Thread(target=runner, daemon=True, name="Go2StandReady").start() + + def _handle_set_mode(self, msg: dict[str, Any]) -> None: + """Speed-mode select. normal/high differ only by browser-side scale; + only the rage on/off boundary toggles the firmware (set_rage_mode).""" + mode = msg.get("mode") + nonce = msg.get("nonce") + if mode not in ("normal", "high", "rage"): + logger.warning("set_mode: unknown mode %r", mode) + self._send_ack(nonce, False) + return + want_rage = mode == "rage" + if want_rage == self._rage_active: + self._send_ack(nonce, True) # already in the right FSM + return + + def runner() -> None: + ok = False + try: + ok = bool(self.connection.set_rage_mode(want_rage)) + if ok: + self._rage_active = want_rage + logger.info("set_mode: rage=%s ok=%s", want_rage, ok) + except Exception: + logger.exception("set_mode rage=%s failed", want_rage) + self._send_ack(nonce, ok) + + threading.Thread(target=runner, daemon=True, name="Go2SetMode").start() + + def _send_ack(self, nonce: Any, ok: bool) -> None: + try: + self.telemetry_out.publish( + json.dumps({"type": "cmd_ack", "nonce": nonce, "ok": ok}).encode() + ) + except Exception: + logger.debug("cmd_ack publish failed", exc_info=True) + + # ─── Command-plane health (robot → operator) ───────────────────── + + def _on_cmd_raw(self, data: Any) -> None: + """Read the send-stamp off the header for one-way latency stats.""" + if isinstance(data, str): + data = data.encode() + try: + from dimos_lcm.geometry_msgs import TwistStamped as LCMTwistStamped + + lcm = LCMTwistStamped.lcm_decode(data) + ts = lcm.header.stamp.sec + lcm.header.stamp.nsec / 1_000_000_000 + except Exception: + return # foreign / undecodable frame — skip + self._cmd_stats.record(ts, nbytes=len(data)) + + def _start_telemetry(self) -> None: + def runner() -> None: + interval = 1.0 / max(self.config.telemetry_hz, 0.1) + while not self._stop_event.is_set(): + snap = self._cmd_stats.snapshot() + soc = getattr(self, "_latest_soc", None) # cached by GO2Connection + if snap is not None or soc is not None: + payload = json.dumps( + {"type": "robot_telemetry", "cmd": snap, "soc": soc, "robot_ts": time.time()} + ) + try: + self.telemetry_out.publish(payload.encode()) + except Exception: + logger.debug("telemetry publish failed", exc_info=True) + self._stop_event.wait(interval) + + self._telemetry_thread = threading.Thread( + target=runner, daemon=True, name="Go2HostedTelemetry" + ) + self._telemetry_thread.start() + + +__all__ = ["Go2HostedConnection", "Go2HostedConnectionConfig"] diff --git a/dimos/teleop/quest_hosted/state_bridge.py b/dimos/teleop/quest_hosted/state_bridge.py deleted file mode 100644 index 2f4a104091..0000000000 --- a/dimos/teleop/quest_hosted/state_bridge.py +++ /dev/null @@ -1,177 +0,0 @@ -# Copyright 2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Teleop control-plane bridge (transport-world). - -Two jobs, both translating between the operator's JSON control plane and DimOS: - -* **Inbound state plane** (``state_reliable`` JSON): ``video_stats`` → - typed ``Out[VideoStats]`` (recorders pick it up); ``clock_report`` logged. - Clock-sync ``ping`` is answered inline by ``BrokerProvider`` (lower latency - than a module hop), not here. - -* **Command-plane health** (robot → operator): a raw-bytes tap on - ``cmd_unreliable`` reads the wire ``Header`` (``stamp`` + ``seq`` + size — - no change to ``TwistStamped``) into a rolling ``LiveStreamStats`` window; - a timer publishes ``robot_telemetry`` JSON on ``state_reliable_back`` so the - operator HUD can show latency/jitter/loss the operator can't measure from - its own send side. - -Blueprint wiring:: - - autoconnect(..., TeleopStateBridge.blueprint()).transports({ - ("state_json", bytes): CloudflareTransport("state_reliable"), - ("cmd_raw", bytes): CloudflareTransport("cmd_unreliable"), - ("telemetry_out", bytes): CloudflareTransport("state_reliable_back"), - }) - -Together with the deprecated ``HostedTeleopModule``'s removal this restores -the full ``state_reliable``/``state_reliable_back`` parity in the transport -world. -""" - -from __future__ import annotations - -import json -import threading -import time -from typing import Any - -from reactivex.disposable import Disposable - -from dimos.core.core import rpc -from dimos.core.module import Module, ModuleConfig -from dimos.core.stream import In, Out -from dimos.teleop.utils.stream_stats import LiveStreamStats -from dimos.teleop.utils.video_stats import VideoStats -from dimos.utils.logging_config import setup_logger - -logger = setup_logger() - - -class TeleopStateBridgeConfig(ModuleConfig): - telemetry_hz: float = 3.0 # robot → operator HUD command-plane stats - - -class TeleopStateBridge(Module): - """Operator JSON control plane ↔ typed DimOS streams + command-plane health.""" - - config: TeleopStateBridgeConfig - - # Inbound state plane (operator → robot), raw JSON bytes. - state_json: In[bytes] - # Raw command bytes (operator → robot) — stats tap only; the typed cmd_vel - # decode is a separate transport on the same channel. - cmd_raw: In[bytes] - # Republished operator video health (recorders subscribe). - video_stats: Out[VideoStats] - # robot → operator telemetry JSON (state_reliable_back). - telemetry_out: Out[bytes] - - def __init__(self, **kwargs: Any) -> None: - super().__init__(**kwargs) - self._cmd_stats = LiveStreamStats() - self._telemetry_thread: threading.Thread | None = None - self._stop_event = threading.Event() - - @rpc - def start(self) -> None: - super().start() - self._stop_event.clear() - # Manual sync subscribes (not async handle_*): the keep-latest mailbox - # would drop messages when several land close together, and parsing / - # header-peeking is cheap enough for the transport callback. - for stream, cb in ((self.state_json, self._on_state_json), (self.cmd_raw, self._on_cmd_raw)): - unsub = stream.subscribe(cb) - self.register_disposable(Disposable(unsub)) - self._start_telemetry() - - @rpc - def stop(self) -> None: - self._stop_event.set() - if self._telemetry_thread is not None: - self._telemetry_thread.join(timeout=2.0) - self._telemetry_thread = None - super().stop() - - # ─── Inbound state plane ───────────────────────────────────────── - - def _on_state_json(self, data: Any) -> None: - if isinstance(data, str): - data = data.encode() - if not data.startswith(b"{"): - return # not JSON (future LCM telemetry on this channel) - try: - msg = json.loads(data) - except ValueError: - logger.warning("state_reliable: malformed JSON: %r", data[:80]) - return - - kind = msg.get("type") - if kind == "video_stats": - self.video_stats.publish(VideoStats.from_dict(msg)) - elif kind == "clock_report": - rtt = msg.get("rtt_ms") - off = msg.get("offset_ms") - logger.info( - "clock-sync: operator rtt=%.1fms offset=%.1fms", - float(rtt) if rtt is not None else float("nan"), - float(off) if off is not None else float("nan"), - ) - # ping is answered by BrokerProvider; anything else is a future - # control-plane message this version doesn't know — ignore. - - # ─── Command-plane health ──────────────────────────────────────── - - def _on_cmd_raw(self, data: Any) -> None: - """Peek the wire Header (stamp + seq) for command-plane stats. - - Reads what the operator already puts on the wire — no TwistStamped - change. ``ts`` is operator-clock-corrected (so recv minus ts = one-way - latency); ``seq`` drives loss/reorder. - """ - if isinstance(data, str): - data = data.encode() - try: - from dimos_lcm.geometry_msgs import TwistStamped as LCMTwistStamped - - lcm = LCMTwistStamped.lcm_decode(data) - ts = lcm.header.stamp.sec + lcm.header.stamp.nsec / 1_000_000_000 - seq = lcm.header.seq - except Exception: - return # foreign / undecodable frame on the channel — skip - self._cmd_stats.record(ts, seq=seq, nbytes=len(data)) - - def _start_telemetry(self) -> None: - def runner() -> None: - interval = 1.0 / max(self.config.telemetry_hz, 0.1) - while not self._stop_event.is_set(): - snap = self._cmd_stats.snapshot() - if snap is not None: - payload = json.dumps( - {"type": "robot_telemetry", "cmd": snap, "robot_ts": time.time()} - ) - try: - self.telemetry_out.publish(payload.encode()) - except Exception: - logger.debug("telemetry publish failed", exc_info=True) - self._stop_event.wait(interval) - - self._telemetry_thread = threading.Thread( - target=runner, daemon=True, name="TeleopStateBridgeTelemetry" - ) - self._telemetry_thread.start() - - -__all__ = ["TeleopStateBridge", "TeleopStateBridgeConfig"] diff --git a/dimos/teleop/quest_hosted/test_state_bridge.py b/dimos/teleop/quest_hosted/test_state_bridge.py deleted file mode 100644 index 0c470a7e13..0000000000 --- a/dimos/teleop/quest_hosted/test_state_bridge.py +++ /dev/null @@ -1,155 +0,0 @@ -# Copyright 2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Unit tests for TeleopStateBridge's JSON → typed-stream dispatch.""" - -from __future__ import annotations - -import json -import time -from typing import cast -from unittest.mock import MagicMock - -import pytest - -from dimos.teleop.quest_hosted.state_bridge import TeleopStateBridge -from dimos.teleop.utils.stream_stats import LiveStreamStats -from dimos.teleop.utils.video_stats import VideoStats - - -@pytest.fixture -def bridge() -> TeleopStateBridge: - """Bare bridge with mocked Out streams + a real stats accumulator.""" - b = TeleopStateBridge.__new__(TeleopStateBridge) - b.video_stats = MagicMock() # type: ignore[assignment] - b.telemetry_out = MagicMock() # type: ignore[assignment] - b._cmd_stats = LiveStreamStats() - return b - - -def _publish_mock(bridge: TeleopStateBridge) -> MagicMock: - return cast("MagicMock", bridge.video_stats).publish # type: ignore[no-any-return] - - -def _lcm_twist_bytes(ts: float, seq: int) -> bytes: - """Encode a TwistStamped on the wire the way the operator does — with seq - in the Header (which the dimos TwistStamped doesn't surface).""" - from dimos_lcm.geometry_msgs import TwistStamped as LCMTwistStamped - - m = LCMTwistStamped() - m.header.stamp.sec = int(ts) - m.header.stamp.nsec = int((ts - int(ts)) * 1_000_000_000) - m.header.frame_id = "keyboard" - m.header.seq = seq - return m.lcm_encode() # type: ignore[no-any-return] - - -def test_video_stats_republished_typed(bridge: TeleopStateBridge) -> None: - payload = { - "type": "video_stats", - "ts": 123.0, - "fps": 29.5, - "kbps": 2100.0, - "width": 1280, - "height": 720, - } - bridge._on_state_json(json.dumps(payload).encode()) - - _publish_mock(bridge).assert_called_once() - stats = _publish_mock(bridge).call_args[0][0] - assert isinstance(stats, VideoStats) - assert stats.fps == 29.5 - assert stats.width == 1280 - - -def test_str_payload_accepted(bridge: TeleopStateBridge) -> None: - """aiortc may deliver str on a DataChannel; both must work.""" - bridge._on_state_json('{"type":"video_stats","fps":15.0}') - assert _publish_mock(bridge).call_args[0][0].fps == 15.0 - - -def test_clock_report_logged_not_published(bridge: TeleopStateBridge) -> None: - bridge._on_state_json(b'{"type":"clock_report","rtt_ms":42.0,"offset_ms":-3.0}') - _publish_mock(bridge).assert_not_called() - - -def test_ping_ignored(bridge: TeleopStateBridge) -> None: - """Pings are the provider's job — the bridge must not react.""" - bridge._on_state_json(b'{"type":"ping","client_ts":1.0}') - _publish_mock(bridge).assert_not_called() - - -def test_unknown_type_ignored(bridge: TeleopStateBridge) -> None: - bridge._on_state_json(b'{"type":"mode_switch","mode":"arm"}') - _publish_mock(bridge).assert_not_called() - - -def test_non_json_binary_ignored(bridge: TeleopStateBridge) -> None: - bridge._on_state_json(b"\x00\x01lcm-ish") - _publish_mock(bridge).assert_not_called() - - -def test_malformed_json_dropped(bridge: TeleopStateBridge) -> None: - bridge._on_state_json(b"{not json") - _publish_mock(bridge).assert_not_called() - - -# ─── command-plane stats (cmd_raw tap) ────────────────────────────── - - -def test_cmd_raw_reads_wire_header(bridge: TeleopStateBridge) -> None: - """Header seq+stamp are read off the wire — no TwistStamped change.""" - now = time.time() - for i in range(5): - bridge._on_cmd_raw(_lcm_twist_bytes(now + i * 0.05, seq=i)) - snap = bridge._cmd_stats.snapshot() - assert snap is not None - assert snap["loss_pct"] == 0.0 # contiguous seqs - assert snap["rate_hz"] is not None - assert snap["throughput_bps"] is not None - - -def test_cmd_raw_loss_from_seq_gap(bridge: TeleopStateBridge) -> None: - now = time.time() - for seq in (0, 1, 3, 4, 5): # seq 2 dropped → 1/6 missing in [0,5] - bridge._on_cmd_raw(_lcm_twist_bytes(now + seq * 0.05, seq=seq)) - snap = bridge._cmd_stats.snapshot() - assert snap is not None - assert snap["loss_pct"] == pytest.approx(100.0 / 6.0, abs=0.1) - - -def test_cmd_raw_str_payload_accepted(bridge: TeleopStateBridge) -> None: - bridge._on_cmd_raw(_lcm_twist_bytes(time.time(), seq=0)) - bridge._on_cmd_raw(_lcm_twist_bytes(time.time(), seq=1)) - assert bridge._cmd_stats.snapshot() is not None - - -def test_cmd_raw_garbage_ignored(bridge: TeleopStateBridge) -> None: - """Undecodable frame must not raise or pollute the window.""" - bridge._on_cmd_raw(b"\xff\xfe not lcm") - assert bridge._cmd_stats.snapshot() is None # nothing recorded - - -def test_telemetry_payload_shape(bridge: TeleopStateBridge) -> None: - """robot_telemetry JSON matches what the web HUD parses.""" - now = time.time() - for i in range(4): - bridge._on_cmd_raw(_lcm_twist_bytes(now + i * 0.05, seq=i)) - snap = bridge._cmd_stats.snapshot() - assert snap is not None - payload = json.dumps({"type": "robot_telemetry", "cmd": snap}) - parsed = json.loads(payload) - assert parsed["type"] == "robot_telemetry" - assert "latency_ms" in parsed["cmd"] - assert "loss_pct" in parsed["cmd"] diff --git a/dimos/teleop/utils/stream_stats.py b/dimos/teleop/utils/stream_stats.py index 99e561b0af..5a878a31f7 100644 --- a/dimos/teleop/utils/stream_stats.py +++ b/dimos/teleop/utils/stream_stats.py @@ -14,17 +14,10 @@ """Stat helpers for teleop streams (latency / jitter / rate). -Two flavors live here: - -* **`pcts`** — a pure percentile helper shared by the post-hoc report writer - (``teleop/utils/report.py``) and any live stats consumer. -* **`LiveStreamStats`** — a rolling-window class for always-on consumers that - only need a recent snapshot (e.g. the operator HUD's command-plane telemetry). - -Packet loss / reorder are transport-layer concerns and are intentionally not -computed here from an application sequence number. TODO: surface command-plane -loss from datachannel/SCTP stats (same source as VideoStats.loss_pct), not a -per-message seq. +* **`pcts`** — percentile helper shared with the post-hoc report writer. +* **`LiveStreamStats`** — rolling window the robot measures over the inbound + command wire, then ships each ``snapshot()`` to the operator HUD (the robot + doesn't consume the stats locally — it's compute-and-forward). """ from __future__ import annotations @@ -51,6 +44,8 @@ def pcts(values: Sequence[float]) -> dict[str, float] | None: } +# Loss / reorder helpers — kept for when command loss gets wired (needs a +# send-count). Not used by snapshot() currently. def loss_pct(seqs: Sequence[int]) -> float | None: """Loss % from gaps in a monotonic sequence; None if fewer than 2 seqs. @@ -82,17 +77,13 @@ def reorder_count(seqs: Sequence[int]) -> int: class LiveStreamStats: - """Rolling-window health for an always-on stream consumer. + """Rolling-window health of an inbound stream, for forwarding to a remote HUD. - Records ``(wall, ts, seq, nbytes)`` per arrival in a bounded deque so old - samples fall off automatically; ``snapshot()`` returns the window's median - E2E latency, median inter-arrival jitter, seq-gap loss, reorder count, - arrival rate, and throughput. Thread-safe — ``record()`` runs on the - transport callback, ``snapshot()`` on a separate reader. - - ``seq`` enables loss/reorder (the sender's monotonic counter, read off the - wire); ``nbytes`` enables throughput. Both optional — unstamped streams - still get rate + jitter. + ``record()`` notes each arrival in a bounded deque; ``snapshot()`` returns + the window's median E2E latency, jitter, arrival rate, and throughput — + which the robot ships to the operator (it doesn't use them locally). + Thread-safe: ``record()`` on the transport callback, ``snapshot()`` on a + separate reader. """ def __init__(self, window: int = 120) -> None: @@ -110,12 +101,7 @@ def record( self._samples.append((time.time(), ts, seq, nbytes)) def snapshot(self) -> dict[str, float | None] | None: - """Median latency/jitter (ms), loss (%), reorder, rate (Hz), throughput. - - Returns ``None`` until at least two samples have landed (one inter-arrival - interval is needed). Uses the module's shared ``pcts``/``loss_pct`` so the - math matches the report writer. - """ + """Median latency/jitter (ms), rate (Hz), throughput. None until 2 samples.""" with self._lock: samples = list(self._samples) if len(samples) < 2: @@ -123,9 +109,8 @@ def snapshot(self) -> dict[str, float | None] | None: arrivals = [w for w, _, _, _ in samples] intervals_ms = [(b - a) * 1000.0 for a, b in pairwise(arrivals)] - # `is not None` — ts=0.0 / seq=0 are real values, only None means absent. + # `is not None` — ts=0.0 is a real value, only None means absent. e2e_ms = [(w - ts) * 1000.0 for w, ts, _, _ in samples if ts is not None] - seqs = [s for _, _, s, _ in samples if s is not None] sizes = [n for _, _, _, n in samples if n is not None] e2e = pcts(e2e_ms) @@ -134,8 +119,6 @@ def snapshot(self) -> dict[str, float | None] | None: return { "latency_ms": e2e["p50"] if e2e else None, "jitter_ms": jit["p50"] if jit else None, - "loss_pct": loss_pct(seqs), - "reorder": float(reorder_count(seqs)) if seqs else None, "rate_hz": (len(samples) - 1) / span if span > 0 else None, "throughput_bps": (sum(sizes) / span) if (sizes and span > 0) else None, } From 783f980dedb4d59b8748ffbfffcae6b0e67dfe18 Mon Sep 17 00:00:00 2001 From: Ruthwik Date: Mon, 22 Jun 2026 13:06:30 -0700 Subject: [PATCH 10/17] fix: precommit fixes --- .../pubsub/impl/webrtc/providers/broker.py | 4 +- dimos/robot/unitree/go2/connection.py | 4 +- .../quest_hosted/go2_hosted_connection.py | 41 +++++++++++++------ dimos/teleop/utils/stream_stats.py | 4 +- 4 files changed, 31 insertions(+), 22 deletions(-) diff --git a/dimos/protocol/pubsub/impl/webrtc/providers/broker.py b/dimos/protocol/pubsub/impl/webrtc/providers/broker.py index ba86e41003..fb9ca91a02 100644 --- a/dimos/protocol/pubsub/impl/webrtc/providers/broker.py +++ b/dimos/protocol/pubsub/impl/webrtc/providers/broker.py @@ -351,9 +351,7 @@ def _maybe_answer_ping(self, payload: bytes) -> None: return if msg.get("type") != "ping" or msg.get("client_ts") is None: return - pong = json.dumps( - {"type": "pong", "client_ts": msg["client_ts"], "robot_ts": time.time()} - ) + pong = json.dumps({"type": "pong", "client_ts": msg["client_ts"], "robot_ts": time.time()}) with self._lock: ch = self._dcs.get("state_reliable_back") # Pong MUST go on state_reliable_back — CF bridges one direction only; diff --git a/dimos/robot/unitree/go2/connection.py b/dimos/robot/unitree/go2/connection.py index 009ff26440..134c774e7d 100644 --- a/dimos/robot/unitree/go2/connection.py +++ b/dimos/robot/unitree/go2/connection.py @@ -249,9 +249,7 @@ def onimage(image: Image) -> None: self.register_disposable(self.connection.odom_stream().subscribe(self._publish_tf)) self.register_disposable(self.connection.video_stream().subscribe(onimage)) self.register_disposable(Disposable(self.cmd_vel.subscribe(self.move))) - self.register_disposable( - self.connection.lowstate_stream().subscribe(self._on_lowstate) - ) + self.register_disposable(self.connection.lowstate_stream().subscribe(self._on_lowstate)) self._camera_info_thread = Thread( target=self.publish_camera_info, diff --git a/dimos/teleop/quest_hosted/go2_hosted_connection.py b/dimos/teleop/quest_hosted/go2_hosted_connection.py index e8b07a1bcf..99363f3878 100644 --- a/dimos/teleop/quest_hosted/go2_hosted_connection.py +++ b/dimos/teleop/quest_hosted/go2_hosted_connection.py @@ -50,7 +50,7 @@ "Stretch": 1017, "Damp": 1001, "FrontPounce": 1032, # acrobatic — leaps - "FrontJump": 1031, # acrobatic — leaps + "FrontJump": 1031, # acrobatic — leaps } @@ -63,12 +63,12 @@ class Go2HostedConnection(GO2Connection): config: Go2HostedConnectionConfig - state_json: In[bytes] # operator → robot control JSON (state_reliable) - cmd_raw: In[bytes] # operator → robot command bytes (stats tap) + state_json: In[bytes] # operator → robot control JSON (state_reliable) + cmd_raw: In[bytes] # operator → robot command bytes (stats tap) video_stats: Out[VideoStats] # operator video health, for recorders - telemetry_out: Out[bytes] # robot → operator telemetry + acks (state_reliable_back) - cam2_in: In[Image] # extra camera (RealSense) for the mux - mux_image: Out[Image] # composited cam1(Go2)+cam2 → video transport + telemetry_out: Out[bytes] # robot → operator telemetry + acks (state_reliable_back) + cam2_in: In[Image] # extra camera (RealSense) for the mux + mux_image: Out[Image] # composited cam1(Go2)+cam2 → video transport def __init__(self, **kwargs: Any) -> None: super().__init__(**kwargs) @@ -78,18 +78,25 @@ def __init__(self, **kwargs: Any) -> None: self._rage_active = False # tracks firmware Rage Mode (speed bar) self._cam_lock = threading.Lock() self._cam_frames: dict[str, Image] = {} # "cam1"/"cam2" → latest frame - self._cam_selected = ["cam1"] # operator tab selection + self._cam_selected = ["cam1"] # operator tab selection @rpc def start(self) -> None: super().start() self._stop_event.clear() # Sync subscribes (not async handle_*): keep-latest would drop bursts. - for stream, cb in ((self.state_json, self._on_state_json), (self.cmd_raw, self._on_cmd_raw)): + for stream, cb in ( + (self.state_json, self._on_state_json), + (self.cmd_raw, self._on_cmd_raw), + ): self.register_disposable(Disposable(stream.subscribe(cb))) # Mux: tap the base's color_image as cam1, RealSense as cam2 → mux_image. - self.register_disposable(Disposable(self.color_image.subscribe(lambda i: self._on_cam("cam1", i)))) - self.register_disposable(Disposable(self.cam2_in.subscribe(lambda i: self._on_cam("cam2", i)))) + self.register_disposable( + Disposable(self.color_image.subscribe(lambda i: self._on_cam("cam1", i))) + ) + self.register_disposable( + Disposable(self.cam2_in.subscribe(lambda i: self._on_cam("cam2", i))) + ) self._start_telemetry() # ─── Camera mux ────────────────────────────────────────────────── @@ -116,7 +123,9 @@ def _composite(self) -> Image | None: tiles = [] for im in imgs: h, w = im.data.shape[:2] - tiles.append(cv2.resize(im.data, (int(w * target_h / h), target_h)) if h != target_h else im.data) + tiles.append( + cv2.resize(im.data, (int(w * target_h / h), target_h)) if h != target_h else im.data + ) return Image(data=np.hstack(tiles), format=imgs[0].format, frame_id="camera_mux") def _set_cam_selection(self, cams: list[str]) -> None: @@ -161,7 +170,8 @@ def _on_state_json(self, data: Any) -> None: elif kind == "clock_report": logger.info( "clock-sync: operator rtt=%s offset=%s", - msg.get("rtt_ms"), msg.get("offset_ms"), + msg.get("rtt_ms"), + msg.get("offset_ms"), ) # ping answered by BrokerProvider; unknown types ignored. @@ -267,7 +277,12 @@ def runner() -> None: soc = getattr(self, "_latest_soc", None) # cached by GO2Connection if snap is not None or soc is not None: payload = json.dumps( - {"type": "robot_telemetry", "cmd": snap, "soc": soc, "robot_ts": time.time()} + { + "type": "robot_telemetry", + "cmd": snap, + "soc": soc, + "robot_ts": time.time(), + } ) try: self.telemetry_out.publish(payload.encode()) diff --git a/dimos/teleop/utils/stream_stats.py b/dimos/teleop/utils/stream_stats.py index 5a878a31f7..e1688dd825 100644 --- a/dimos/teleop/utils/stream_stats.py +++ b/dimos/teleop/utils/stream_stats.py @@ -93,9 +93,7 @@ def __init__(self, window: int = 120) -> None: maxlen=window ) - def record( - self, ts: float | None, seq: int | None = None, nbytes: int | None = None - ) -> None: + def record(self, ts: float | None, seq: int | None = None, nbytes: int | None = None) -> None: """Note an inbound message's send-stamp, seq, and wire size (any None).""" with self._lock: self._samples.append((time.time(), ts, seq, nbytes)) From 24bca019c5d22ebaa3786b90ec40be972e28e8b1 Mon Sep 17 00:00:00 2001 From: Ruthwik Date: Mon, 22 Jun 2026 13:16:52 -0700 Subject: [PATCH 11/17] fix: realsense cam tf fix --- .../sensors/camera/realsense/camera.py | 34 +++++++++++++------ 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/dimos/hardware/sensors/camera/realsense/camera.py b/dimos/hardware/sensors/camera/realsense/camera.py index 6257720608..70d7b6ea96 100644 --- a/dimos/hardware/sensors/camera/realsense/camera.py +++ b/dimos/hardware/sensors/camera/realsense/camera.py @@ -358,17 +358,29 @@ def _publish_tf(self, ts: float) -> None: ) transforms.append(depth_to_depth_optical) - color_tf = self._extrinsics_to_transform( - self._color_to_depth_extrinsics, - self._camera_link, - self._color_frame, - ts, - ) - # Invert the transform since extrinsics are color->depth - color_tf = color_tf.inverse() - color_tf.frame_id = self._camera_link - color_tf.child_frame_id = self._color_frame - color_tf.ts = ts + # camera_link -> camera_color_frame. With depth disabled there are no + # color->depth extrinsics, so fall back to identity (color at the + # camera_link origin) instead of dereferencing None. + if self._color_to_depth_extrinsics is not None: + color_tf = self._extrinsics_to_transform( + self._color_to_depth_extrinsics, + self._camera_link, + self._color_frame, + ts, + ) + # Invert the transform since extrinsics are color->depth + color_tf = color_tf.inverse() + color_tf.frame_id = self._camera_link + color_tf.child_frame_id = self._color_frame + color_tf.ts = ts + else: + color_tf = Transform( + translation=Vector3(0.0, 0.0, 0.0), + rotation=Quaternion(0.0, 0.0, 0.0, 1.0), + frame_id=self._camera_link, + child_frame_id=self._color_frame, + ts=ts, + ) transforms.append(color_tf) # camera_color_frame -> camera_color_optical_frame From 08c285ac69ef2476f0b5c50ee049501d8c6f397d Mon Sep 17 00:00:00 2001 From: Ruthwik Date: Mon, 22 Jun 2026 13:33:54 -0700 Subject: [PATCH 12/17] feat: hosted connection module --- .../unitree/go2/hosted_connection.py} | 0 dimos/teleop/quest_hosted/blueprints.py | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename dimos/{teleop/quest_hosted/go2_hosted_connection.py => robot/unitree/go2/hosted_connection.py} (100%) diff --git a/dimos/teleop/quest_hosted/go2_hosted_connection.py b/dimos/robot/unitree/go2/hosted_connection.py similarity index 100% rename from dimos/teleop/quest_hosted/go2_hosted_connection.py rename to dimos/robot/unitree/go2/hosted_connection.py diff --git a/dimos/teleop/quest_hosted/blueprints.py b/dimos/teleop/quest_hosted/blueprints.py index 7d15e539fc..78198a77e5 100644 --- a/dimos/teleop/quest_hosted/blueprints.py +++ b/dimos/teleop/quest_hosted/blueprints.py @@ -30,8 +30,8 @@ from dimos.msgs.sensor_msgs.Image import Image from dimos.robot.unitree.go2.blueprints.basic.unitree_go2_basic import unitree_go2_basic from dimos.robot.unitree.go2.connection import GO2Connection +from dimos.robot.unitree.go2.hosted_connection import Go2HostedConnection from dimos.teleop.quest.quest_types import Buttons -from dimos.teleop.quest_hosted.go2_hosted_connection import Go2HostedConnection from dimos.teleop.quest_hosted.hosted_extensions import ( HostedArmTeleopModule, HostedTwistTeleopModule, From 6840891151dfcd82196431676bb66c6d5118621a Mon Sep 17 00:00:00 2001 From: Ruthwik Date: Tue, 23 Jun 2026 12:59:34 -0700 Subject: [PATCH 13/17] feat: rage mode toggle ON/OFF --- .../impl/webrtc/providers/livekit_broker.py | 2 -- dimos/robot/unitree/connection.py | 6 ------ dimos/robot/unitree/dimsim_connection.py | 9 ++++++++- dimos/robot/unitree/go2/connection.py | 17 ++++++++--------- dimos/robot/unitree/mujoco_connection.py | 12 ++++++++++-- 5 files changed, 26 insertions(+), 20 deletions(-) diff --git a/dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py b/dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py index 5abfe8ce13..386a6c1a08 100644 --- a/dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py +++ b/dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py @@ -44,7 +44,6 @@ from collections import defaultdict from collections.abc import Callable import contextlib -from dataclasses import dataclass import importlib.util import os from typing import TYPE_CHECKING, Any @@ -71,7 +70,6 @@ from dimos.msgs.sensor_msgs.Image import Image -@dataclass(frozen=True) class LiveKitBrokerConfig(ProviderConfig): """Hosted teleop over LiveKit. Credentials default from TELEOP_* env.""" diff --git a/dimos/robot/unitree/connection.py b/dimos/robot/unitree/connection.py index cbc83ca3d7..6a9fa6c599 100644 --- a/dimos/robot/unitree/connection.py +++ b/dimos/robot/unitree/connection.py @@ -328,12 +328,6 @@ def free_walk(self) -> bool: """Activate FreeWalk locomotion mode — enables walking and velocity commands.""" return bool(self.publish_request(RTC_TOPIC["SPORT_MOD"], {"api_id": SPORT_CMD["FreeWalk"]})) - def enable_rage_mode(self) -> bool: - """Enable Rage Mode on the Go2 via WebRTC. - Assumes the robot is already in BalanceStand. - """ - return self.set_rage_mode(True) - def set_rage_mode(self, enable: bool) -> bool: """Toggle Rage Mode (api 2059) over WebRTC, both directions. diff --git a/dimos/robot/unitree/dimsim_connection.py b/dimos/robot/unitree/dimsim_connection.py index 5afcd1fda7..1dd2ffd764 100644 --- a/dimos/robot/unitree/dimsim_connection.py +++ b/dimos/robot/unitree/dimsim_connection.py @@ -79,6 +79,10 @@ def odom_stream(self) -> Observable[PoseStamped]: def video_stream(self) -> Observable[Image]: return Subject() + @functools.cache + def lowstate_stream(self) -> Observable[Any]: + return Subject() + def move(self, twist: Twist, duration: float = 0.0) -> bool: return True @@ -91,10 +95,13 @@ def liedown(self) -> bool: def balance_stand(self) -> bool: return True + def sport_command(self, api_id: int) -> bool: + return True + def set_obstacle_avoidance(self, enabled: bool = True) -> None: pass - def enable_rage_mode(self) -> bool: + def set_rage_mode(self, enable: bool) -> bool: return True def publish_request(self, topic: str, data: dict[str, Any]) -> dict[Any, Any]: diff --git a/dimos/robot/unitree/go2/connection.py b/dimos/robot/unitree/go2/connection.py index 134c774e7d..cc66315c1c 100644 --- a/dimos/robot/unitree/go2/connection.py +++ b/dimos/robot/unitree/go2/connection.py @@ -77,13 +77,14 @@ def stop(self) -> None: ... def lidar_stream(self) -> Observable: ... # type: ignore[type-arg] def odom_stream(self) -> Observable: ... # type: ignore[type-arg] def video_stream(self) -> Observable: ... # type: ignore[type-arg] + def lowstate_stream(self) -> Observable: ... # type: ignore[type-arg] def move(self, twist: Twist, duration: float = 0.0) -> bool: ... def standup(self) -> bool: ... def liedown(self) -> bool: ... def balance_stand(self) -> bool: ... def sport_command(self, api_id: int) -> bool: ... def set_obstacle_avoidance(self, enabled: bool = True) -> None: ... - def enable_rage_mode(self) -> bool: ... + def set_rage_mode(self, enable: bool) -> bool: ... def publish_request(self, topic: str, data: dict) -> dict: ... # type: ignore[type-arg] @@ -175,7 +176,7 @@ def sport_command(self, api_id: int) -> bool: def set_obstacle_avoidance(self, enabled: bool = True) -> None: pass - def enable_rage_mode(self) -> bool: + def set_rage_mode(self, enable: bool) -> bool: return True @simple_mcache @@ -262,7 +263,7 @@ def onimage(image: Image) -> None: self.connection.balance_stand() if self.config.mode == Go2Mode.RAGE: - self.connection.enable_rage_mode() + self.connection.set_rage_mode(True) self.connection.set_obstacle_avoidance(self.config.g.obstacle_avoidance) @@ -350,14 +351,12 @@ def get_battery_soc(self) -> int | None: return getattr(self, "_latest_soc", None) @rpc - def enable_rage_mode(self) -> bool: - """Enable Rage Mode (~2.5 m/s forward velocity envelope). + def set_rage_mode(self, enable: bool) -> bool: + """Toggle Rage Mode on/off (~2.5 m/s envelope when on). Ensures BalanceStand precondition regardless of current FSM state. """ - self.connection.balance_stand() - time.sleep(0.3) - result = self.connection.enable_rage_mode() - logger.info("Rage Mode enabled") + result = self.connection.set_rage_mode(enable) + logger.info("Rage Mode %s", "enabled" if enable else "disabled") return result @rpc diff --git a/dimos/robot/unitree/mujoco_connection.py b/dimos/robot/unitree/mujoco_connection.py index 4c455899e8..ebb9a282c8 100644 --- a/dimos/robot/unitree/mujoco_connection.py +++ b/dimos/robot/unitree/mujoco_connection.py @@ -33,7 +33,7 @@ import numpy as np from numpy.typing import NDArray -from reactivex import Observable +from reactivex import Observable, empty from reactivex.abc import ObserverBase, SchedulerBase from reactivex.disposable import Disposable @@ -234,10 +234,13 @@ def liedown(self) -> bool: def balance_stand(self) -> bool: return True + def sport_command(self, api_id: int) -> bool: + return True + def set_obstacle_avoidance(self, enabled: bool = True) -> None: pass - def enable_rage_mode(self) -> bool: + def set_rage_mode(self, enable: bool) -> bool: return True def get_video_frame(self) -> NDArray[Any] | None: @@ -337,6 +340,11 @@ def get_video_as_image() -> Image | None: return self._create_stream(get_video_as_image, VIDEO_FPS, "Video") + @functools.cache + def lowstate_stream(self) -> Observable[Any]: + # Sim has no low-level state (battery/IMU) stream — emit nothing. + return empty() + def move(self, twist: Twist, duration: float = 0.0) -> bool: if self._is_cleaned_up or self.shm_data is None: return True From e68fe024083633d20ae64e57a3010382a891fc10 Mon Sep 17 00:00:00 2001 From: Ruthwik Date: Tue, 23 Jun 2026 13:45:47 -0700 Subject: [PATCH 14/17] feat: battery and lowlevel telemetry access --- dimos/robot/unitree/connection.py | 18 +++++---- dimos/robot/unitree/go2/connection.py | 41 +++++++++++++------- dimos/robot/unitree/go2/hosted_connection.py | 2 +- 3 files changed, 38 insertions(+), 23 deletions(-) diff --git a/dimos/robot/unitree/connection.py b/dimos/robot/unitree/connection.py index 6a9fa6c599..4975c6d60c 100644 --- a/dimos/robot/unitree/connection.py +++ b/dimos/robot/unitree/connection.py @@ -49,8 +49,11 @@ from dimos.robot.unitree.type.odometry import Odometry from dimos.types.timestamped import Timestamped from dimos.utils.decorators.decorators import simple_mcache +from dimos.utils.logging_config import setup_logger from dimos.utils.reactive import backpressure, callback_to_observable +logger = setup_logger() + VideoMessage: TypeAlias = NDArray[np.uint8] # Shape: (height, width, 3) @@ -331,14 +334,13 @@ def free_walk(self) -> bool: def set_rage_mode(self, enable: bool) -> bool: """Toggle Rage Mode (api 2059) over WebRTC, both directions. - Mirrors the DDS adapter recipe: BalanceStand → 2059 {data:enable} → - on enable, settle + SwitchJoystick(True); on disable, SwitchJoystick(False) - to return to the normal velocity envelope. After enable, normal move() - twists drive at the ~2.5 m/s rage envelope. + BalanceStand → 2059 {data:enable} → SwitchJoystick(enable). When on, + normal move() twists drive at the ~2.5 m/s rage envelope. """ # Re-establish BalanceStand before toggling (notes: always BalanceStand # before flipping Rage). - self.balance_stand() + if not self.balance_stand(): + logger.warning("balance_stand() failed before rage toggle — proceeding") time.sleep(0.3) rage_ok = bool( @@ -347,15 +349,17 @@ def set_rage_mode(self, enable: bool) -> bool: {"api_id": self._SPORT_API_ID_RAGEMODE, "parameter": {"data": enable}}, ) ) + if not rage_ok: + return False + if enable: time.sleep(2.0) # let FsmRageMode transition settle - joystick_ok = bool( + return bool( self.publish_request( RTC_TOPIC["SPORT_MOD"], {"api_id": SPORT_CMD["SwitchJoystick"], "parameter": {"data": enable}}, ) ) - return rage_ok and joystick_ok def liedown(self) -> bool: return bool( diff --git a/dimos/robot/unitree/go2/connection.py b/dimos/robot/unitree/go2/connection.py index cc66315c1c..59dc2af177 100644 --- a/dimos/robot/unitree/go2/connection.py +++ b/dimos/robot/unitree/go2/connection.py @@ -20,6 +20,7 @@ from typing import TYPE_CHECKING, Any, Protocol from pydantic import Field +from reactivex import empty from reactivex.disposable import Disposable from reactivex.observable import Observable import rerun.blueprint as rrb @@ -49,6 +50,7 @@ from dimos.msgs.sensor_msgs.Image import Image from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2 from dimos.robot.unitree.connection import UnitreeWebRTCConnection +from dimos.robot.unitree.type.lowstate import LowStateMsg from dimos.utils.decorators.decorators import cached_property, simple_mcache if sys.version_info < (3, 13): @@ -191,6 +193,11 @@ def odom_stream(self) -> Observable[PoseStamped]: def video_stream(self) -> Observable[Image]: return self.replay.streams.color_image.observable() + @simple_mcache + def lowstate_stream(self) -> Observable: # type: ignore[type-arg] + # Replay datasets carry no low-level state (battery/IMU) — emit nothing. + return empty() + def move(self, twist: Twist, duration: float = 0.0) -> bool: return True @@ -217,6 +224,7 @@ class GO2Connection(Module, Camera, Pointcloud): camera_info_static: CameraInfo = _camera_info_static() _camera_info_thread: Thread | None = None _latest_video_frame: Image | None = None + _latest_lowstate: LowStateMsg | None = None @classmethod def rerun_views(cls): # type: ignore[no-untyped-def] @@ -336,29 +344,32 @@ def balance_stand(self) -> bool: """Enter BalanceStand: neutral state for switching locomotion modes""" return self.connection.balance_stand() - def _on_lowstate(self, msg: Any) -> None: - """Cache battery SOC from the lowstate push stream (bms_state.soc, %).""" - try: - self._latest_soc = int(msg["data"]["bms_state"]["soc"]) - except (KeyError, TypeError, ValueError): - if not getattr(self, "_soc_parse_warned", False): - self._soc_parse_warned = True - logger.warning("lowstate: could not read bms_state.soc — battery unavailable") - - @rpc - def get_battery_soc(self) -> int | None: - """Latest battery state-of-charge (0-100%), or None until first lowstate.""" - return getattr(self, "_latest_soc", None) - @rpc def set_rage_mode(self, enable: bool) -> bool: """Toggle Rage Mode on/off (~2.5 m/s envelope when on). - Ensures BalanceStand precondition regardless of current FSM state. + On the WebRTC backend this re-establishes the BalanceStand + precondition before toggling; sim backends are no-ops. """ result = self.connection.set_rage_mode(enable) logger.info("Rage Mode %s", "enabled" if enable else "disabled") return result + def _on_lowstate(self, msg: LowStateMsg) -> None: + """Cache the latest low-level state push (battery, IMU, motors, etc.).""" + self._latest_lowstate = msg + + @skill + def get_battery_soc(self) -> int | None: + """Returns the robot's battery state-of-charge as a percentage (0-100). + + Use this skill to answer battery / power / charge questions. Returns + None if no low-level state has been received yet. + """ + try: + return int(self._latest_lowstate["data"]["bms_state"]["soc"]) # type: ignore[index] + except (KeyError, TypeError, ValueError): + return None + @rpc def publish_request(self, topic: str, data: dict[str, Any]) -> dict[Any, Any]: """Publish a request to the WebRTC connection. diff --git a/dimos/robot/unitree/go2/hosted_connection.py b/dimos/robot/unitree/go2/hosted_connection.py index 99363f3878..26a38e32cd 100644 --- a/dimos/robot/unitree/go2/hosted_connection.py +++ b/dimos/robot/unitree/go2/hosted_connection.py @@ -274,7 +274,7 @@ def runner() -> None: interval = 1.0 / max(self.config.telemetry_hz, 0.1) while not self._stop_event.is_set(): snap = self._cmd_stats.snapshot() - soc = getattr(self, "_latest_soc", None) # cached by GO2Connection + soc = self.get_battery_soc() # parsed from lowstate by GO2Connection if snap is not None or soc is not None: payload = json.dumps( { From 7983d0092a772f2efd6250ab8f44c1c40c2362e9 Mon Sep 17 00:00:00 2001 From: Ruthwik Date: Tue, 23 Jun 2026 14:20:56 -0700 Subject: [PATCH 15/17] feat: battery status from hosted connection --- dimos/robot/unitree/go2/hosted_connection.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/dimos/robot/unitree/go2/hosted_connection.py b/dimos/robot/unitree/go2/hosted_connection.py index 26a38e32cd..42aa707507 100644 --- a/dimos/robot/unitree/go2/hosted_connection.py +++ b/dimos/robot/unitree/go2/hosted_connection.py @@ -269,12 +269,20 @@ def _on_cmd_raw(self, data: Any) -> None: return # foreign / undecodable frame — skip self._cmd_stats.record(ts, nbytes=len(data)) + def _battery_soc(self) -> int | None: + """Battery SOC from the cached lowstate, without invoking the logged + ``get_battery_soc`` skill (which the 3 Hz telemetry loop would spam).""" + try: + return int(self._latest_lowstate["data"]["bms_state"]["soc"]) # type: ignore[index] + except (KeyError, TypeError, ValueError): + return None + def _start_telemetry(self) -> None: def runner() -> None: interval = 1.0 / max(self.config.telemetry_hz, 0.1) while not self._stop_event.is_set(): snap = self._cmd_stats.snapshot() - soc = self.get_battery_soc() # parsed from lowstate by GO2Connection + soc = self._battery_soc() if snap is not None or soc is not None: payload = json.dumps( { From 9a835a618917739d204a8c7d43394e2f2aebbf0d Mon Sep 17 00:00:00 2001 From: Ruthwik Date: Tue, 23 Jun 2026 17:39:39 -0700 Subject: [PATCH 16/17] refactor(teleop): drop TELEOP_* env vars, use transports.broker.* everywhere --- default.env | 11 ++++--- dimos/core/transport.py | 3 +- .../impl/webrtc/providers/livekit_broker.py | 33 ++++++++++--------- .../quest_hosted/hosted_teleop_module.py | 12 +++---- 4 files changed, 31 insertions(+), 28 deletions(-) diff --git a/default.env b/default.env index a43ee53305..0f56044b7e 100644 --- a/default.env +++ b/default.env @@ -7,10 +7,13 @@ HUGGINGFACE_PRV_ENDPOINT= ROBOT_IP= CONN_TYPE=webrtc -# Hosted teleop (teleop-hosted-* blueprints) -TELEOP_API_KEY= -TELEOP_ROBOT_ID= -TELEOP_ROBOT_NAME= +# Hosted teleop — transport blueprints (teleop-hosted-go2-transport / -livekit / -multicam). +# Names map to the transports.broker.* config (-o transports.broker.api_key=...). +# (The deprecated HostedTeleopModule blueprints take -o hosted-teleop.broker_api_key=...) +TRANSPORTS__BROKER__BROKER_URL=https://teleop.dimensionalos.com +TRANSPORTS__BROKER__API_KEY= +TRANSPORTS__BROKER__ROBOT_ID= +TRANSPORTS__BROKER__ROBOT_NAME= WEBRTC_SERVER_HOST=0.0.0.0 WEBRTC_SERVER_PORT=9991 DISPLAY=:0 diff --git a/dimos/core/transport.py b/dimos/core/transport.py index 122e0d714e..183c5a878c 100644 --- a/dimos/core/transport.py +++ b/dimos/core/transport.py @@ -461,7 +461,8 @@ class LiveKitTransport(WebRTCTransport[M]): """WebRTC DataChannels via the hosted teleop broker + LiveKit SFU. Drop-in alternative to :class:`CloudflareTransport`; config kwargs flow into - :class:`LiveKitBrokerConfig` (unset fields fall back to ``TELEOP_*`` env). + :class:`LiveKitBrokerConfig`, populated from ``transports.broker.*`` + (``TRANSPORTS__BROKER__*`` env / ``-o`` overrides). unitree_go2_livekit = unitree_go2_basic.transports({ ("cmd_vel", Twist): LiveKitTransport("cmd_unreliable", TwistStamped), diff --git a/dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py b/dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py index 386a6c1a08..4aefd66bc2 100644 --- a/dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py +++ b/dimos/protocol/pubsub/impl/webrtc/providers/livekit_broker.py @@ -31,11 +31,13 @@ (published lazily on the first frame) — typically via ``LiveKitVideoTransport`` bound to a blueprint's Image stream. -Env vars (fallback when config fields are unset): - TELEOP_BROKER_URL — default https://teleop.dimensionalos.com - TELEOP_API_KEY — robot API key (dtk_live_*); broker derives identity - TELEOP_ROBOT_ID — optional robot identifier override - TELEOP_ROBOT_NAME — human-readable robot name +Config comes from the blueprint's ``transports.broker.*`` flow (env form +``TRANSPORTS__BROKER__``, or ``-o transports.broker.=...``), the +same scheme as the Cloudflare ``BrokerConfig``: + TRANSPORTS__BROKER__BROKER_URL — default https://teleop.dimensionalos.com + TRANSPORTS__BROKER__API_KEY — robot API key (dtk_live_*); derives identity + TRANSPORTS__BROKER__ROBOT_ID — optional robot identifier override + TRANSPORTS__BROKER__ROBOT_NAME — human-readable robot name """ from __future__ import annotations @@ -45,7 +47,6 @@ from collections.abc import Callable import contextlib import importlib.util -import os from typing import TYPE_CHECKING, Any from dimos.protocol.pubsub.impl.webrtc.providers.spec import ( @@ -71,7 +72,7 @@ class LiveKitBrokerConfig(ProviderConfig): - """Hosted teleop over LiveKit. Credentials default from TELEOP_* env.""" + """Hosted teleop over LiveKit. Config from transports.broker.* (TRANSPORTS__BROKER__*).""" broker_url: str | None = None api_key: str | None = None @@ -192,17 +193,17 @@ def __init__(self, config: LiveKitBrokerConfig | None = None) -> None: raise RuntimeError("livekit and httpx required: pip install dimos[livekit]") super().__init__() config = config or LiveKitBrokerConfig() - self._broker_url = ( - config.broker_url - or os.environ.get("TELEOP_BROKER_URL", "https://teleop.dimensionalos.com") - ).rstrip("/") - self._api_key = config.api_key or os.environ.get("TELEOP_API_KEY", "") - self._robot_id = config.robot_id or os.environ.get("TELEOP_ROBOT_ID", "") - self._robot_name = config.robot_name or os.environ.get("TELEOP_ROBOT_NAME", "robot") + # Config is populated from transports.broker.* (= TRANSPORTS__BROKER__* + # env / -o overrides), same scheme as the Cloudflare BrokerConfig. + self._broker_url = (config.broker_url or "https://teleop.dimensionalos.com").rstrip("/") + self._api_key = config.api_key or "" + self._robot_id = config.robot_id or "" + self._robot_name = config.robot_name or "robot" if not self._api_key: raise RuntimeError( - "TELEOP_API_KEY or LiveKitBrokerConfig.api_key required " - "(create one in the teleop dashboard: New Key)" + "transports.broker.api_key required " + "(TRANSPORTS__BROKER__API_KEY=dtk_live_...; create one in the " + "teleop dashboard: New Key)" ) self._config = config diff --git a/dimos/teleop/quest_hosted/hosted_teleop_module.py b/dimos/teleop/quest_hosted/hosted_teleop_module.py index 5604dc2c47..df92668751 100644 --- a/dimos/teleop/quest_hosted/hosted_teleop_module.py +++ b/dimos/teleop/quest_hosted/hosted_teleop_module.py @@ -28,7 +28,6 @@ import asyncio from enum import IntEnum import json -import os import threading import time from typing import Any @@ -75,7 +74,7 @@ class HostedTeleopConfig(ModuleConfig): control_loop_hz: float = 50.0 broker_url: str = "https://teleop.dimensionalos.com" - # Empty defaults; resolved from TELEOP_* env vars at start() if unset. + # Set via the module-config flow (-o hosted-teleop.broker_api_key=... or env). broker_api_key: str = "" robot_id: str = "" robot_name: str = "" @@ -241,8 +240,8 @@ def _on_gathering() -> None: url = f"{self.config.broker_url.rstrip('/')}/api/v1/sessions" body = { - "robot_id": self.config.robot_id or os.getenv("TELEOP_ROBOT_ID", ""), - "robot_name": self.config.robot_name or os.getenv("TELEOP_ROBOT_NAME", ""), + "robot_id": self.config.robot_id, + "robot_name": self.config.robot_name, "sdp_offer": self._pc.localDescription.sdp, } resp = await self._http.post(url, json=body, headers=self._auth_headers()) @@ -287,9 +286,8 @@ async def _disconnect(self) -> None: self._session_id = None def _auth_headers(self) -> dict[str, str]: - api_key = self.config.broker_api_key or os.getenv("TELEOP_API_KEY") - if api_key: - return {"X-Robot-API-Key": api_key} + if self.config.broker_api_key: + return {"X-Robot-API-Key": self.config.broker_api_key} return {} def _start_heartbeat(self) -> None: From 79fce07a84631a7fd0f5ea28ebc9c7fa21565962 Mon Sep 17 00:00:00 2001 From: Ruthwik Date: Tue, 23 Jun 2026 19:00:47 -0700 Subject: [PATCH 17/17] feat: obstacle avoidance toggle --- dimos/robot/unitree/go2/hosted_connection.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/dimos/robot/unitree/go2/hosted_connection.py b/dimos/robot/unitree/go2/hosted_connection.py index 42aa707507..0e8080d719 100644 --- a/dimos/robot/unitree/go2/hosted_connection.py +++ b/dimos/robot/unitree/go2/hosted_connection.py @@ -165,6 +165,8 @@ def _on_state_json(self, data: Any) -> None: self._handle_set_mode(msg) elif kind == "camera_select": self._set_cam_selection(msg.get("cams", [])) + elif kind == "obstacle_avoidance": + self._handle_obstacle_avoidance(msg) elif kind == "video_stats": self.video_stats.publish(VideoStats.from_dict(msg)) elif kind == "clock_report": @@ -246,6 +248,23 @@ def runner() -> None: threading.Thread(target=runner, daemon=True, name="Go2SetMode").start() + def _handle_obstacle_avoidance(self, msg: dict[str, Any]) -> None: + """Toggle the Go2's onboard obstacle avoidance on/off.""" + enabled = bool(msg.get("enabled")) + nonce = msg.get("nonce") + + def runner() -> None: + ok = False + try: + self.connection.set_obstacle_avoidance(enabled) + ok = True + logger.info("obstacle_avoidance: enabled=%s", enabled) + except Exception: + logger.exception("obstacle_avoidance enabled=%s failed", enabled) + self._send_ack(nonce, ok) + + threading.Thread(target=runner, daemon=True, name="Go2ObstacleAvoid").start() + def _send_ack(self, nonce: Any, ok: bool) -> None: try: self.telemetry_out.publish(