Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions default.env
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ HUGGINGFACE_PRV_ENDPOINT=
ROBOT_IP=
CONN_TYPE=webrtc

# Hosted teleop (teleop-hosted-* blueprints)
TELEOP_API_KEY=
TELEOP_ROBOT_ID=
TELEOP_ROBOT_NAME=
# Hosted teleop — transport blueprints (teleop-hosted-go2-transport / -livekit / -multicam).
# Names map to the transports.broker.* config (-o transports.broker.api_key=...).
# (The deprecated HostedTeleopModule blueprints take -o hosted-teleop.broker_api_key=...)
TRANSPORTS__BROKER__BROKER_URL=https://teleop.dimensionalos.com
TRANSPORTS__BROKER__API_KEY=
TRANSPORTS__BROKER__ROBOT_ID=
TRANSPORTS__BROKER__ROBOT_NAME=
WEBRTC_SERVER_HOST=0.0.0.0
WEBRTC_SERVER_PORT=9991
DISPLAY=:0
Expand Down
23 changes: 23 additions & 0 deletions dimos/core/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from dimos.protocol.pubsub.impl.rospubsub import DimosROS, ROSTopic
from dimos.protocol.pubsub.impl.shmpubsub import BytesSharedMemory, PickleSharedMemory
from dimos.protocol.pubsub.impl.webrtc.providers.broker import BrokerConfig
from dimos.protocol.pubsub.impl.webrtc.providers.livekit_broker import LiveKitBrokerConfig
from dimos.protocol.pubsub.impl.webrtc.providers.spec import ProviderConfig
from dimos.protocol.pubsub.impl.webrtc.webrtcpubsub import WebRTCPubSub
from dimos.utils.logging_config import setup_logger
Expand Down Expand Up @@ -456,6 +457,22 @@ class CloudflareTransport(WebRTCTransport[M]):
_config_cls = BrokerConfig


class LiveKitTransport(WebRTCTransport[M]):
"""WebRTC DataChannels via the hosted teleop broker + LiveKit SFU.

Drop-in alternative to :class:`CloudflareTransport`; config kwargs flow into
:class:`LiveKitBrokerConfig`, populated from ``transports.broker.*``
(``TRANSPORTS__BROKER__*`` env / ``-o`` overrides).

unitree_go2_livekit = unitree_go2_basic.transports({
("cmd_vel", Twist): LiveKitTransport("cmd_unreliable", TwistStamped),
("color_image", Image): LiveKitVideoTransport(),
})
"""

_config_cls = LiveKitBrokerConfig


class WebRTCVideoTransport(Transport[Any]):
"""Robot camera → remote viewer as a WebRTC video track (provider-agnostic).

Expand Down Expand Up @@ -515,4 +532,10 @@ class CloudflareVideoTransport(WebRTCVideoTransport):
_config_cls = BrokerConfig


class LiveKitVideoTransport(WebRTCVideoTransport):
"""Camera → teleop web client via the hosted broker + LiveKit (see WebRTCVideoTransport)."""

_config_cls = LiveKitBrokerConfig


class ZenohTransport(PubSubTransport[T]): ...
34 changes: 23 additions & 11 deletions dimos/hardware/sensors/camera/realsense/camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,17 +358,29 @@ def _publish_tf(self, ts: float) -> None:
)
transforms.append(depth_to_depth_optical)

color_tf = self._extrinsics_to_transform(
self._color_to_depth_extrinsics,
self._camera_link,
self._color_frame,
ts,
)
# Invert the transform since extrinsics are color->depth
color_tf = color_tf.inverse()
color_tf.frame_id = self._camera_link
color_tf.child_frame_id = self._color_frame
color_tf.ts = ts
# camera_link -> camera_color_frame. With depth disabled there are no
# color->depth extrinsics, so fall back to identity (color at the
# camera_link origin) instead of dereferencing None.
if self._color_to_depth_extrinsics is not None:
color_tf = self._extrinsics_to_transform(
self._color_to_depth_extrinsics,
self._camera_link,
self._color_frame,
ts,
)
# Invert the transform since extrinsics are color->depth
color_tf = color_tf.inverse()
color_tf.frame_id = self._camera_link
color_tf.child_frame_id = self._color_frame
color_tf.ts = ts
else:
color_tf = Transform(
translation=Vector3(0.0, 0.0, 0.0),
rotation=Quaternion(0.0, 0.0, 0.0, 1.0),
frame_id=self._camera_link,
child_frame_id=self._color_frame,
ts=ts,
)
transforms.append(color_tf)

# camera_color_frame -> camera_color_optical_frame
Expand Down
31 changes: 31 additions & 0 deletions dimos/protocol/pubsub/impl/webrtc/providers/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
from collections import defaultdict
from collections.abc import Callable
import contextlib
import json
import time
from typing import TYPE_CHECKING, Any

from dimos.protocol.pubsub.impl.webrtc.providers.sdp import propagate_bundle_candidates
Expand Down Expand Up @@ -312,6 +314,8 @@ def _open_channel(self, name: str, sctp_id: int) -> None:
def _on_msg(payload: Any) -> None:
if isinstance(payload, str):
payload = payload.encode()
if name == "state_reliable":
self._maybe_answer_ping(payload)
with self._lock:
callbacks = list(self._callbacks.get(name, ()))
for cb in callbacks:
Expand All @@ -330,6 +334,33 @@ def _close_channel(self, name: str) -> None:
with contextlib.suppress(Exception):
ch.close()

def _maybe_answer_ping(self, payload: bytes) -> None:
"""Answer the web client's clock-sync ping inline on the loop thread.

The operator measures RTT/offset from ping→pong timing, so the reply
must not ride a module hop (stream dispatch latency would inflate
every sample, and keep-latest mailboxes could drop pings outright).
The ping still fans out to subscribers afterwards — the provider stays
a transparent relay with this one reflex attached.
"""
if not payload.startswith(b"{"):
return # LCM binary or other non-JSON — not ours
try:
msg = json.loads(payload)
except ValueError:
return
if msg.get("type") != "ping" or msg.get("client_ts") is None:
return
pong = json.dumps({"type": "pong", "client_ts": msg["client_ts"], "robot_ts": time.time()})
with self._lock:
ch = self._dcs.get("state_reliable_back")
# Pong MUST go on state_reliable_back — CF bridges one direction only;
# a robot send on state_reliable would be silently dropped.
if ch is not None and ch.readyState == "open":
ch.send(pong)
else:
logger.warning("ping received but state_reliable_back not open — pong dropped")

# ─── Public API (Provider) ───────────────────────────────────────

def publish(self, topic: str, data: bytes) -> None:
Expand Down
Loading
Loading