From 815df15e0901a116fe050a5266a9771902121f95 Mon Sep 17 00:00:00 2001 From: GuoZi Date: Tue, 16 Jun 2026 20:33:00 +0800 Subject: [PATCH 01/13] feat(audio): add AudioModule for issue #1932 Adds mic audio capture and chunked publishing as AudioStamped on an Out stream, mirroring CameraModule. Validated on macOS Apple Silicon at 50 Hz / 20 ms frames with both synthetic (sine tone) and real mic sources. - dimos/msgs/audio_msgs/AudioStamped.py: Python overlay wrapping foxglove_msgs.RawAudio for LCM encode/decode, with from_pcm() and to_numpy() helpers. Flags that builtin_interfaces.Time (not std_msgs.Header) is the wire type, so frame_id is not preserved. - dimos/hardware/sensors/audio/module.py: AudioModule(Module) with AudioConfig(ModuleConfig), async def main() lifecycle, @rpc start/stop, @skill record_clip. - examples/audio/validate_audio_module.py: LCM round-trip assert + live stream rate/timestamp validation. Co-Authored-By: Claude Sonnet 4.6 --- dimos/hardware/sensors/audio/__init__.py | 13 ++ dimos/hardware/sensors/audio/module.py | 205 +++++++++++++++++++++++ dimos/msgs/audio_msgs/AudioStamped.py | 168 +++++++++++++++++++ dimos/msgs/audio_msgs/__init__.py | 15 ++ examples/audio/validate_audio_module.py | 149 ++++++++++++++++ 5 files changed, 550 insertions(+) create mode 100644 dimos/hardware/sensors/audio/__init__.py create mode 100644 dimos/hardware/sensors/audio/module.py create mode 100644 dimos/msgs/audio_msgs/AudioStamped.py create mode 100644 dimos/msgs/audio_msgs/__init__.py create mode 100644 examples/audio/validate_audio_module.py diff --git a/dimos/hardware/sensors/audio/__init__.py b/dimos/hardware/sensors/audio/__init__.py new file mode 100644 index 0000000000..bc1a2ce5cc --- /dev/null +++ b/dimos/hardware/sensors/audio/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/dimos/hardware/sensors/audio/module.py b/dimos/hardware/sensors/audio/module.py new file mode 100644 index 0000000000..855b6faf6c --- /dev/null +++ b/dimos/hardware/sensors/audio/module.py @@ -0,0 +1,205 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""AudioModule — captures mic audio and publishes AudioStamped on an Out stream. + +Mirrors CameraModule shape. Supports: + - Real capture via sounddevice / PortAudio (synthetic=False, default) + - Sine-tone fallback with no mic required (synthetic=True) + +macOS setup: + brew install portaudio + pip install sounddevice numpy + Grant microphone permission on first real run. 
+""" + +from __future__ import annotations + +import asyncio +import time + +import numpy as np +from pydantic import Field + +from dimos.agents.annotation import skill +from dimos.core.core import rpc +from dimos.core.module import Module, ModuleConfig +from dimos.core.stream import Out +from dimos.msgs.audio_msgs.AudioStamped import AudioStamped +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() + + +class AudioConfig(ModuleConfig): + sample_rate: int = 16000 + channels: int = 1 + frame_ms: int = 20 + sample_format: str = "S16LE" + device: int | None = Field(default=None) + synthetic: bool = False + + +class AudioModule(Module): + """Publishes chunked PCM audio as AudioStamped messages. + + Streams: + audio (Out[AudioStamped]): one message per frame_ms chunk. + + RPC: + start() / stop() + + Skills: + record_clip(seconds) -> bytes raw S16LE PCM bytes + + io() matches CameraModule shape: single typed Out stream, @rpc start/stop. + """ + + config: AudioConfig + audio: Out[AudioStamped] + + # ------------------------------------------------------------------ + # Lifecycle (async def main = one yield: open before, close after) + # ------------------------------------------------------------------ + + async def main(self) -> None: # type: ignore[override] + """Open audio device (or start synthetic generator) before yield; close after.""" + frame_size = int(self.config.sample_rate * self.config.frame_ms / 1000) + + if self.config.synthetic: + task = asyncio.create_task(self._synth_loop(frame_size)) + logger.info("AudioModule: synthetic sine-tone source started") + yield + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + logger.info("AudioModule: synthetic source stopped") + else: + import sounddevice as sd # type: ignore[import-untyped] + + stream = sd.InputStream( + device=self.config.device, + samplerate=self.config.sample_rate, + channels=self.config.channels, + blocksize=frame_size, + dtype="int16", + 
callback=self._sd_callback, + ) + stream.start() + logger.info( + "AudioModule: sounddevice stream started " + f"({self.config.sample_rate}Hz, {self.config.channels}ch, " + f"{self.config.frame_ms}ms frames)" + ) + yield + stream.stop() + stream.close() + logger.info("AudioModule: sounddevice stream closed") + + # ------------------------------------------------------------------ + # RPC + # ------------------------------------------------------------------ + + @rpc + def start(self) -> None: + super().start() + + @rpc + def stop(self) -> None: + super().stop() + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _sd_callback( + self, + indata: np.ndarray, + frames: int, + time_info: object, + status: object, + ) -> None: + if status: + logger.warning(f"AudioModule sounddevice status: {status}") + msg = AudioStamped.from_pcm( + pcm_bytes=indata.tobytes(), + sample_rate=self.config.sample_rate, + channels=self.config.channels, + sample_format=self.config.sample_format, + coding_format="pcm", + ts=time.monotonic(), + ) + self.audio.publish(msg) + + async def _synth_loop(self, frame_size: int) -> None: + """Generates a 440 Hz sine tone at ~frame_ms cadence.""" + interval = self.config.frame_ms / 1000.0 + freq = 440.0 + phase = 0 + amplitude = 0.3 * 32767 # ~-10 dBFS in int16 + + while True: + t_start = time.monotonic() + + t = (np.arange(frame_size) + phase) / self.config.sample_rate + tone = (np.sin(2 * np.pi * freq * t) * amplitude).astype(np.int16) + phase = (phase + frame_size) % self.config.sample_rate + + if self.config.channels > 1: + pcm = np.column_stack([tone] * self.config.channels) + else: + pcm = tone + + msg = AudioStamped.from_pcm( + pcm_bytes=pcm.tobytes(), + sample_rate=self.config.sample_rate, + channels=self.config.channels, + sample_format=self.config.sample_format, + coding_format="pcm", + ts=time.monotonic(), + ) + 
self.audio.publish(msg) + + elapsed = time.monotonic() - t_start + await asyncio.sleep(max(0.0, interval - elapsed)) + + # ------------------------------------------------------------------ + # Skills + # ------------------------------------------------------------------ + + @skill + def record_clip(self, seconds: float = 1.0) -> bytes: + """Record and return a clip of raw PCM audio. + + Collects frames from the live audio stream for `seconds` seconds and + returns them concatenated as raw S16LE PCM bytes. + """ + import threading + + buf: list[bytes] = [] + done = threading.Event() + collected = [0.0] + + def on_frame(msg: AudioStamped) -> None: + buf.append(msg.data) + collected[0] += self.config.frame_ms / 1000.0 + if collected[0] >= seconds: + done.set() + + unsub = self.audio.subscribe(on_frame) + done.wait(timeout=seconds + 2.0) + unsub() + return b"".join(buf) diff --git a/dimos/msgs/audio_msgs/AudioStamped.py b/dimos/msgs/audio_msgs/AudioStamped.py new file mode 100644 index 0000000000..c4e54a9092 --- /dev/null +++ b/dimos/msgs/audio_msgs/AudioStamped.py @@ -0,0 +1,168 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Python overlay for audio messages, mirroring ROS audio_common AudioStamped. + +LCM wire type: foxglove_msgs.RawAudio (already in dimos_lcm). 
+ +Differences from ROS audio_common: + - Wire type uses builtin_interfaces.Time (not std_msgs.Header), so frame_id + and seq are NOT preserved through lcm_encode/lcm_decode. + - Wire type has a single 'format' string; we encode it as + "{coding_format}/{sample_format}" (e.g. "pcm/S16LE"). + - AudioInfo and AudioData are not separate LCM sub-types; they are plain + Python fields on this class. +""" + +from __future__ import annotations + +import time + +from dimos_lcm.builtin_interfaces import Time as LCMTime +from dimos_lcm.foxglove_msgs import RawAudio +import numpy as np + +from dimos.msgs.std_msgs.Header import Header +from dimos.types.timestamped import Timestamped + + +class AudioStamped(Timestamped): + """Stamped audio chunk with PCM payload. + + Carries one frame of raw PCM audio together with a std_msgs.Header and + audio metadata. Serialises to/from foxglove_msgs.RawAudio on the wire. + """ + + msg_name = "foxglove_msgs.RawAudio" # wire type used for LCM + + def __init__( + self, + header: Header, + sample_rate: int, + channels: int, + sample_format: str, + coding_format: str, + data: bytes, + ) -> None: + super().__init__(ts=header.timestamp) + self.header = header + self.sample_rate = sample_rate + self.channels = channels + self.sample_format = sample_format + self.coding_format = coding_format + self.data = data + + # ------------------------------------------------------------------ + # Factory helpers + # ------------------------------------------------------------------ + + @classmethod + def from_pcm( + cls, + pcm_bytes: bytes, + sample_rate: int = 16000, + channels: int = 1, + sample_format: str = "S16LE", + coding_format: str = "pcm", + frame_id: str = "", + ts: float | None = None, + ) -> AudioStamped: + """Construct from raw PCM bytes.""" + t = ts if ts is not None else time.monotonic() + header = Header(t, frame_id) + return cls( + header=header, + sample_rate=sample_rate, + channels=channels, + sample_format=sample_format, + 
coding_format=coding_format, + data=pcm_bytes, + ) + + # ------------------------------------------------------------------ + # Conversion helpers + # ------------------------------------------------------------------ + + def to_numpy(self) -> np.ndarray: + """Decode PCM bytes to a numpy array. + + Returns shape (n_samples,) for mono or (n_samples, channels) for + multi-channel. dtype is int16 for S16LE, float32 for F32LE. + """ + if self.sample_format == "S16LE": + dtype = np.dtype("<i2") + elif self.sample_format == "F32LE": + dtype = np.dtype("<f4") + else: + raise ValueError(f"Unsupported sample_format: {self.sample_format!r}") + + arr = np.frombuffer(self.data, dtype=dtype) + if self.channels > 1: + arr = arr.reshape(-1, self.channels) + return arr + + # ------------------------------------------------------------------ + # LCM serialisation + # ------------------------------------------------------------------ + + def lcm_encode(self) -> bytes: + """Encode to foxglove_msgs.RawAudio wire bytes. + + NOTE: frame_id and seq from self.header are NOT preserved (the wire + type only carries a bare timestamp, not a full std_msgs.Header). + """ + msg = RawAudio() + sec = int(self.ts) + nsec = int((self.ts - sec) * 1_000_000_000) + msg.timestamp = LCMTime(sec=sec, nanosec=nsec) + msg.format = f"{self.coding_format}/{self.sample_format}" + msg.sample_rate = self.sample_rate + msg.number_of_channels = self.channels + msg.data_length = len(self.data) + msg.data = self.data + return msg.lcm_encode() # type: ignore[no-any-return] + + @classmethod + def lcm_decode(cls, raw: bytes) -> AudioStamped: + """Decode from foxglove_msgs.RawAudio wire bytes.""" + msg = RawAudio.lcm_decode(raw) + + ts = msg.timestamp.sec + msg.timestamp.nanosec / 1_000_000_000 + + fmt = msg.format # e.g. 
"pcm/S16LE" + if "/" in fmt: + coding_format, sample_format = fmt.split("/", 1) + else: + coding_format, sample_format = "pcm", fmt + + return cls.from_pcm( + pcm_bytes=bytes(msg.data), + sample_rate=msg.sample_rate, + channels=msg.number_of_channels, + sample_format=sample_format, + coding_format=coding_format, + frame_id="", # not stored on wire + ts=ts, + ) + + # ------------------------------------------------------------------ + # Repr + # ------------------------------------------------------------------ + + def __repr__(self) -> str: + n_samples = len(self.data) // (2 if "16" in self.sample_format else 4) + return ( + f"AudioStamped(rate={self.sample_rate}, ch={self.channels}, " + f"fmt={self.sample_format}, samples={n_samples}, ts={self.ts:.6f})" + ) diff --git a/dimos/msgs/audio_msgs/__init__.py b/dimos/msgs/audio_msgs/__init__.py new file mode 100644 index 0000000000..12893217f8 --- /dev/null +++ b/dimos/msgs/audio_msgs/__init__.py @@ -0,0 +1,15 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .AudioStamped import AudioStamped as AudioStamped diff --git a/examples/audio/validate_audio_module.py b/examples/audio/validate_audio_module.py new file mode 100644 index 0000000000..c29136f679 --- /dev/null +++ b/examples/audio/validate_audio_module.py @@ -0,0 +1,149 @@ +#!/usr/bin/env python3 +# Copyright 2025-2026 Dimensional Inc. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Validate AudioModule: construct → start → subscribe → sleep → stop. + +Usage: + # Synthetic sine tone (default; no mic needed): + python examples/audio/validate_audio_module.py + + # Real microphone (macOS: grant mic permission on first run): + python examples/audio/validate_audio_module.py --real-mic +""" + +import argparse +import time + +import numpy as np + +from dimos.hardware.sensors.audio.module import AudioModule +from dimos.msgs.audio_msgs.AudioStamped import AudioStamped + + +def main() -> None: + parser = argparse.ArgumentParser() + parser.add_argument("--synthetic", action="store_true", default=True, + help="Use sine-tone source instead of mic (default: True)") + parser.add_argument("--real-mic", dest="synthetic", action="store_false", + help="Use real microphone input") + parser.add_argument("--duration", type=float, default=2.0, + help="Seconds to collect audio (default: 2.0)") + parser.add_argument("--sample-rate", type=int, default=16000) + parser.add_argument("--channels", type=int, default=1) + parser.add_argument("--frame-ms", type=int, default=20) + args = parser.parse_args() + + # ------------------------------------------------------------------ # + # 1. 
LCM round-trip smoke test (before any hardware) # + # ------------------------------------------------------------------ # + print("=== LCM round-trip test ===") + n_samples = args.sample_rate * args.frame_ms // 1000 + pcm_orig = (np.random.randn(n_samples).clip(-1, 1) * 32767).astype(np.int16) + orig = AudioStamped.from_pcm( + pcm_bytes=pcm_orig.tobytes(), + sample_rate=args.sample_rate, + channels=args.channels, + sample_format="S16LE", + coding_format="pcm", + ts=time.monotonic(), + ) + wire = orig.lcm_encode() + decoded = AudioStamped.lcm_decode(wire) + + assert decoded.sample_rate == orig.sample_rate, "sample_rate mismatch" + assert decoded.channels == orig.channels, "channels mismatch" + assert decoded.sample_format == orig.sample_format, "sample_format mismatch" + assert decoded.coding_format == orig.coding_format, "coding_format mismatch" + assert decoded.data == orig.data, "PCM data mismatch" + assert abs(decoded.ts - orig.ts) < 1e-6, "timestamp mismatch" + print(f" wire bytes : {len(wire)}") + print(f" orig : {orig}") + print(f" decoded : {decoded}") + print(" PASS\n") + + # ------------------------------------------------------------------ # + # 2. Construct + start AudioModule # + # ------------------------------------------------------------------ # + source = "synthetic sine-tone" if args.synthetic else "real microphone" + print(f"=== AudioModule ({source}) ===") + print(f" sample_rate={args.sample_rate}, channels={args.channels}, " + f"frame_ms={args.frame_ms}, duration={args.duration}s") + + mod = AudioModule( + sample_rate=args.sample_rate, + channels=args.channels, + frame_ms=args.frame_ms, + sample_format="S16LE", + synthetic=args.synthetic, + ) + + # Print io() shape (mirrors CameraModule) + print("\n--- io() ---") + print(mod.io(color=False)) + print("------------\n") + + # ------------------------------------------------------------------ # + # 3. 
Subscribe and collect # + # ------------------------------------------------------------------ # + counts = [0] + last_ts = [None] + gaps: list[float] = [] + + def on_audio(msg: AudioStamped) -> None: + counts[0] += 1 + arr = msg.to_numpy() + ts = msg.ts + if last_ts[0] is not None: + gaps.append(ts - last_ts[0]) + last_ts[0] = ts + + if counts[0] <= 5 or counts[0] % 10 == 0: + print( + f" [{counts[0]:4d}] rate={msg.sample_rate} ch={msg.channels} " + f"shape={arr.shape} dtype={arr.dtype} ts={ts:.6f}" + ) + + unsub_fn = mod.audio.subscribe(on_audio) + mod.start() + + time.sleep(args.duration) + + mod.stop() + unsub_fn() + + # ------------------------------------------------------------------ # + # 4. Report # + # ------------------------------------------------------------------ # + print("\n=== Results ===") + expected_rate = 1000.0 / args.frame_ms + actual_rate = counts[0] / args.duration if args.duration > 0 else 0.0 + avg_gap_ms = (sum(gaps) / len(gaps) * 1000) if gaps else 0.0 + + print(f" frames received : {counts[0]}") + print(f" expected rate : {expected_rate:.1f} Hz") + print(f" actual rate : {actual_rate:.1f} Hz") + print(f" avg frame gap : {avg_gap_ms:.2f} ms (expected {args.frame_ms} ms)") + + # Check monotonic timestamps + if len(gaps) > 0: + assert all(g > 0 for g in gaps), "timestamps are not monotonically increasing!" + print(" timestamps : monotonically increasing ✓") + + assert counts[0] > 0, "No audio frames received!" 
+ print("\nDONE — AudioModule validated successfully.") + + +if __name__ == "__main__": + main() From c5fe59bb7f29f930f8d35296e35301d0f917d9ee Mon Sep 17 00:00:00 2001 From: GuoZi Date: Wed, 17 Jun 2026 10:34:05 +0800 Subject: [PATCH 02/13] fix(audio): correct AudioStamped docstring per tech lead review - Remove all mentions of `seq` (ROS2 std_msgs/Header has no seq field) - Reword frame_id note: dropped because RawAudio has no frame_id field on the wire, not by design choice - Mark foxglove_msgs.RawAudio as a temporary stand-in pending team decision on a native Header-bearing LCM type Co-Authored-By: Claude Sonnet 4.6 --- dimos/msgs/audio_msgs/AudioStamped.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/dimos/msgs/audio_msgs/AudioStamped.py b/dimos/msgs/audio_msgs/AudioStamped.py index c4e54a9092..2e13d87eb6 100644 --- a/dimos/msgs/audio_msgs/AudioStamped.py +++ b/dimos/msgs/audio_msgs/AudioStamped.py @@ -14,11 +14,15 @@ """Python overlay for audio messages, mirroring ROS audio_common AudioStamped. -LCM wire type: foxglove_msgs.RawAudio (already in dimos_lcm). +LCM wire type: foxglove_msgs.RawAudio — reused as a temporary stand-in because +it is the only audio type currently mirrored in dimos_lcm. This is NOT an +endorsement of the foxglove schema; a native type that carries a full +std_msgs.Header is pending a team decision. Differences from ROS audio_common: - - Wire type uses builtin_interfaces.Time (not std_msgs.Header), so frame_id - and seq are NOT preserved through lcm_encode/lcm_decode. + - Wire type carries only a builtin_interfaces.Time timestamp; it has no + frame_id field, so frame_id is lost on encode (not a design choice — there + is simply nowhere to put it in the RawAudio schema). - Wire type has a single 'format' string; we encode it as "{coding_format}/{sample_format}" (e.g. "pcm/S16LE"). 
- AudioInfo and AudioData are not separate LCM sub-types; they are plain @@ -41,7 +45,8 @@ class AudioStamped(Timestamped): """Stamped audio chunk with PCM payload. Carries one frame of raw PCM audio together with a std_msgs.Header and - audio metadata. Serialises to/from foxglove_msgs.RawAudio on the wire. + audio metadata. Serialises to/from foxglove_msgs.RawAudio on the wire + (temporary stand-in — see module docstring). """ msg_name = "foxglove_msgs.RawAudio" # wire type used for LCM @@ -119,8 +124,8 @@ def to_numpy(self) -> np.ndarray: def lcm_encode(self) -> bytes: """Encode to foxglove_msgs.RawAudio wire bytes. - NOTE: frame_id and seq from self.header are NOT preserved (the wire - type only carries a bare timestamp, not a full std_msgs.Header). + NOTE: frame_id from self.header is not preserved — RawAudio carries + only a builtin_interfaces.Time timestamp and has no frame_id field. """ msg = RawAudio() sec = int(self.ts) From 459ab1f40e07ae938dbc2f8dd9cb5d38b98fc72d Mon Sep 17 00:00:00 2001 From: GuoZi Date: Wed, 17 Jun 2026 14:01:14 +0800 Subject: [PATCH 03/13] feat(audio): register AudioModule in dimos CLI Add demo_audio blueprint to module.py and regenerate all_blueprints.py so AudioModule is accessible via: dimos run demo-audio (blueprint) dimos run audio-module (standalone module) Co-Authored-By: Claude Sonnet 4.6 --- dimos/hardware/sensors/audio/module.py | 4 ++++ dimos/robot/all_blueprints.py | 29 ++++++++++++++++++++++---- 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/dimos/hardware/sensors/audio/module.py b/dimos/hardware/sensors/audio/module.py index 855b6faf6c..81c6bdf24c 100644 --- a/dimos/hardware/sensors/audio/module.py +++ b/dimos/hardware/sensors/audio/module.py @@ -33,6 +33,7 @@ from pydantic import Field from dimos.agents.annotation import skill +from dimos.core.coordination.blueprints import autoconnect from dimos.core.core import rpc from dimos.core.module import Module, ModuleConfig from dimos.core.stream import Out @@ 
-203,3 +204,6 @@ def on_frame(msg: AudioStamped) -> None: done.wait(timeout=seconds + 2.0) unsub() return b"".join(buf) + + +demo_audio = autoconnect(AudioModule.blueprint()) diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index 6fbf0138bb..6008ce568d 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -45,11 +45,12 @@ "coordinator-xarm7": "dimos.control.blueprints.basic:coordinator_xarm7", "demo-agent": "dimos.agents.demo_agent:demo_agent", "demo-agent-camera": "dimos.agents.demo_agent:demo_agent_camera", + "demo-audio": "dimos.hardware.sensors.audio.module:demo_audio", "demo-camera": "dimos.hardware.sensors.camera.module:demo_camera", + "demo-capabilities": "dimos.agents.demos.demo_capabilities:demo_capabilities", "demo-error-on-name-conflicts": "dimos.robot.unitree.demo_error_on_name_conflicts:demo_error_on_name_conflicts", "demo-google-maps-skill": "dimos.agents.skills.demo_google_maps_skill:demo_google_maps_skill", "demo-gps-nav": "dimos.agents.skills.demo_gps_nav:demo_gps_nav", - "demo-grasping": "dimos.manipulation.grasping.demo_grasping:demo_grasping", "demo-mcp-stress-test": "dimos.core.tests.stress_test_blueprint:demo_mcp_stress_test", "demo-object-scene-registration": "dimos.perception.demo_object_scene_registration:demo_object_scene_registration", "demo-osm": "dimos.mapping.osm.demo_osm:demo_osm", @@ -71,15 +72,19 @@ "mid360-fastlio-voxels-native": "dimos.hardware.sensors.lidar.fastlio2.fastlio_blueprints:mid360_fastlio_voxels_native", "openarm-mock-planner-coordinator": "dimos.robot.manipulators.openarm.blueprints:openarm_mock_planner_coordinator", "openarm-planner-coordinator": "dimos.robot.manipulators.openarm.blueprints:openarm_planner_coordinator", + "path-planner-eval": "dimos.navigation.nav_3d.evaluator.blueprints:path_planner_eval", + "teleop-hosted-go2": "dimos.teleop.quest_hosted.blueprints:teleop_hosted_go2", + "teleop-hosted-xarm7": 
"dimos.teleop.quest_hosted.blueprints:teleop_hosted_xarm7", "teleop-phone": "dimos.teleop.phone.blueprints:teleop_phone", "teleop-phone-go2": "dimos.teleop.phone.blueprints:teleop_phone_go2", "teleop-phone-go2-fleet": "dimos.teleop.phone.blueprints:teleop_phone_go2_fleet", "teleop-quest-dual": "dimos.teleop.quest.blueprints:teleop_quest_dual", + "teleop-quest-go2": "dimos.teleop.quest.blueprints:teleop_quest_go2", "teleop-quest-piper": "dimos.teleop.quest.blueprints:teleop_quest_piper", "teleop-quest-rerun": "dimos.teleop.quest.blueprints:teleop_quest_rerun", "teleop-quest-xarm6": "dimos.teleop.quest.blueprints:teleop_quest_xarm6", "teleop-quest-xarm7": "dimos.teleop.quest.blueprints:teleop_quest_xarm7", - "uintree-g1-primitive-no-nav": "dimos.robot.unitree.g1.blueprints.primitive.uintree_g1_primitive_no_nav:uintree_g1_primitive_no_nav", + "teleop-quest-xarm7-video": "dimos.teleop.quest.blueprints:teleop_quest_xarm7_video", "unitree-g1": "dimos.robot.unitree.g1.blueprints.perceptive.unitree_g1:unitree_g1", "unitree-g1-agentic": "dimos.robot.unitree.g1.blueprints.agentic.unitree_g1_agentic:unitree_g1_agentic", "unitree-g1-agentic-sim": "dimos.robot.unitree.g1.blueprints.agentic.unitree_g1_agentic_sim:unitree_g1_agentic_sim", @@ -88,9 +93,12 @@ "unitree-g1-coordinator": "dimos.robot.unitree.g1.blueprints.basic.unitree_g1_coordinator:unitree_g1_coordinator", "unitree-g1-detection": "dimos.robot.unitree.g1.blueprints.perceptive.unitree_g1_detection:unitree_g1_detection", "unitree-g1-full": "dimos.robot.unitree.g1.blueprints.agentic.unitree_g1_full:unitree_g1_full", + "unitree-g1-groot-wbc": "dimos.robot.unitree.g1.blueprints.basic.unitree_g1_groot_wbc:unitree_g1_groot_wbc", "unitree-g1-joystick": "dimos.robot.unitree.g1.blueprints.basic.unitree_g1_joystick:unitree_g1_joystick", "unitree-g1-nav-onboard": "dimos.robot.unitree.g1.blueprints.navigation.unitree_g1_nav_onboard:unitree_g1_nav_onboard", "unitree-g1-nav-sim": 
"dimos.robot.unitree.g1.blueprints.navigation.unitree_g1_nav_sim:unitree_g1_nav_sim", + "unitree-g1-nav-simple": "dimos.robot.unitree.g1.blueprints.navigation.unitree_g1_nav_simple:unitree_g1_nav_simple", + "unitree-g1-primitive-no-nav": "dimos.robot.unitree.g1.blueprints.primitive.unitree_g1_primitive_no_nav:unitree_g1_primitive_no_nav", "unitree-g1-shm": "dimos.robot.unitree.g1.blueprints.perceptive.unitree_g1_shm:unitree_g1_shm", "unitree-g1-sim": "dimos.robot.unitree.g1.blueprints.perceptive.unitree_g1_sim:unitree_g1_sim", "unitree-go2": "dimos.robot.unitree.go2.blueprints.smart.unitree_go2:unitree_go2", @@ -126,22 +134,27 @@ all_modules = { "alfred-high-level": "dimos.robot.diy.alfred.effector_high_level.AlfredHighLevel", "arm-teleop-module": "dimos.teleop.quest.quest_extensions.ArmTeleopModule", + "audio-module": "dimos.hardware.sensors.audio.module.AudioModule", "b-box-navigation-module": "dimos.navigation.bbox_navigation.BBoxNavigationModule", "b1-connection-module": "dimos.robot.unitree.b1.connection.B1ConnectionModule", "camera-module": "dimos.hardware.sensors.camera.module.CameraModule", "cartesian-motion-controller": "dimos.manipulation.control.servo_control.cartesian_motion_controller.CartesianMotionController", + "click-start-goal-router": "dimos.navigation.nav_stack.modules.click_start_goal_router.click_start_goal_router.ClickStartGoalRouter", "control-coordinator": "dimos.control.coordinator.ControlCoordinator", "cost-mapper": "dimos.mapping.costmapper.CostMapper", "demo-calculator-skill": "dimos.agents.skills.demo_calculator_skill.DemoCalculatorSkill", + "demo-monitoring": "dimos.agents.demos.demo_capabilities.DemoMonitoring", "demo-robot": "dimos.agents.skills.demo_robot.DemoRobot", + "demo-robot-actions": "dimos.agents.demos.demo_capabilities.DemoRobotActions", + "demo-sensors": "dimos.agents.demos.demo_capabilities.DemoSensors", "desk-static-tf-module": "dimos.perception.fiducial.blueprints.desk_marker_tf.DeskStaticTfModule", 
"detection2-d-module": "dimos.perception.detection.module2D.Detection2DModule", "detection3-d-module": "dimos.perception.detection.module3D.Detection3DModule", "drone-camera-module": "dimos.robot.drone.camera_module.DroneCameraModule", "drone-connection-module": "dimos.robot.drone.connection_module.DroneConnectionModule", "drone-tracking-module": "dimos.robot.drone.drone_tracking_module.DroneTrackingModule", - "embedding-memory": "dimos.memory.embedding.EmbeddingMemory", "emitter-module": "dimos.utils.demo_image_encoding.EmitterModule", + "evaluator": "dimos.navigation.nav_3d.evaluator.evaluator.Evaluator", "far-planner": "dimos.navigation.nav_stack.modules.far_planner.far_planner.FarPlanner", "fast-lio2": "dimos.hardware.sensors.lidar.fastlio2.module.FastLio2", "g1-connection": "dimos.robot.unitree.g1.connection.G1Connection", @@ -153,11 +166,15 @@ "go2-connection": "dimos.robot.unitree.go2.connection.GO2Connection", "go2-fleet-connection": "dimos.robot.unitree.go2.fleet_connection.Go2FleetConnection", "go2-memory": "dimos.robot.unitree.go2.blueprints.smart.unitree_go2.Go2Memory", + "go2-teleop-module": "dimos.teleop.quest.quest_extensions.Go2TeleopModule", "google-maps-skill-container": "dimos.agents.skills.google_maps_skill_container.GoogleMapsSkillContainer", "gps-nav-skill-container": "dimos.agents.skills.gps_nav_skill.GpsNavSkillContainer", - "grasp-gen-module": "dimos.manipulation.grasping.graspgen_module.GraspGenModule", "grasping-module": "dimos.manipulation.grasping.grasping.GraspingModule", "gstreamer-camera-module": "dimos.hardware.sensors.camera.gstreamer.gstreamer_camera.GstreamerCameraModule", + "hosted-arm-teleop-module": "dimos.teleop.quest_hosted.hosted_extensions.HostedArmTeleopModule", + "hosted-teleop-module": "dimos.teleop.quest_hosted.hosted_teleop_module.HostedTeleopModule", + "hosted-teleop-recorder": "dimos.teleop.quest_hosted.blueprints.HostedTeleopRecorder", + "hosted-twist-teleop-module": 
"dimos.teleop.quest_hosted.hosted_extensions.HostedTwistTeleopModule", "joint-trajectory-controller": "dimos.manipulation.control.trajectory_controller.joint_trajectory_controller.JointTrajectoryController", "joystick-module": "dimos.robot.unitree.b1.joystick_module.JoystickModule", "keyboard-teleop": "dimos.robot.unitree.keyboard_teleop.KeyboardTeleop", @@ -165,10 +182,12 @@ "local-planner": "dimos.navigation.nav_stack.modules.local_planner.local_planner.LocalPlanner", "manipulation-module": "dimos.manipulation.manipulation_module.ManipulationModule", "map": "dimos.robot.unitree.type.map.Map", + "marker-detection-stream-module": "dimos.perception.fiducial.marker_detection_stream_module.MarkerDetectionStreamModule", "marker-tf-module": "dimos.perception.fiducial.marker_tf_module.MarkerTfModule", "mcp-client": "dimos.agents.mcp.mcp_client.McpClient", "mcp-server": "dimos.agents.mcp.mcp_server.McpServer", "memory-module": "dimos.memory2.module.MemoryModule", + "mls-planner-native": "dimos.navigation.nav_3d.mls_planner.mls_planner_native.MLSPlannerNative", "mock-b1-connection-module": "dimos.robot.unitree.b1.connection.MockB1ConnectionModule", "module-a": "dimos.robot.unitree.demo_error_on_name_conflicts.ModuleA", "module-b": "dimos.robot.unitree.demo_error_on_name_conflicts.ModuleB", @@ -207,6 +226,7 @@ "spatial-memory": "dimos.perception.spatial_perception.SpatialMemory", "speak-skill": "dimos.agents.skills.speak_skill.SpeakSkill", "tare-planner": "dimos.navigation.nav_stack.modules.tare_planner.tare_planner.TarePlanner", + "teleop-recorder": "dimos.teleop.utils.recorder.TeleopRecorder", "temporal-memory": "dimos.perception.experimental.temporal_memory.temporal_memory.TemporalMemory", "terrain-analysis": "dimos.navigation.nav_stack.modules.terrain_analysis.terrain_analysis.TerrainAnalysis", "terrain-map-ext": "dimos.navigation.nav_stack.modules.terrain_map_ext.terrain_map_ext.TerrainMapExt", @@ -214,6 +234,7 @@ "unitree-g1-skill-container": 
"dimos.robot.unitree.g1.skill_container.UnitreeG1SkillContainer", "unitree-skill-container": "dimos.robot.unitree.unitree_skill_container.UnitreeSkillContainer", "unity-bridge-module": "dimos.simulation.unity.module.UnityBridgeModule", + "video-arm-teleop-module": "dimos.teleop.quest.quest_extensions.VideoArmTeleopModule", "vlm-agent": "dimos.agents.vlm_agent.VLMAgent", "vlm-stream-tester": "dimos.agents.vlm_stream_tester.VlmStreamTester", "voxel-grid-mapper": "dimos.mapping.voxels.VoxelGridMapper", From cbe841138c7c4a5ec4dbe659ed5839e79b35d89a Mon Sep 17 00:00:00 2001 From: GuoZi Date: Wed, 17 Jun 2026 16:23:58 +0800 Subject: [PATCH 04/13] feat(audio): add audio speech loopback pipeline --- dimos/hardware/sensors/audio/module.py | 511 ++++++++++++++++++++++++- dimos/robot/all_blueprints.py | 1 + 2 files changed, 510 insertions(+), 2 deletions(-) diff --git a/dimos/hardware/sensors/audio/module.py b/dimos/hardware/sensors/audio/module.py index 81c6bdf24c..a0e705bd06 100644 --- a/dimos/hardware/sensors/audio/module.py +++ b/dimos/hardware/sensors/audio/module.py @@ -26,7 +26,10 @@ from __future__ import annotations +from collections.abc import Callable import asyncio +import queue +import threading import time import numpy as np @@ -36,7 +39,7 @@ from dimos.core.coordination.blueprints import autoconnect from dimos.core.core import rpc from dimos.core.module import Module, ModuleConfig -from dimos.core.stream import Out +from dimos.core.stream import In, Out from dimos.msgs.audio_msgs.AudioStamped import AudioStamped from dimos.utils.logging_config import setup_logger @@ -206,4 +209,508 @@ def on_frame(msg: AudioStamped) -> None: return b"".join(buf) -demo_audio = autoconnect(AudioModule.blueprint()) +class SpeakerConfig(ModuleConfig): + device: int | None = Field(default=None) + queue_max_chunks: int = 256 + + +class SpeakerModule(Module): + config: SpeakerConfig + audio: In[AudioStamped] + + _queue: queue.Queue[AudioStamped] | None = None + _running: 
threading.Event | None = None + _writer_thread: threading.Thread | None = None + _unsub: Callable[[], None] | None = None + _stream: object | None = None + _stream_lock: threading.Lock = threading.Lock() + _stream_rate: int | None = None + _stream_channels: int | None = None + _stream_dtype: str | None = None + + async def main(self) -> None: # type: ignore[override] + self._queue = queue.Queue(maxsize=self.config.queue_max_chunks) + self._running = threading.Event() + self._running.set() + self._writer_thread = threading.Thread(target=self._writer_loop, daemon=True) + self._writer_thread.start() + yield + if self._unsub is not None: + try: + self._unsub() + except Exception: + logger.exception("SpeakerModule failed to unsubscribe") + self._unsub = None + if self._running is not None: + self._running.clear() + if self._writer_thread is not None: + self._writer_thread.join(timeout=2.0) + self._writer_thread = None + with self._stream_lock: + if self._stream is not None: + try: + self._stream.stop() + except Exception: + logger.exception("SpeakerModule failed to stop output stream") + try: + self._stream.close() + except Exception: + logger.exception("SpeakerModule failed to close output stream") + self._stream = None + self._queue = None + self._running = None + self._stream_rate = None + self._stream_channels = None + self._stream_dtype = None + + @rpc + def start(self) -> None: + super().start() + self._unsub = self.audio.subscribe(self._on_audio) + + @rpc + def stop(self) -> None: + super().stop() + + def _on_audio(self, msg: AudioStamped) -> None: + q = self._queue + if q is None: + return + try: + q.put_nowait(msg) + except queue.Full: + try: + _ = q.get_nowait() + except queue.Empty: + return + try: + q.put_nowait(msg) + except queue.Full: + return + + def _writer_loop(self) -> None: + running = self._running + q = self._queue + if running is None or q is None: + return + + try: + import sounddevice as sd # type: ignore[import-untyped] + except Exception as e: 
+ logger.warning(f"SpeakerModule: sounddevice unavailable ({e}); dropping audio") + while running.is_set(): + try: + _ = q.get(timeout=0.25) + except queue.Empty: + continue + return + + while running.is_set(): + try: + msg = q.get(timeout=0.25) + except queue.Empty: + continue + + if msg.coding_format != "pcm": + continue + + if msg.sample_format == "S16LE": + dtype = " 1: + data = data.reshape(-1, msg.channels) + with self._stream_lock: + if self._stream is not None: + try: + self._stream.write(data) + except Exception: + logger.exception("SpeakerModule failed to write audio") + + +class SpeechToTextConfig(ModuleConfig): + model: str = "base" + language: str = "en" + fp16: bool = False + segment_seconds: float = 3.0 + queue_max_segments: int = 8 + + +class SpeechToTextModule(Module): + config: SpeechToTextConfig + audio: In[AudioStamped] + text: Out[str] + + _segment_queue: queue.Queue[tuple[np.ndarray, int]] | None = None + _running: threading.Event | None = None + _thread: threading.Thread | None = None + _unsub: Callable[[], None] | None = None + _buf: np.ndarray | None = None + _buf_rate: int | None = None + + async def main(self) -> None: # type: ignore[override] + self._segment_queue = queue.Queue(maxsize=self.config.queue_max_segments) + self._running = threading.Event() + self._running.set() + self._buf = np.zeros((0,), dtype=np.float32) + self._buf_rate = None + self._thread = threading.Thread(target=self._worker_loop, daemon=True) + self._thread.start() + yield + if self._unsub is not None: + try: + self._unsub() + except Exception: + logger.exception("SpeechToTextModule failed to unsubscribe") + self._unsub = None + if self._running is not None: + self._running.clear() + if self._thread is not None: + self._thread.join(timeout=2.0) + self._thread = None + self._segment_queue = None + self._running = None + self._buf = None + self._buf_rate = None + + @rpc + def start(self) -> None: + super().start() + self._unsub = self.audio.subscribe(self._on_audio) 
+ + @rpc + def stop(self) -> None: + super().stop() + + def _on_audio(self, msg: AudioStamped) -> None: + if msg.coding_format != "pcm": + return + if msg.sample_format == "S16LE": + x = msg.to_numpy().astype(np.float32) + if x.ndim > 1: + x = x.mean(axis=1) + x = x / 32768.0 + elif msg.sample_format in ("F32LE", "F32"): + x = msg.to_numpy().astype(np.float32) + if x.ndim > 1: + x = x.mean(axis=1) + else: + return + + if self._buf is None: + return + + if self._buf_rate is None: + self._buf_rate = msg.sample_rate + elif self._buf_rate != msg.sample_rate: + self._buf = np.zeros((0,), dtype=np.float32) + self._buf_rate = msg.sample_rate + + self._buf = np.concatenate([self._buf, x], axis=0) + rate = self._buf_rate + if rate is None: + return + target_samples = max(1, int(self.config.segment_seconds * rate)) + + while self._buf.shape[0] >= target_samples: + segment = self._buf[:target_samples] + self._buf = self._buf[target_samples:] + q = self._segment_queue + if q is None: + return + try: + q.put_nowait((segment, rate)) + except queue.Full: + try: + _ = q.get_nowait() + except queue.Empty: + break + try: + q.put_nowait((segment, rate)) + except queue.Full: + break + + def _worker_loop(self) -> None: + running = self._running + q = self._segment_queue + if running is None or q is None: + return + + model: object | None = None + use_faster = False + try: + import whisper # type: ignore[import-untyped] + + model = whisper.load_model(self.config.model) + use_faster = False + logger.info(f"SpeechToTextModule: ready (backend=openai-whisper, model={self.config.model})") + except Exception: + try: + from faster_whisper import WhisperModel # type: ignore[import-untyped] + + compute_type = "float16" if self.config.fp16 else "int8" + model = WhisperModel(self.config.model, device="auto", compute_type=compute_type) + use_faster = True + logger.info( + "SpeechToTextModule: ready " + f"(backend=faster-whisper, model={self.config.model}, fp16={self.config.fp16})" + ) + except 
Exception as e: + logger.warning(f"SpeechToTextModule: no whisper backend available ({e}); dropping audio") + while running.is_set(): + try: + _ = q.get(timeout=0.25) + except queue.Empty: + continue + return + + while running.is_set(): + try: + segment, _rate = q.get(timeout=0.25) + except queue.Empty: + continue + + try: + if use_faster: + segments, _info = model.transcribe(segment, language=self.config.language) # type: ignore[union-attr] + text = " ".join(seg.text.strip() for seg in segments).strip() + else: + result = model.transcribe( # type: ignore[union-attr] + segment, + language=self.config.language, + fp16=self.config.fp16, + ) + text = str(result.get("text", "")).strip() + except Exception: + logger.exception("SpeechToTextModule transcription failed") + continue + + if text: + preview = text if len(text) <= 120 else (text[:120] + "…") + logger.info(f"SpeechToTextModule: transcribed: {preview}") + self.text.publish(text) + + +class TextToSpeechConfig(ModuleConfig): + voice: str = "echo" + model: str = "tts-1" + speed: float = 1.0 + sample_rate: int = 24000 + frame_ms: int = 20 + api_key: str | None = Field(default=None) + queue_max_texts: int = 64 + + +class TextToSpeechModule(Module): + config: TextToSpeechConfig + text: In[str] + audio: Out[AudioStamped] + + _queue: queue.Queue[str] | None = None + _running: threading.Event | None = None + _thread: threading.Thread | None = None + _unsub: Callable[[], None] | None = None + + async def main(self) -> None: # type: ignore[override] + self._queue = queue.Queue(maxsize=self.config.queue_max_texts) + self._running = threading.Event() + self._running.set() + self._thread = threading.Thread(target=self._worker_loop, daemon=True) + self._thread.start() + yield + if self._unsub is not None: + try: + self._unsub() + except Exception: + logger.exception("TextToSpeechModule failed to unsubscribe") + self._unsub = None + if self._running is not None: + self._running.clear() + if self._thread is not None: + 
self._thread.join(timeout=2.0) + self._thread = None + self._queue = None + self._running = None + + @rpc + def start(self) -> None: + super().start() + self._unsub = self.text.subscribe(self._on_text) + + @rpc + def stop(self) -> None: + super().stop() + + def _on_text(self, text: str) -> None: + if not text.strip(): + return + q = self._queue + if q is None: + return + try: + q.put_nowait(text) + except queue.Full: + try: + _ = q.get_nowait() + except queue.Empty: + return + try: + q.put_nowait(text) + except queue.Full: + return + + def _worker_loop(self) -> None: + running = self._running + q = self._queue + if running is None or q is None: + return + + try: + import io + + from openai import OpenAI + import soundfile as sf # type: ignore[import-untyped] + except Exception as e: + logger.warning(f"TextToSpeechModule: missing dependencies ({e}); dropping text") + while running.is_set(): + try: + _ = q.get(timeout=0.25) + except queue.Empty: + continue + return + + client = OpenAI(api_key=self.config.api_key) + frame_size = max(1, int(self.config.sample_rate * self.config.frame_ms / 1000)) + logger.info( + "TextToSpeechModule: ready " + f"(model={self.config.model}, voice={self.config.voice}, sample_rate={self.config.sample_rate})" + ) + + while running.is_set(): + try: + text = q.get(timeout=0.25) + except queue.Empty: + continue + + preview = text if len(text) <= 120 else (text[:120] + "…") + logger.info(f"TextToSpeechModule: synthesizing: {preview}") + try: + response = client.audio.speech.create( + model=self.config.model, + voice=self.config.voice, + input=text, + speed=self.config.speed, + ) + audio_data = io.BytesIO(response.content) + with sf.SoundFile(audio_data, "r") as sound_file: + src_rate = int(sound_file.samplerate) + samples = sound_file.read(dtype="float32") + except Exception: + logger.exception("TextToSpeechModule synthesis failed") + continue + + if samples.ndim > 1: + samples = samples.mean(axis=1) + + if src_rate != self.config.sample_rate: 
+ x = np.asarray(samples, dtype=np.float32) + old_n = x.shape[0] + if old_n <= 1: + continue + new_n = int(old_n * self.config.sample_rate / src_rate) + old_idx = np.arange(old_n) + new_idx = np.linspace(0, old_n - 1, new_n) + samples = np.interp(new_idx, old_idx, x).astype(np.float32) + + pcm = np.clip(samples, -1.0, 1.0) + pcm_i16 = (pcm * 32767.0).astype(np.int16) + pcm_bytes = pcm_i16.tobytes() + + n_bytes_per_sample = 2 + chunk_bytes = frame_size * n_bytes_per_sample + for offset in range(0, len(pcm_bytes), chunk_bytes): + chunk = pcm_bytes[offset : offset + chunk_bytes] + self.audio.publish( + AudioStamped.from_pcm( + pcm_bytes=chunk, + sample_rate=self.config.sample_rate, + channels=1, + sample_format="S16LE", + coding_format="pcm", + ts=time.monotonic(), + ) + ) + logger.info( + "TextToSpeechModule: published audio " + f"({len(pcm_bytes) / (n_bytes_per_sample * self.config.sample_rate):.2f}s)" + ) + + +audio_speech_loopback = autoconnect( + AudioModule.blueprint(), + SpeechToTextModule.blueprint(), + TextToSpeechModule.blueprint(), + SpeakerModule.blueprint(), +).remappings( + [ + (AudioModule, "audio", "mic_audio"), + (SpeechToTextModule, "audio", "mic_audio"), + (SpeechToTextModule, "text", "speech_text"), + (TextToSpeechModule, "text", "speech_text"), + (TextToSpeechModule, "audio", "tts_audio"), + (SpeakerModule, "audio", "tts_audio"), + ] +) + + +demo_audio = autoconnect( + AudioModule.blueprint(), + SpeakerModule.blueprint(), +) diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index 6008ce568d..21f0038356 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -17,6 +17,7 @@ all_blueprints = { "alfred-nav": "dimos.robot.diy.alfred.blueprints.alfred_nav:alfred_nav", + "audio-speech-loopback": "dimos.hardware.sensors.audio.module:audio_speech_loopback", "coordinator-basic": "dimos.control.blueprints.basic:coordinator_basic", "coordinator-cartesian-ik-mock": 
"dimos.control.blueprints.teleop:coordinator_cartesian_ik_mock", "coordinator-cartesian-ik-piper": "dimos.control.blueprints.teleop:coordinator_cartesian_ik_piper", From 732e60c61978cb4e6c88b189380c9f2bb1062b95 Mon Sep 17 00:00:00 2001 From: GuoZi Date: Wed, 17 Jun 2026 16:42:21 +0800 Subject: [PATCH 05/13] feat(audio): add fun voice effects chain --- dimos/hardware/sensors/audio/module.py | 400 ++++++++++++++++++++++++- 1 file changed, 399 insertions(+), 1 deletion(-) diff --git a/dimos/hardware/sensors/audio/module.py b/dimos/hardware/sensors/audio/module.py index a0e705bd06..569e2f9981 100644 --- a/dimos/hardware/sensors/audio/module.py +++ b/dimos/hardware/sensors/audio/module.py @@ -28,6 +28,7 @@ from collections.abc import Callable import asyncio +import math import queue import threading import time @@ -693,10 +694,405 @@ def _worker_loop(self) -> None: ) +def _phase_vocoder(D: np.ndarray, rate: float, hop_length: int, n_fft: int) -> np.ndarray: + if D.shape[1] <= 1: + return D + time_steps = np.arange(0, D.shape[1] - 1, rate, dtype=np.float32) + out = np.empty((D.shape[0], len(time_steps)), dtype=np.complex64) + + phase_acc = np.angle(D[:, 0]).astype(np.float32) + phase_advance = (2.0 * np.pi * hop_length * np.arange(D.shape[0]) / n_fft).astype( + np.float32 + ) + + for t, step in enumerate(time_steps): + i = int(step) + frac = float(step - i) + mag = (1.0 - frac) * np.abs(D[:, i]) + frac * np.abs(D[:, i + 1]) + + phase = np.angle(D[:, i + 1]) - np.angle(D[:, i]) + phase = phase - phase_advance + phase = (phase + np.pi) % (2.0 * np.pi) - np.pi + + phase_acc = phase_acc + phase_advance + phase + out[:, t] = mag * np.exp(1j * phase_acc) + + return out + + +def _stft(y: np.ndarray, n_fft: int, hop_length: int, window: np.ndarray) -> np.ndarray: + if y.shape[0] < n_fft: + y = np.pad(y, (0, n_fft - y.shape[0])) + n_frames = 1 + (y.shape[0] - n_fft) // hop_length + frames = np.empty((n_fft // 2 + 1, n_frames), dtype=np.complex64) + for i in range(n_frames): + 
start = i * hop_length + frame = y[start : start + n_fft] * window + frames[:, i] = np.fft.rfft(frame, n=n_fft).astype(np.complex64) + return frames + + +def _istft(D: np.ndarray, n_fft: int, hop_length: int, window: np.ndarray, length: int) -> np.ndarray: + y = np.zeros((hop_length * (D.shape[1] - 1) + n_fft,), dtype=np.float32) + wsum = np.zeros_like(y) + for i in range(D.shape[1]): + start = i * hop_length + frame = np.fft.irfft(D[:, i], n=n_fft).astype(np.float32) + y[start : start + n_fft] += frame * window + wsum[start : start + n_fft] += window * window + nonzero = wsum > 1e-8 + y[nonzero] /= wsum[nonzero] + if y.shape[0] < length: + y = np.pad(y, (0, length - y.shape[0])) + return y[:length] + + +def _resample_linear(y: np.ndarray, n: int) -> np.ndarray: + if n <= 0: + return np.zeros((0,), dtype=np.float32) + if y.shape[0] == 0: + return np.zeros((n,), dtype=np.float32) + if y.shape[0] == 1: + return np.full((n,), float(y[0]), dtype=np.float32) + xp = np.arange(y.shape[0], dtype=np.float32) + x = np.linspace(0.0, float(y.shape[0] - 1), n, dtype=np.float32) + return np.interp(x, xp, y).astype(np.float32) + + +def _pitch_shift_block(y: np.ndarray, sample_rate: int, semitones: float) -> np.ndarray: + if semitones == 0.0: + return y + n_fft = 1024 + hop = 256 + p = 2.0 ** (semitones / 12.0) + rate = 1.0 / p + window = np.hanning(n_fft).astype(np.float32) + + D = _stft(y, n_fft=n_fft, hop_length=hop, window=window) + D_stretch = _phase_vocoder(D, rate=rate, hop_length=hop, n_fft=n_fft) + stretched_len = max(1, int(round(y.shape[0] * rate))) + y_stretch = _istft(D_stretch, n_fft=n_fft, hop_length=hop, window=window, length=stretched_len) + y_shift = _resample_linear(y_stretch, y.shape[0]) + return y_shift + + +class FunVoiceEffectsConfig(ModuleConfig): + enabled: bool = True + input_gain: float = 1.0 + + noise_gate_enabled: bool = True + noise_gate_threshold_db: float = -35.0 + noise_gate_attack_ms: float = 10.0 + noise_gate_release_ms: float = 120.0 + + 
pitch_shift_enabled: bool = True + pitch_semitones: float = 7.0 + + robotize_enabled: bool = True + ringmod_hz: float = 45.0 + bitcrush_bits: int = 8 + bitcrush_downsample: int = 4 + + echo_enabled: bool = True + echo_delay_ms: float = 160.0 + echo_feedback: float = 0.35 + echo_mix: float = 0.25 + + block_ms: float = 160.0 + vu_log_interval_s: float = 0.75 + + +class _FunVoiceProcessor: + def __init__(self) -> None: + self.sample_rate: int | None = None + self.input_gain: float = 1.0 + self.gate_gain: float = 1.0 + self.gate_attack: float = 0.0 + self.gate_release: float = 0.0 + self.gate_threshold: float = 0.0 + self.pitch_semitones: float = 0.0 + self.ringmod_hz: float = 0.0 + self.ring_phase: float = 0.0 + self.bit_bits: int = 8 + self.bit_downsample: int = 1 + self.bit_hold: float = 0.0 + self.bit_count: int = 0 + self.echo_delay: int = 0 + self.echo_feedback: float = 0.0 + self.echo_mix: float = 0.0 + self.echo_buf: np.ndarray | None = None + self.echo_idx: int = 0 + self.block_size: int = 0 + self.hop_size: int = 0 + self.window: np.ndarray | None = None + self.prev_overlap: np.ndarray | None = None + self.inbuf = np.zeros((0,), dtype=np.float32) + self.outbuf = np.zeros((0,), dtype=np.float32) + self.last_vu_log = 0.0 + + def reconfigure(self, cfg: FunVoiceEffectsConfig, sample_rate: int) -> None: + self.sample_rate = sample_rate + self.input_gain = float(cfg.input_gain) + + th = 10.0 ** (float(cfg.noise_gate_threshold_db) / 20.0) + self.gate_threshold = float(th) + self.gate_attack = max(1e-6, float(cfg.noise_gate_attack_ms) / 1000.0) + self.gate_release = max(1e-6, float(cfg.noise_gate_release_ms) / 1000.0) + + self.pitch_semitones = float(cfg.pitch_semitones) if cfg.pitch_shift_enabled else 0.0 + self.ringmod_hz = float(cfg.ringmod_hz) if cfg.robotize_enabled else 0.0 + self.bit_bits = int(cfg.bitcrush_bits) + self.bit_downsample = max(1, int(cfg.bitcrush_downsample)) + + self.echo_feedback = float(cfg.echo_feedback) if cfg.echo_enabled else 0.0 + 
self.echo_mix = float(cfg.echo_mix) if cfg.echo_enabled else 0.0 + self.echo_delay = int(round(float(cfg.echo_delay_ms) * sample_rate / 1000.0)) + if self.echo_delay > 0 and self.echo_mix > 0.0: + self.echo_buf = np.zeros((self.echo_delay,), dtype=np.float32) + self.echo_idx = 0 + else: + self.echo_buf = None + self.echo_idx = 0 + + self.block_size = max(1024, int(round(float(cfg.block_ms) * sample_rate / 1000.0))) + self.block_size = int(2 ** math.ceil(math.log2(self.block_size))) + self.hop_size = self.block_size // 2 + self.window = np.sqrt(np.hanning(self.block_size).astype(np.float32)) + self.prev_overlap = np.zeros((self.hop_size,), dtype=np.float32) + + self.inbuf = np.zeros((0,), dtype=np.float32) + self.outbuf = np.zeros((0,), dtype=np.float32) + self.ring_phase = 0.0 + self.bit_hold = 0.0 + self.bit_count = 0 + self.gate_gain = 1.0 + self.last_vu_log = 0.0 + + def push(self, x: np.ndarray) -> None: + self.inbuf = np.concatenate([self.inbuf, x.astype(np.float32, copy=False)], axis=0) + while self.inbuf.shape[0] >= self.block_size: + block = self.inbuf[: self.block_size] + self.inbuf = self.inbuf[self.hop_size :] + y = self._process_block(block) + w = self.window + prev = self.prev_overlap + if w is None or prev is None: + self.outbuf = np.concatenate([self.outbuf, y[: self.hop_size]], axis=0) + else: + yw = y * w + out = yw[: self.hop_size] + prev + self.prev_overlap = yw[self.hop_size :] + self.outbuf = np.concatenate([self.outbuf, out], axis=0) + + def pop(self, n: int) -> np.ndarray: + if n <= 0: + return np.zeros((0,), dtype=np.float32) + if self.outbuf.shape[0] < n: + out = np.zeros((n,), dtype=np.float32) + if self.outbuf.shape[0] > 0: + out[: self.outbuf.shape[0]] = self.outbuf + self.outbuf = np.zeros((0,), dtype=np.float32) + return out + out = self.outbuf[:n] + self.outbuf = self.outbuf[n:] + return out + + def _apply_gate(self, y: np.ndarray) -> np.ndarray: + rms = float(np.sqrt(np.mean(y * y) + 1e-12)) + target = 0.0 if rms < 
self.gate_threshold else 1.0 + sr = float(self.sample_rate or 1) + if target > self.gate_gain: + step = 1.0 / max(1.0, self.gate_attack * sr) + self.gate_gain = min(1.0, self.gate_gain + step * y.shape[0]) + else: + step = 1.0 / max(1.0, self.gate_release * sr) + self.gate_gain = max(0.0, self.gate_gain - step * y.shape[0]) + return y * self.gate_gain + + def _apply_ringmod(self, y: np.ndarray) -> np.ndarray: + if self.ringmod_hz <= 0.0: + return y + sr = float(self.sample_rate or 1) + phase_inc = 2.0 * np.pi * self.ringmod_hz / sr + idx = np.arange(y.shape[0], dtype=np.float32) + ph = self.ring_phase + phase_inc * idx + mod = np.sin(ph).astype(np.float32) + self.ring_phase = float((ph[-1] + phase_inc) % (2.0 * np.pi)) + return y * mod + + def _apply_bitcrush(self, y: np.ndarray) -> np.ndarray: + bits = int(self.bit_bits) + if bits >= 16 and self.bit_downsample <= 1: + return y + levels = float(2 ** max(1, bits - 1)) + out = np.empty_like(y) + for i, s in enumerate(y): + if self.bit_count <= 0: + q = np.round(s * levels) / levels + self.bit_hold = float(q) + self.bit_count = self.bit_downsample + out[i] = self.bit_hold + self.bit_count -= 1 + return out + + def _apply_echo(self, y: np.ndarray) -> np.ndarray: + buf = self.echo_buf + if buf is None or self.echo_delay <= 0 or self.echo_mix <= 0.0: + return y + out = np.empty_like(y) + idx = self.echo_idx + fb = float(self.echo_feedback) + mix = float(self.echo_mix) + for i, s in enumerate(y): + d = float(buf[idx]) + wet = s + d + buf[idx] = s + d * fb + idx += 1 + if idx >= buf.shape[0]: + idx = 0 + out[i] = (1.0 - mix) * s + mix * wet + self.echo_idx = idx + return out + + def _process_block(self, x: np.ndarray) -> np.ndarray: + y = x.astype(np.float32, copy=False) * self.input_gain + y = self._apply_gate(y) + if self.pitch_semitones != 0.0 and self.sample_rate is not None: + y = _pitch_shift_block(y, sample_rate=self.sample_rate, semitones=self.pitch_semitones) + y = self._apply_ringmod(y) + y = 
self._apply_bitcrush(y) + y = self._apply_echo(y) + return np.clip(y, -1.0, 1.0) + + +class FunVoiceEffectsModule(Module): + config: FunVoiceEffectsConfig + audio_in: In[AudioStamped] + audio_out: Out[AudioStamped] + + _queue: queue.Queue[AudioStamped] | None = None + _running: threading.Event | None = None + _thread: threading.Thread | None = None + _unsub: Callable[[], None] | None = None + _processor: _FunVoiceProcessor = _FunVoiceProcessor() + _out_frame_size: int | None = None + + async def main(self) -> None: # type: ignore[override] + self._queue = queue.Queue(maxsize=256) + self._running = threading.Event() + self._running.set() + self._thread = threading.Thread(target=self._worker_loop, daemon=True) + self._thread.start() + yield + if self._unsub is not None: + try: + self._unsub() + except Exception: + logger.exception("FunVoiceEffectsModule failed to unsubscribe") + self._unsub = None + if self._running is not None: + self._running.clear() + if self._thread is not None: + self._thread.join(timeout=2.0) + self._thread = None + self._queue = None + self._running = None + + @rpc + def start(self) -> None: + super().start() + self._unsub = self.audio_in.subscribe(self._on_audio) + + @rpc + def stop(self) -> None: + super().stop() + + def _on_audio(self, msg: AudioStamped) -> None: + q = self._queue + if q is None or not self.config.enabled: + if self.config.enabled: + self.audio_out.publish(msg) + return + try: + q.put_nowait(msg) + except queue.Full: + try: + _ = q.get_nowait() + except queue.Empty: + return + try: + q.put_nowait(msg) + except queue.Full: + return + + def _decode_to_float(self, msg: AudioStamped) -> np.ndarray | None: + if msg.coding_format != "pcm": + return None + if msg.sample_format == "S16LE": + x = msg.to_numpy().astype(np.float32) + if x.ndim > 1: + x = x.mean(axis=1) + return x / 32768.0 + if msg.sample_format in ("F32LE", "F32"): + x = msg.to_numpy().astype(np.float32) + if x.ndim > 1: + x = x.mean(axis=1) + return x + return None 
+ + def _encode_from_float(self, y: np.ndarray, sample_rate: int) -> bytes: + y = np.clip(y, -1.0, 1.0) + pcm_i16 = (y * 32767.0).astype(np.int16) + return pcm_i16.tobytes() + + def _worker_loop(self) -> None: + running = self._running + q = self._queue + if running is None or q is None: + return + while running.is_set(): + try: + msg = q.get(timeout=0.25) + except queue.Empty: + continue + x = self._decode_to_float(msg) + if x is None: + continue + if self._processor.sample_rate != msg.sample_rate: + self._processor.reconfigure(self.config, sample_rate=msg.sample_rate) + self._out_frame_size = x.shape[0] + logger.info( + "FunVoiceEffectsModule: ready " + f"(sr={msg.sample_rate}, block_ms={self.config.block_ms}, pitch={self.config.pitch_semitones})" + ) + frame_size = self._out_frame_size or x.shape[0] + self._processor.push(x) + y = self._processor.pop(frame_size) + + now = time.monotonic() + if now - self._processor.last_vu_log >= float(self.config.vu_log_interval_s): + peak = float(np.max(np.abs(y))) if y.shape[0] else 0.0 + rms = float(np.sqrt(np.mean(y * y) + 1e-12)) if y.shape[0] else 0.0 + self._processor.last_vu_log = now + logger.info(f"FunVoiceEffectsModule: vu peak={peak:.3f} rms={rms:.3f}") + + self.audio_out.publish( + AudioStamped.from_pcm( + pcm_bytes=self._encode_from_float(y, sample_rate=msg.sample_rate), + sample_rate=msg.sample_rate, + channels=1, + sample_format="S16LE", + coding_format="pcm", + ts=time.monotonic(), + ) + ) + + audio_speech_loopback = autoconnect( AudioModule.blueprint(), SpeechToTextModule.blueprint(), TextToSpeechModule.blueprint(), + FunVoiceEffectsModule.blueprint(), SpeakerModule.blueprint(), ).remappings( [ @@ -704,7 +1100,9 @@ def _worker_loop(self) -> None: (SpeechToTextModule, "audio", "mic_audio"), (SpeechToTextModule, "text", "speech_text"), (TextToSpeechModule, "text", "speech_text"), - (TextToSpeechModule, "audio", "tts_audio"), + (TextToSpeechModule, "audio", "tts_audio_raw"), + (FunVoiceEffectsModule, "audio_in", 
"tts_audio_raw"), + (FunVoiceEffectsModule, "audio_out", "tts_audio"), (SpeakerModule, "audio", "tts_audio"), ] ) From 68858a44aa3d7791d28d8928eb7b011baf3b29b0 Mon Sep 17 00:00:00 2001 From: GuoZi Date: Thu, 18 Jun 2026 17:37:14 +0800 Subject: [PATCH 06/13] fix(audio): add OpenAI TTS timeout + fail-fast --- dimos/hardware/sensors/audio/module.py | 620 ++++++++++++++++++++++--- 1 file changed, 552 insertions(+), 68 deletions(-) diff --git a/dimos/hardware/sensors/audio/module.py b/dimos/hardware/sensors/audio/module.py index 569e2f9981..c6ce1fb94a 100644 --- a/dimos/hardware/sensors/audio/module.py +++ b/dimos/hardware/sensors/audio/module.py @@ -26,12 +26,22 @@ from __future__ import annotations +from collections import deque from collections.abc import Callable import asyncio +from difflib import SequenceMatcher import math +import os import queue +import re +import shutil +import subprocess +import sys +import tempfile import threading import time +from typing import Literal +import wave import numpy as np from pydantic import Field @@ -377,40 +387,80 @@ def _writer_loop(self) -> None: class SpeechToTextConfig(ModuleConfig): + backend_preference: Literal["whisper.cpp", "faster-whisper", "openai-whisper", "auto"] = ( + "whisper.cpp" + ) model: str = "base" language: str = "en" fp16: bool = False segment_seconds: float = 3.0 queue_max_segments: int = 8 + whisper_cpp_command: str | None = Field(default=None) + whisper_cpp_model_path: str | None = Field(default=None) + faster_whisper_download_root: str | None = Field(default=None) + drop_during_tts: bool = True + tts_guard_seconds: float = 0.35 + vad_enabled: bool = True + vad_threshold_db: float = -42.0 + vad_hangover_ms: int = 350 + aec_enabled: bool = True + aec_strength: float = 0.85 + aec_reference_window_s: float = 6.0 + aec_correlation_threshold: float = 0.45 + dedupe_enabled: bool = True + dedupe_window_s: float = 8.0 + self_tts_guard_enabled: bool = True + self_tts_guard_window_s: float = 8.0 + 
self_tts_similarity_threshold: float = 0.88 class SpeechToTextModule(Module): config: SpeechToTextConfig audio: In[AudioStamped] + tts_active: In[bool] + recent_tts_text: In[str] + tts_reference_audio: In[AudioStamped] text: Out[str] _segment_queue: queue.Queue[tuple[np.ndarray, int]] | None = None _running: threading.Event | None = None _thread: threading.Thread | None = None - _unsub: Callable[[], None] | None = None + _unsubs: list[Callable[[], None]] | None = None _buf: np.ndarray | None = None _buf_rate: int | None = None + _state_lock: threading.Lock = threading.Lock() + _tts_playing: bool = False + _tts_guard_until: float = 0.0 + _vad_open_until: float = 0.0 + _aec_ref_buf: np.ndarray | None = None + _aec_ref_rate: int | None = None + _recent_tts_texts: deque[tuple[float, str]] | None = None + _last_emitted_text: tuple[float, str] | None = None async def main(self) -> None: # type: ignore[override] self._segment_queue = queue.Queue(maxsize=self.config.queue_max_segments) self._running = threading.Event() self._running.set() + self._unsubs = [] self._buf = np.zeros((0,), dtype=np.float32) self._buf_rate = None + self._tts_playing = False + self._tts_guard_until = 0.0 + self._vad_open_until = 0.0 + self._aec_ref_buf = np.zeros((0,), dtype=np.float32) + self._aec_ref_rate = None + self._recent_tts_texts = deque() + self._last_emitted_text = None self._thread = threading.Thread(target=self._worker_loop, daemon=True) self._thread.start() yield - if self._unsub is not None: - try: - self._unsub() - except Exception: - logger.exception("SpeechToTextModule failed to unsubscribe") - self._unsub = None + if self._unsubs is not None: + for unsub in self._unsubs: + try: + unsub() + except Exception: + logger.exception("SpeechToTextModule failed to unsubscribe") + self._unsubs = None if self._running is not None: self._running.clear() if self._thread is not None: @@ -420,34 +470,179 @@ async def main(self) -> None: # type: ignore[override] self._running = None self._buf 
= None self._buf_rate = None + self._aec_ref_buf = None + self._aec_ref_rate = None + self._recent_tts_texts = None + self._last_emitted_text = None @rpc def start(self) -> None: super().start() - self._unsub = self.audio.subscribe(self._on_audio) + self._unsubs = [ + self.audio.subscribe(self._on_audio), + self.tts_active.subscribe(self._on_tts_active), + self.recent_tts_text.subscribe(self._on_recent_tts_text), + self.tts_reference_audio.subscribe(self._on_tts_reference_audio), + ] @rpc def stop(self) -> None: super().stop() - def _on_audio(self, msg: AudioStamped) -> None: + def _decode_audio(self, msg: AudioStamped) -> np.ndarray | None: if msg.coding_format != "pcm": - return + return None if msg.sample_format == "S16LE": x = msg.to_numpy().astype(np.float32) if x.ndim > 1: x = x.mean(axis=1) - x = x / 32768.0 - elif msg.sample_format in ("F32LE", "F32"): + return x / 32768.0 + if msg.sample_format in ("F32LE", "F32"): x = msg.to_numpy().astype(np.float32) if x.ndim > 1: x = x.mean(axis=1) + return x + return None + + def _reset_live_buffer(self) -> None: + if self._buf is not None: + self._buf = np.zeros((0,), dtype=np.float32) + + def _on_tts_active(self, active: bool) -> None: + with self._state_lock: + now = time.monotonic() + self._tts_playing = bool(active) + if active: + self._tts_guard_until = max(self._tts_guard_until, now + self.config.tts_guard_seconds) + else: + self._tts_guard_until = max(self._tts_guard_until, now + self.config.tts_guard_seconds) + if active: + self._reset_live_buffer() + + def _on_recent_tts_text(self, text: str) -> None: + normalized = _normalize_text(text) + if not normalized: + return + with self._state_lock: + dq = self._recent_tts_texts + if dq is None: + return + now = time.monotonic() + dq.append((now, normalized)) + self._trim_recent_tts_texts_locked(now) + + def _on_tts_reference_audio(self, msg: AudioStamped) -> None: + if not self.config.aec_enabled: + return + x = self._decode_audio(msg) + if x is None or x.shape[0] 
== 0: + return + with self._state_lock: + if self._aec_ref_buf is None: + return + if self._aec_ref_rate is None or self._aec_ref_rate != msg.sample_rate: + self._aec_ref_rate = msg.sample_rate + self._aec_ref_buf = np.zeros((0,), dtype=np.float32) + ref = np.concatenate([self._aec_ref_buf, x], axis=0) + max_samples = max(1, int(self.config.aec_reference_window_s * msg.sample_rate)) + if ref.shape[0] > max_samples: + ref = ref[-max_samples:] + self._aec_ref_buf = ref + + def _trim_recent_tts_texts_locked(self, now: float) -> None: + dq = self._recent_tts_texts + if dq is None: + return + window = float(self.config.self_tts_guard_window_s) + while dq and now - dq[0][0] > window: + dq.popleft() + + def _is_tts_guard_active(self) -> bool: + with self._state_lock: + now = time.monotonic() + return self._tts_playing or now < self._tts_guard_until + + def _passes_vad(self, x: np.ndarray) -> bool: + if not self.config.vad_enabled: + return True + rms = float(np.sqrt(np.mean(x * x) + 1e-12)) + rms_db = 20.0 * math.log10(max(rms, 1e-6)) + now = time.monotonic() + if rms_db >= float(self.config.vad_threshold_db): + with self._state_lock: + self._vad_open_until = now + float(self.config.vad_hangover_ms) / 1000.0 + return True + with self._state_lock: + return now < self._vad_open_until + + def _apply_aec(self, x: np.ndarray, sample_rate: int) -> np.ndarray: + if not self.config.aec_enabled: + return x + with self._state_lock: + ref = None if self._aec_ref_buf is None else self._aec_ref_buf.copy() + ref_rate = self._aec_ref_rate + if ref is None or ref_rate is None or ref.shape[0] == 0: + return x + target_len = x.shape[0] + if ref_rate == sample_rate: + ref_tail = ref[-target_len:] if ref.shape[0] >= target_len else np.pad(ref, (target_len - ref.shape[0], 0)) else: + desired_ref_len = max(1, int(round(target_len * ref_rate / sample_rate))) + ref_slice = ref[-desired_ref_len:] if ref.shape[0] >= desired_ref_len else ref + ref_tail = _resample_linear(ref_slice, target_len) + 
ref_rms = float(np.sqrt(np.mean(ref_tail * ref_tail) + 1e-12)) + if ref_rms < 1e-3: + return x + x_centered = x - np.mean(x) + ref_centered = ref_tail - np.mean(ref_tail) + denom = float(np.linalg.norm(x_centered) * np.linalg.norm(ref_centered) + 1e-9) + corr = float(np.dot(x_centered, ref_centered) / denom) + if corr < float(self.config.aec_correlation_threshold): + return x + return x - float(self.config.aec_strength) * ref_tail + + def _should_drop_as_recent_tts(self, normalized_text: str) -> bool: + if not self.config.self_tts_guard_enabled or not normalized_text: + return False + now = time.monotonic() + with self._state_lock: + self._trim_recent_tts_texts_locked(now) + recent = [] if self._recent_tts_texts is None else list(self._recent_tts_texts) + threshold = float(self.config.self_tts_similarity_threshold) + for _ts, recent_text in recent: + if normalized_text == recent_text: + return True + similarity = SequenceMatcher(a=normalized_text, b=recent_text).ratio() + if similarity >= threshold: + return True + return False + + def _is_consecutive_duplicate(self, normalized_text: str) -> bool: + if not self.config.dedupe_enabled or not normalized_text: + return False + last = self._last_emitted_text + if last is None: + return False + ts, previous = last + return previous == normalized_text and (time.monotonic() - ts) <= float(self.config.dedupe_window_s) + + def _on_audio(self, msg: AudioStamped) -> None: + x = self._decode_audio(msg) + if x is None: return if self._buf is None: return + if self.config.drop_during_tts and self._is_tts_guard_active(): + self._reset_live_buffer() + return + + x = self._apply_aec(x, msg.sample_rate) + if not self._passes_vad(x): + self._reset_live_buffer() + return + if self._buf_rate is None: self._buf_rate = msg.sample_rate elif self._buf_rate != msg.sample_rate: @@ -485,32 +680,70 @@ def _worker_loop(self) -> None: return model: object | None = None - use_faster = False - try: - import whisper # type: ignore[import-untyped] + 
backend: Literal["whisper.cpp", "faster-whisper", "openai-whisper"] | None = None + backend_error_messages: list[str] = [] - model = whisper.load_model(self.config.model) - use_faster = False - logger.info(f"SpeechToTextModule: ready (backend=openai-whisper, model={self.config.model})") - except Exception: - try: - from faster_whisper import WhisperModel # type: ignore[import-untyped] + for candidate in self._backend_candidates(): + if candidate == "whisper.cpp": + try: + command, model_path = self._resolve_whisper_cpp_backend() + model = {"command": command, "model_path": model_path} + backend = "whisper.cpp" + logger.info( + "SpeechToTextModule: ready " + f"(backend=whisper.cpp, command={command}, model_path={model_path})" + ) + break + except Exception as e: + backend_error_messages.append(f"whisper.cpp: {e}") + continue - compute_type = "float16" if self.config.fp16 else "int8" - model = WhisperModel(self.config.model, device="auto", compute_type=compute_type) - use_faster = True - logger.info( - "SpeechToTextModule: ready " - f"(backend=faster-whisper, model={self.config.model}, fp16={self.config.fp16})" - ) - except Exception as e: - logger.warning(f"SpeechToTextModule: no whisper backend available ({e}); dropping audio") - while running.is_set(): - try: - _ = q.get(timeout=0.25) - except queue.Empty: - continue - return + if candidate == "faster-whisper": + try: + from faster_whisper import WhisperModel # type: ignore[import-untyped] + + compute_type = "float16" if self.config.fp16 else "int8" + model = WhisperModel( + self.config.model, + device="auto", + compute_type=compute_type, + download_root=self.config.faster_whisper_download_root, + ) + backend = "faster-whisper" + logger.info( + "SpeechToTextModule: ready " + f"(backend=faster-whisper, model={self.config.model}, fp16={self.config.fp16})" + ) + break + except Exception as e: + backend_error_messages.append(f"faster-whisper: {e}") + continue + + if candidate == "openai-whisper": + try: + import 
whisper # type: ignore[import-untyped] + + model = whisper.load_model(self.config.model) + backend = "openai-whisper" + logger.info( + f"SpeechToTextModule: ready (backend=openai-whisper, model={self.config.model})" + ) + break + except Exception as e: + backend_error_messages.append(f"openai-whisper: {e}") + continue + + if backend is None or model is None: + logger.warning( + "SpeechToTextModule: no whisper backend available " + f"({'; '.join(backend_error_messages)}); dropping audio" + ) + while running.is_set(): + try: + _ = q.get(timeout=0.25) + except queue.Empty: + continue + return while running.is_set(): try: @@ -519,7 +752,14 @@ def _worker_loop(self) -> None: continue try: - if use_faster: + if backend == "whisper.cpp": + text = self._transcribe_with_whisper_cpp( + segment=segment, + sample_rate=_rate, + command=str(model["command"]), # type: ignore[index] + model_path=str(model["model_path"]), # type: ignore[index] + ) + elif backend == "faster-whisper": segments, _info = model.transcribe(segment, language=self.config.language) # type: ignore[union-attr] text = " ".join(seg.text.strip() for seg in segments).strip() else: @@ -534,25 +774,139 @@ def _worker_loop(self) -> None: continue if text: + normalized = _normalize_text(text) + if not normalized: + continue + if self._is_consecutive_duplicate(normalized): + logger.info(f"SpeechToTextModule: dropped duplicate text: {normalized}") + continue + if self._should_drop_as_recent_tts(normalized): + logger.info(f"SpeechToTextModule: dropped self-echo text: {normalized}") + continue preview = text if len(text) <= 120 else (text[:120] + "…") logger.info(f"SpeechToTextModule: transcribed: {preview}") + self._last_emitted_text = (time.monotonic(), normalized) self.text.publish(text) + def _backend_candidates( + self, + ) -> list[Literal["whisper.cpp", "faster-whisper", "openai-whisper"]]: + if self.config.backend_preference == "auto": + return ["whisper.cpp", "faster-whisper", "openai-whisper"] + + ordered: 
list[Literal["whisper.cpp", "faster-whisper", "openai-whisper"]] = [ + self.config.backend_preference, # type: ignore[list-item] + "whisper.cpp", + "faster-whisper", + "openai-whisper", + ] + deduped: list[Literal["whisper.cpp", "faster-whisper", "openai-whisper"]] = [] + for item in ordered: + if item not in deduped: + deduped.append(item) + return deduped + + def _resolve_whisper_cpp_backend(self) -> tuple[str, str]: + command = self.config.whisper_cpp_command + if command: + resolved_command = shutil.which(command) or command + else: + resolved_command = "" + for candidate in ("whisper-cli", "main"): + found = shutil.which(candidate) + if found is not None: + resolved_command = found + break + if not resolved_command: + raise RuntimeError("whisper.cpp command not found (tried whisper-cli/main)") + + model_path = self.config.whisper_cpp_model_path + if model_path is None and self.config.model.endswith(".bin"): + model_path = self.config.model + if model_path is None: + raise RuntimeError( + "whisper.cpp model path not configured; set speechtotextmodule.whisper_cpp_model_path" + ) + if not os.path.exists(model_path): + raise RuntimeError(f"whisper.cpp model not found: {model_path}") + return resolved_command, model_path + + def _transcribe_with_whisper_cpp( + self, + *, + segment: np.ndarray, + sample_rate: int, + command: str, + model_path: str, + ) -> str: + pcm = np.clip(segment, -1.0, 1.0) + pcm_i16 = (pcm * 32767.0).astype(np.int16) + wav_path = "" + out_prefix = "" + try: + with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as wav_file: + wav_path = wav_file.name + with wave.open(wav_path, "wb") as wf: + wf.setnchannels(1) + wf.setsampwidth(2) + wf.setframerate(sample_rate) + wf.writeframes(pcm_i16.tobytes()) + + out_prefix = wav_path[:-4] + cmd = [ + command, + "-m", + model_path, + "-f", + wav_path, + "-l", + self.config.language, + "-otxt", + "-of", + out_prefix, + "-np", + "-nt", + ] + subprocess.run(cmd, check=True, capture_output=True, 
text=True) + + txt_path = out_prefix + ".txt" + if os.path.exists(txt_path): + with open(txt_path, encoding="utf-8") as f: + return f.read().strip() + return "" + finally: + for path in (wav_path, out_prefix + ".txt" if out_prefix else ""): + if path and os.path.exists(path): + try: + os.unlink(path) + except Exception: + pass + class TextToSpeechConfig(ModuleConfig): + provider: Literal["openai", "macos-say", "pyttsx3"] = "openai" voice: str = "echo" model: str = "tts-1" speed: float = 1.0 sample_rate: int = 24000 frame_ms: int = 20 api_key: str | None = Field(default=None) + request_timeout_s: float = 15.0 + max_retries: int = 0 queue_max_texts: int = 64 + say_voice: str | None = Field(default=None) + say_rate_wpm: int | None = Field(default=None) + pyttsx3_rate_wpm: int = 200 + pyttsx3_volume: float = 1.0 class TextToSpeechModule(Module): config: TextToSpeechConfig text: In[str] audio: Out[AudioStamped] + tts_active: Out[bool] + spoken_text: Out[str] + tts_reference_audio: Out[AudioStamped] _queue: queue.Queue[str] | None = None _running: threading.Event | None = None @@ -613,27 +967,39 @@ def _worker_loop(self) -> None: if running is None or q is None: return - try: - import io - - from openai import OpenAI - import soundfile as sf # type: ignore[import-untyped] - except Exception as e: - logger.warning(f"TextToSpeechModule: missing dependencies ({e}); dropping text") - while running.is_set(): - try: - _ = q.get(timeout=0.25) - except queue.Empty: - continue - return - - client = OpenAI(api_key=self.config.api_key) + provider = self.config.provider frame_size = max(1, int(self.config.sample_rate * self.config.frame_ms / 1000)) logger.info( "TextToSpeechModule: ready " - f"(model={self.config.model}, voice={self.config.voice}, sample_rate={self.config.sample_rate})" + f"(provider={provider}, sample_rate={self.config.sample_rate}, frame_ms={self.config.frame_ms})" ) + openai_client = None + sf = None + io = None + + if provider == "openai": + try: + import io as 
_io + + from openai import OpenAI + import soundfile as _sf # type: ignore[import-untyped] + except Exception as e: + logger.warning(f"TextToSpeechModule: missing OpenAI dependencies ({e}); dropping text") + while running.is_set(): + try: + _ = q.get(timeout=0.25) + except queue.Empty: + continue + return + openai_client = OpenAI( + api_key=self.config.api_key, + timeout=float(self.config.request_timeout_s), + max_retries=int(self.config.max_retries), + ) + sf = _sf + io = _io + while running.is_set(): try: text = q.get(timeout=0.25) @@ -642,15 +1008,118 @@ def _worker_loop(self) -> None: preview = text if len(text) <= 120 else (text[:120] + "…") logger.info(f"TextToSpeechModule: synthesizing: {preview}") + + if provider == "pyttsx3": + self.tts_active.publish(True) + try: + import pyttsx3 # type: ignore[import-not-found] + except Exception as e: + logger.warning(f"TextToSpeechModule: missing pyttsx3 ({e}); dropping text") + self.tts_active.publish(False) + continue + try: + engine = pyttsx3.init() + engine.setProperty("rate", self.config.pyttsx3_rate_wpm) + engine.setProperty("volume", self.config.pyttsx3_volume) + engine.say(text) + engine.runAndWait() + self.spoken_text.publish(text) + except Exception: + logger.exception("TextToSpeechModule pyttsx3 synthesis failed") + self.tts_active.publish(False) + continue + + if provider == "macos-say": + import os + import subprocess + import tempfile + import wave + + if os.name != "posix" or not sys.platform.startswith("darwin"): + logger.warning("TextToSpeechModule: macos-say provider requires macOS; dropping text") + self.tts_active.publish(False) + continue + tmp_path = None + try: + with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp: + tmp_path = tmp.name + cmd: list[str] = [ + "say", + "-o", + tmp_path, + "--file-format=WAVE", + f"--data-format=LEI16@{self.config.sample_rate}", + "--channels=1", + ] + if self.config.say_voice: + cmd.extend(["-v", self.config.say_voice]) + if 
self.config.say_rate_wpm is not None: + cmd.extend(["-r", str(self.config.say_rate_wpm)]) + cmd.append(text) + subprocess.run(cmd, check=True, capture_output=True, text=True) + with wave.open(tmp_path, "rb") as wf: + channels = int(wf.getnchannels()) + sampwidth = int(wf.getsampwidth()) + rate = int(wf.getframerate()) + pcm_bytes = wf.readframes(wf.getnframes()) + if channels != 1 or sampwidth != 2: + logger.warning( + "TextToSpeechModule: macos-say produced unexpected format " + f"(channels={channels}, sampwidth={sampwidth}); dropping audio" + ) + continue + if rate != self.config.sample_rate: + x = np.frombuffer(pcm_bytes, dtype=np.int16).astype(np.float32) / 32767.0 + new_n = int(x.shape[0] * self.config.sample_rate / rate) + x = _resample_linear(x, new_n) + pcm_i16 = np.clip(x, -1.0, 1.0) + pcm_i16 = (pcm_i16 * 32767.0).astype(np.int16) + pcm_bytes = pcm_i16.tobytes() + + self.spoken_text.publish(text) + n_bytes_per_sample = 2 + chunk_bytes = frame_size * n_bytes_per_sample + self.tts_active.publish(True) + for offset in range(0, len(pcm_bytes), chunk_bytes): + chunk = pcm_bytes[offset : offset + chunk_bytes] + msg_out = AudioStamped.from_pcm( + pcm_bytes=chunk, + sample_rate=self.config.sample_rate, + channels=1, + sample_format="S16LE", + coding_format="pcm", + ts=time.monotonic(), + ) + self.tts_reference_audio.publish(msg_out) + self.audio.publish(msg_out) + logger.info( + "TextToSpeechModule: published audio " + f"({len(pcm_bytes) / (n_bytes_per_sample * self.config.sample_rate):.2f}s)" + ) + except Exception: + logger.exception("TextToSpeechModule macos-say synthesis failed") + finally: + self.tts_active.publish(False) + if tmp_path is not None: + try: + os.unlink(tmp_path) + except Exception: + pass + continue + + if provider != "openai": + logger.warning(f"TextToSpeechModule: unknown provider '{provider}'; dropping text") + continue + try: - response = client.audio.speech.create( + response = openai_client.audio.speech.create( # type: ignore[union-attr] 
model=self.config.model, voice=self.config.voice, input=text, speed=self.config.speed, ) - audio_data = io.BytesIO(response.content) - with sf.SoundFile(audio_data, "r") as sound_file: + audio_data = io.BytesIO(response.content) # type: ignore[union-attr] + with sf.SoundFile(audio_data, "r") as sound_file: # type: ignore[union-attr] src_rate = int(sound_file.samplerate) samples = sound_file.read(dtype="float32") except Exception: @@ -674,12 +1143,14 @@ def _worker_loop(self) -> None: pcm_i16 = (pcm * 32767.0).astype(np.int16) pcm_bytes = pcm_i16.tobytes() - n_bytes_per_sample = 2 - chunk_bytes = frame_size * n_bytes_per_sample - for offset in range(0, len(pcm_bytes), chunk_bytes): - chunk = pcm_bytes[offset : offset + chunk_bytes] - self.audio.publish( - AudioStamped.from_pcm( + self.tts_active.publish(True) + try: + self.spoken_text.publish(text) + n_bytes_per_sample = 2 + chunk_bytes = frame_size * n_bytes_per_sample + for offset in range(0, len(pcm_bytes), chunk_bytes): + chunk = pcm_bytes[offset : offset + chunk_bytes] + msg_out = AudioStamped.from_pcm( pcm_bytes=chunk, sample_rate=self.config.sample_rate, channels=1, @@ -687,11 +1158,14 @@ def _worker_loop(self) -> None: coding_format="pcm", ts=time.monotonic(), ) + self.tts_reference_audio.publish(msg_out) + self.audio.publish(msg_out) + logger.info( + "TextToSpeechModule: published audio " + f"({len(pcm_bytes) / (n_bytes_per_sample * self.config.sample_rate):.2f}s)" ) - logger.info( - "TextToSpeechModule: published audio " - f"({len(pcm_bytes) / (n_bytes_per_sample * self.config.sample_rate):.2f}s)" - ) + finally: + self.tts_active.publish(False) def _phase_vocoder(D: np.ndarray, rate: float, hop_length: int, n_fft: int) -> np.ndarray: @@ -759,6 +1233,11 @@ def _resample_linear(y: np.ndarray, n: int) -> np.ndarray: return np.interp(x, xp, y).astype(np.float32) +def _normalize_text(text: str) -> str: + cleaned = re.sub(r"[^\w\s]", " ", text.lower()) + return re.sub(r"\s+", " ", cleaned).strip() + + def 
_pitch_shift_block(y: np.ndarray, sample_rate: int, semitones: float) -> np.ndarray: if semitones == 0.0: return y @@ -1010,8 +1489,7 @@ def stop(self) -> None: def _on_audio(self, msg: AudioStamped) -> None: q = self._queue if q is None or not self.config.enabled: - if self.config.enabled: - self.audio_out.publish(msg) + self.audio_out.publish(msg) return try: q.put_nowait(msg) @@ -1098,6 +1576,12 @@ def _worker_loop(self) -> None: [ (AudioModule, "audio", "mic_audio"), (SpeechToTextModule, "audio", "mic_audio"), + (TextToSpeechModule, "tts_active", "tts_active_signal"), + (SpeechToTextModule, "tts_active", "tts_active_signal"), + (TextToSpeechModule, "spoken_text", "recent_tts_text"), + (SpeechToTextModule, "recent_tts_text", "recent_tts_text"), + (TextToSpeechModule, "tts_reference_audio", "tts_reference_audio"), + (SpeechToTextModule, "tts_reference_audio", "tts_reference_audio"), (SpeechToTextModule, "text", "speech_text"), (TextToSpeechModule, "text", "speech_text"), (TextToSpeechModule, "audio", "tts_audio_raw"), From 49391ee70d3b630084514966ca0d1ea81ce30af3 Mon Sep 17 00:00:00 2001 From: GuoZi Date: Sat, 20 Jun 2026 20:30:34 +0800 Subject: [PATCH 07/13] fix: reduce audio loopback echo and STT backlog --- dimos/hardware/sensors/audio/module.py | 158 +++++++++++++++++++------ 1 file changed, 125 insertions(+), 33 deletions(-) diff --git a/dimos/hardware/sensors/audio/module.py b/dimos/hardware/sensors/audio/module.py index c6ce1fb94a..191efeeacb 100644 --- a/dimos/hardware/sensors/audio/module.py +++ b/dimos/hardware/sensors/audio/module.py @@ -223,11 +223,13 @@ def on_frame(msg: AudioStamped) -> None: class SpeakerConfig(ModuleConfig): device: int | None = Field(default=None) queue_max_chunks: int = 256 + playing_idle_ms: int = 300 class SpeakerModule(Module): config: SpeakerConfig audio: In[AudioStamped] + speaker_playing: Out[bool] _queue: queue.Queue[AudioStamped] | None = None _running: threading.Event | None = None @@ -243,6 +245,7 @@ async def 
main(self) -> None: # type: ignore[override] self._queue = queue.Queue(maxsize=self.config.queue_max_chunks) self._running = threading.Event() self._running.set() + self.speaker_playing.publish(False) self._writer_thread = threading.Thread(target=self._writer_loop, daemon=True) self._writer_thread.start() yield @@ -273,6 +276,7 @@ async def main(self) -> None: # type: ignore[override] self._stream_rate = None self._stream_channels = None self._stream_dtype = None + self.speaker_playing.publish(False) @rpc def start(self) -> None: @@ -304,6 +308,9 @@ def _writer_loop(self) -> None: q = self._queue if running is None or q is None: return + is_playing = False + last_write_at = 0.0 + idle_timeout_s = max(0.0, float(self.config.playing_idle_ms) / 1000.0) try: import sounddevice as sd # type: ignore[import-untyped] @@ -320,6 +327,9 @@ def _writer_loop(self) -> None: try: msg = q.get(timeout=0.25) except queue.Empty: + if is_playing and (time.monotonic() - last_write_at) >= idle_timeout_s: + self.speaker_playing.publish(False) + is_playing = False continue if msg.coding_format != "pcm": @@ -382,8 +392,14 @@ def _writer_loop(self) -> None: if self._stream is not None: try: self._stream.write(data) + last_write_at = time.monotonic() + if not is_playing: + self.speaker_playing.publish(True) + is_playing = True except Exception: logger.exception("SpeakerModule failed to write audio") + if is_playing: + self.speaker_playing.publish(False) class SpeechToTextConfig(ModuleConfig): @@ -394,30 +410,34 @@ class SpeechToTextConfig(ModuleConfig): language: str = "en" fp16: bool = False segment_seconds: float = 3.0 + flush_buffer_after_segment: bool = True queue_max_segments: int = 8 whisper_cpp_command: str | None = Field(default=None) whisper_cpp_model_path: str | None = Field(default=None) faster_whisper_download_root: str | None = Field(default=None) drop_during_tts: bool = True - tts_guard_seconds: float = 0.35 + tts_guard_seconds: float = 0.8 vad_enabled: bool = True 
vad_threshold_db: float = -42.0 vad_hangover_ms: int = 350 + vad_flush_on_silence: bool = True + vad_flush_min_seconds: float = 0.8 aec_enabled: bool = True - aec_strength: float = 0.85 - aec_reference_window_s: float = 6.0 - aec_correlation_threshold: float = 0.45 + aec_strength: float = 0.92 + aec_reference_window_s: float = 8.0 + aec_correlation_threshold: float = 0.30 dedupe_enabled: bool = True - dedupe_window_s: float = 8.0 + dedupe_window_s: float = 15.0 self_tts_guard_enabled: bool = True - self_tts_guard_window_s: float = 8.0 - self_tts_similarity_threshold: float = 0.88 + self_tts_guard_window_s: float = 15.0 + self_tts_similarity_threshold: float = 0.82 class SpeechToTextModule(Module): config: SpeechToTextConfig audio: In[AudioStamped] tts_active: In[bool] + speaker_playing: In[bool] recent_tts_text: In[str] tts_reference_audio: In[AudioStamped] text: Out[str] @@ -429,7 +449,8 @@ class SpeechToTextModule(Module): _buf: np.ndarray | None = None _buf_rate: int | None = None _state_lock: threading.Lock = threading.Lock() - _tts_playing: bool = False + _tts_active_signal: bool = False + _speaker_playing_signal: bool = False _tts_guard_until: float = 0.0 _vad_open_until: float = 0.0 _aec_ref_buf: np.ndarray | None = None @@ -444,7 +465,8 @@ async def main(self) -> None: # type: ignore[override] self._unsubs = [] self._buf = np.zeros((0,), dtype=np.float32) self._buf_rate = None - self._tts_playing = False + self._tts_active_signal = False + self._speaker_playing_signal = False self._tts_guard_until = 0.0 self._vad_open_until = 0.0 self._aec_ref_buf = np.zeros((0,), dtype=np.float32) @@ -481,6 +503,7 @@ def start(self) -> None: self._unsubs = [ self.audio.subscribe(self._on_audio), self.tts_active.subscribe(self._on_tts_active), + self.speaker_playing.subscribe(self._on_speaker_playing), self.recent_tts_text.subscribe(self._on_recent_tts_text), self.tts_reference_audio.subscribe(self._on_tts_reference_audio), ] @@ -508,16 +531,35 @@ def 
_reset_live_buffer(self) -> None: if self._buf is not None: self._buf = np.zeros((0,), dtype=np.float32) + def _clear_segment_queue(self) -> None: + q = self._segment_queue + if q is None: + return + while True: + try: + _ = q.get_nowait() + except queue.Empty: + break + + def _drop_live_audio_state(self) -> None: + self._reset_live_buffer() + self._clear_segment_queue() + with self._state_lock: + self._vad_open_until = 0.0 + def _on_tts_active(self, active: bool) -> None: with self._state_lock: now = time.monotonic() - self._tts_playing = bool(active) - if active: - self._tts_guard_until = max(self._tts_guard_until, now + self.config.tts_guard_seconds) - else: - self._tts_guard_until = max(self._tts_guard_until, now + self.config.tts_guard_seconds) - if active: - self._reset_live_buffer() + self._tts_active_signal = bool(active) + self._tts_guard_until = max(self._tts_guard_until, now + self.config.tts_guard_seconds) + self._drop_live_audio_state() + + def _on_speaker_playing(self, active: bool) -> None: + with self._state_lock: + now = time.monotonic() + self._speaker_playing_signal = bool(active) + self._tts_guard_until = max(self._tts_guard_until, now + self.config.tts_guard_seconds) + self._drop_live_audio_state() def _on_recent_tts_text(self, text: str) -> None: normalized = _normalize_text(text) @@ -560,7 +602,7 @@ def _trim_recent_tts_texts_locked(self, now: float) -> None: def _is_tts_guard_active(self) -> bool: with self._state_lock: now = time.monotonic() - return self._tts_playing or now < self._tts_guard_until + return self._tts_active_signal or self._speaker_playing_signal or now < self._tts_guard_until def _passes_vad(self, x: np.ndarray) -> bool: if not self.config.vad_enabled: @@ -626,6 +668,32 @@ def _is_consecutive_duplicate(self, normalized_text: str) -> bool: ts, previous = last return previous == normalized_text and (time.monotonic() - ts) <= float(self.config.dedupe_window_s) + def _enqueue_segment(self, segment: np.ndarray, rate: int) -> 
bool: + q = self._segment_queue + if q is None: + return False + try: + q.put_nowait((segment, rate)) + return True + except queue.Full: + try: + _ = q.get_nowait() + except queue.Empty: + return False + try: + q.put_nowait((segment, rate)) + return True + except queue.Full: + return False + + def _flush_partial_segment_on_silence(self) -> bool: + if not self.config.vad_flush_on_silence or self._buf is None or self._buf_rate is None: + return False + min_samples = max(1, int(float(self.config.vad_flush_min_seconds) * self._buf_rate)) + if self._buf.shape[0] < min_samples: + return False + return self._enqueue_segment(self._buf.copy(), self._buf_rate) + def _on_audio(self, msg: AudioStamped) -> None: x = self._decode_audio(msg) if x is None: @@ -633,13 +701,14 @@ def _on_audio(self, msg: AudioStamped) -> None: if self._buf is None: return - + # Where STT mutes if self.config.drop_during_tts and self._is_tts_guard_active(): self._reset_live_buffer() return x = self._apply_aec(x, msg.sample_rate) if not self._passes_vad(x): + _ = self._flush_partial_segment_on_silence() self._reset_live_buffer() return @@ -655,23 +724,22 @@ def _on_audio(self, msg: AudioStamped) -> None: return target_samples = max(1, int(self.config.segment_seconds * rate)) + # Prefer waiting for VAD to close the utterance. The fixed duration acts + # as a safety cap for uninterrupted speech so the live buffer cannot grow + # forever if silence never arrives. 
+ if self._buf.shape[0] < target_samples: + return + if self.config.flush_buffer_after_segment: + if self._enqueue_segment(self._buf.copy(), rate): + self._buf = np.zeros((0,), dtype=np.float32) + return + while self._buf.shape[0] >= target_samples: segment = self._buf[:target_samples] - self._buf = self._buf[target_samples:] - q = self._segment_queue - if q is None: - return - try: - q.put_nowait((segment, rate)) - except queue.Full: - try: - _ = q.get_nowait() - except queue.Empty: - break - try: - q.put_nowait((segment, rate)) - except queue.Full: - break + remaining = self._buf[target_samples:] + if not self._enqueue_segment(segment, rate): + break + self._buf = remaining def _worker_loop(self) -> None: running = self._running @@ -777,6 +845,9 @@ def _worker_loop(self) -> None: normalized = _normalize_text(text) if not normalized: continue + if _is_bad_transcript(text=text, normalized_text=normalized): + logger.info(f"SpeechToTextModule: dropped bad transcript: {normalized}") + continue if self._is_consecutive_duplicate(normalized): logger.info(f"SpeechToTextModule: dropped duplicate text: {normalized}") continue @@ -1238,6 +1309,25 @@ def _normalize_text(text: str) -> str: return re.sub(r"\s+", " ", cleaned).strip() +def _is_bad_transcript(*, text: str, normalized_text: str) -> bool: + stripped = text.strip() + if not stripped or not normalized_text: + return True + + # Whisper-style non-speech captions should not be spoken back into the loop. 
+ if (stripped.startswith("[") and stripped.endswith("]")) or ( + stripped.startswith("(") and stripped.endswith(")") + ): + return True + + return normalized_text in { + "blank_audio", + "music_playing", + "applause", + "laughter", + } + + def _pitch_shift_block(y: np.ndarray, sample_rate: int, semitones: float) -> np.ndarray: if semitones == 0.0: return y @@ -1578,6 +1668,8 @@ def _worker_loop(self) -> None: (SpeechToTextModule, "audio", "mic_audio"), (TextToSpeechModule, "tts_active", "tts_active_signal"), (SpeechToTextModule, "tts_active", "tts_active_signal"), + (SpeakerModule, "speaker_playing", "speaker_playing_signal"), + (SpeechToTextModule, "speaker_playing", "speaker_playing_signal"), (TextToSpeechModule, "spoken_text", "recent_tts_text"), (SpeechToTextModule, "recent_tts_text", "recent_tts_text"), (TextToSpeechModule, "tts_reference_audio", "tts_reference_audio"), From f36bb770594eb75f44bb77f5b468226a0e9cc51f Mon Sep 17 00:00:00 2001 From: GuoZi Date: Sat, 20 Jun 2026 20:50:58 +0800 Subject: [PATCH 08/13] fix: use wall-clock audio timestamps and clean subscriptions --- dimos/hardware/sensors/audio/module.py | 55 +++++++++++++++----------- 1 file changed, 33 insertions(+), 22 deletions(-) diff --git a/dimos/hardware/sensors/audio/module.py b/dimos/hardware/sensors/audio/module.py index 191efeeacb..aec22e8275 100644 --- a/dimos/hardware/sensors/audio/module.py +++ b/dimos/hardware/sensors/audio/module.py @@ -57,6 +57,22 @@ logger = setup_logger() +def _unsubscribe_safely(unsub: Callable[[], None] | None, *, label: str) -> None: + if unsub is None: + return + try: + unsub() + except Exception: + logger.exception(f"{label} failed to unsubscribe") + + +def _unsubscribe_many_safely(unsubs: list[Callable[[], None]] | None, *, label: str) -> None: + if unsubs is None: + return + for unsub in unsubs: + _unsubscribe_safely(unsub, label=label) + + class AudioConfig(ModuleConfig): sample_rate: int = 16000 channels: int = 1 @@ -155,7 +171,7 @@ def _sd_callback( 
channels=self.config.channels, sample_format=self.config.sample_format, coding_format="pcm", - ts=time.monotonic(), + ts=time.time(), ) self.audio.publish(msg) @@ -184,7 +200,7 @@ async def _synth_loop(self, frame_size: int) -> None: channels=self.config.channels, sample_format=self.config.sample_format, coding_format="pcm", - ts=time.monotonic(), + ts=time.time(), ) self.audio.publish(msg) @@ -250,10 +266,7 @@ async def main(self) -> None: # type: ignore[override] self._writer_thread.start() yield if self._unsub is not None: - try: - self._unsub() - except Exception: - logger.exception("SpeakerModule failed to unsubscribe") + _unsubscribe_safely(self._unsub, label="SpeakerModule") self._unsub = None if self._running is not None: self._running.clear() @@ -280,6 +293,8 @@ async def main(self) -> None: # type: ignore[override] @rpc def start(self) -> None: + _unsubscribe_safely(self._unsub, label="SpeakerModule") + self._unsub = None super().start() self._unsub = self.audio.subscribe(self._on_audio) @@ -477,11 +492,7 @@ async def main(self) -> None: # type: ignore[override] self._thread.start() yield if self._unsubs is not None: - for unsub in self._unsubs: - try: - unsub() - except Exception: - logger.exception("SpeechToTextModule failed to unsubscribe") + _unsubscribe_many_safely(self._unsubs, label="SpeechToTextModule") self._unsubs = None if self._running is not None: self._running.clear() @@ -499,6 +510,8 @@ async def main(self) -> None: # type: ignore[override] @rpc def start(self) -> None: + _unsubscribe_many_safely(self._unsubs, label="SpeechToTextModule") + self._unsubs = None super().start() self._unsubs = [ self.audio.subscribe(self._on_audio), @@ -992,10 +1005,7 @@ async def main(self) -> None: # type: ignore[override] self._thread.start() yield if self._unsub is not None: - try: - self._unsub() - except Exception: - logger.exception("TextToSpeechModule failed to unsubscribe") + _unsubscribe_safely(self._unsub, label="TextToSpeechModule") self._unsub = 
None if self._running is not None: self._running.clear() @@ -1007,6 +1017,8 @@ async def main(self) -> None: # type: ignore[override] @rpc def start(self) -> None: + _unsubscribe_safely(self._unsub, label="TextToSpeechModule") + self._unsub = None super().start() self._unsub = self.text.subscribe(self._on_text) @@ -1159,7 +1171,7 @@ def _worker_loop(self) -> None: channels=1, sample_format="S16LE", coding_format="pcm", - ts=time.monotonic(), + ts=time.time(), ) self.tts_reference_audio.publish(msg_out) self.audio.publish(msg_out) @@ -1227,7 +1239,7 @@ def _worker_loop(self) -> None: channels=1, sample_format="S16LE", coding_format="pcm", - ts=time.monotonic(), + ts=time.time(), ) self.tts_reference_audio.publish(msg_out) self.audio.publish(msg_out) @@ -1554,10 +1566,7 @@ async def main(self) -> None: # type: ignore[override] self._thread.start() yield if self._unsub is not None: - try: - self._unsub() - except Exception: - logger.exception("FunVoiceEffectsModule failed to unsubscribe") + _unsubscribe_safely(self._unsub, label="FunVoiceEffectsModule") self._unsub = None if self._running is not None: self._running.clear() @@ -1569,6 +1578,8 @@ async def main(self) -> None: # type: ignore[override] @rpc def start(self) -> None: + _unsubscribe_safely(self._unsub, label="FunVoiceEffectsModule") + self._unsub = None super().start() self._unsub = self.audio_in.subscribe(self._on_audio) @@ -1651,7 +1662,7 @@ def _worker_loop(self) -> None: channels=1, sample_format="S16LE", coding_format="pcm", - ts=time.monotonic(), + ts=time.time(), ) ) From 63b81ee0e069da8c418e70e4112c019ee8d003e1 Mon Sep 17 00:00:00 2001 From: GuoZi Date: Mon, 22 Jun 2026 10:32:51 +0800 Subject: [PATCH 09/13] docs: add audio subsystem handoff --- dimos/hardware/sensors/audio/README.md | 154 +++++++++++++++++++++++++ 1 file changed, 154 insertions(+) create mode 100644 dimos/hardware/sensors/audio/README.md diff --git a/dimos/hardware/sensors/audio/README.md b/dimos/hardware/sensors/audio/README.md new 
file mode 100644 index 0000000000..00c3c8f220 --- /dev/null +++ b/dimos/hardware/sensors/audio/README.md @@ -0,0 +1,154 @@ +# Audio Subsystem -- Handoff + +**Owner (trial):** Zhuoran Guo · **Last updated:** 2026-06-22 · **PR #2507** · **Issue #1932** +**Location:** `dimos/hardware/sensors/audio/` (peer to `camera/`, `lidar/`) + +--- + +## 1. What this is + +The voice I/O path for dimos: **capture -> STT -> (agent) -> TTS -> effects -> speaker.** +It supplies the missing *speech-input* half of dimos's "command a robot in natural +language" story. Today it runs as a self-contained voice loopback; it is not yet wired +to the agent or to `memory2` (see section 5-6). + +All five modules and both blueprints live in a single file: +`dimos/hardware/sensors/audio/module.py`. + +--- + +## 2. Modules At A Glance + +| Module | Role | Status | Verified | +|---|---|---|---| +| `AudioModule` | Mic capture -> `AudioStamped` (one msg per `frame_ms` chunk). Real path via `sounddevice`/PortAudio; `synthetic=True` sine-tone fallback needs no mic. | done | macOS, real mic + synthetic (50 Hz / 20 ms / 16 kHz mono) | +| `SpeechToTextModule` | VAD + AEC + segmentation + Whisper transcription, with multi-layer self-echo suppression. 3-backend fallback: `whisper.cpp` -> `faster-whisper` -> `openai-whisper`. | works | macOS loopback | +| `TextToSpeechModule` | Text -> speech. 3 providers: `openai` (default), `macos-say`, `pyttsx3`. Resamples to a common rate, chunks to frames, emits `tts_active` / `spoken_text` / `tts_reference_audio`. | works | macOS | +| `FunVoiceEffectsModule` | Real-time DSP chain: noise gate -> phase-vocoder pitch shift -> ring-mod ("robotize") -> bitcrush -> echo, via overlap-add STFT framing. | smoke-tested only | not fully validated - first thing to harden | +| `SpeakerModule` | Plays `AudioStamped` to the output device; (re)opens the stream on format change; emits `speaker_playing` for barge-in. 
| works | macOS | + +**Blueprints** (module-level vars in `module.py`): +- `demo_audio` -- `AudioModule -> SpeakerModule` (mic monitor). +- `audio_speech_loopback` -- full chain wired via `autoconnect(...).remappings(...)`, including the anti-echo signal routing. + +> CLI run-names: the blueprint variables are `demo_audio` and `audio_speech_loopback`. +> Confirm the exact `dimos run <name>` registration matches these before relying on the +> commands in section 3 (run-name != variable-name in some setups). + +--- + +## 3. How To Run + +```bash +# Dependencies (macOS) +brew install portaudio +pip install sounddevice numpy +# openai TTS additionally needs: pip install openai soundfile +# grant microphone permission on first real run + +# Validate AudioModule in isolation (LCM round-trip + live capture-rate check) +python examples/audio/validate_audio_module.py # synthetic (default, no mic) +python examples/audio/validate_audio_module.py --real-mic # real microphone + +# Blueprints (confirm registered run-names first) +dimos run demo-audio # mic -> speaker monitor +dimos run audio-speech-loopback # full capture -> STT -> TTS -> effects -> speaker +``` + +`validate_audio_module.py` asserts: LCM encode/decode is lossless for PCM payload + +metadata + timestamp; frame rate ~= `1000 / frame_ms` Hz (50 Hz at 20 ms); timestamps +strictly increasing. + +--- + +## 4. Architecture And Data Flow + +Each module follows the established dimos shape (mirrors `CameraModule`): typed `In[]`/`Out[]` +streams, `@rpc start()/stop()`, and an `async def main()` with a single `yield` +(open resources before, tear down after). Heavy work (capture callback, transcription, +synthesis, DSP) runs on daemon threads behind bounded queues with drop-oldest backpressure.
+ +**Loopback wiring** (`audio_speech_loopback`): + +```text +AudioModule.audio ─┬─────────────────────────────► SpeechToTextModule.audio + (mic_audio) │ │ text (speech_text) + │ ▼ + │ TextToSpeechModule + │ tts_active ◄──── tts_active_signal ───┤ + │ recent_tts_text ◄─── recent_tts_text ─┤ (self-echo guard) + │ tts_reference_audio ◄──── (AEC ref) ──┤ + │ │ audio (tts_audio_raw) + │ ▼ + │ FunVoiceEffectsModule + │ │ audio_out (tts_audio) + │ ▼ + └── speaker_playing_signal ◄──────── SpeakerModule (barge-in) +``` + +**Message type -- `AudioStamped`** (`dimos/msgs/audio_msgs/AudioStamped.py`): +carries a `std_msgs.Header`, `sample_rate`, `channels`, `sample_format` (e.g. `S16LE`/`F32LE`), +`coding_format` (`pcm`), and raw PCM `data`. Helpers: `from_pcm(...)`, `to_numpy()`. + +--- + +## 5. Key Design Decisions + +- **Wire type is a stand-in.** `AudioStamped` serialises to `foxglove_msgs.RawAudio` + because it is the only audio type currently mirrored in `dimos_lcm`. `RawAudio` has + **no `frame_id` field**, so `frame_id` is *dropped on encode*. `format` is packed as + `"{coding_format}/{sample_format}"` (e.g. `"pcm/S16LE"`). For cross-machine / multi-source + use, add a native `Header`-bearing audio type to `dimos-lcm`. (This is documented in the + `AudioStamped` module docstring; it is a pending team decision, not an endorsement of the + foxglove schema.) + +- **Self-echo / barge-in suppression is layered** (so the robot does not transcribe its own + TTS): + 1. **Barge-in muting** -- STT drops live audio while `tts_active` or `speaker_playing` is + true, plus a `tts_guard_seconds` tail. + 2. **Acoustic echo cancellation (AEC)** -- cross-correlation against a rolling buffer of + `tts_reference_audio`; subtracts the reference only when correlation clears a threshold. + 3. **Self-text guard** -- fuzzy match (`SequenceMatcher`) of new transcripts against recent + `recent_tts_text` within a window; drop near-duplicates of what we just said. + 4. 
**Consecutive-duplicate dedup** -- drop the same transcript repeated within a window. + 5. **Bad-transcript filter** -- drop Whisper non-speech captions (`[BLANK_AUDIO]`, + `(music playing)`, etc.). + +- **VAD + segmentation.** RMS-dB gate with hangover; prefer flushing an utterance on silence, + with a fixed `segment_seconds` cap so the buffer can't grow unbounded during continuous speech. + +- **Backend/provider fallback.** STT and TTS both degrade gracefully (try preferred backend, + fall through the rest, and if none is available, drain the queue rather than crash). + +--- + +## 6. Known Gaps / Not Done + +1. **`memory2` not connected** -- *the original endpoint of Issue #1932.* Audio is currently + "hear and forget": STT text / clips are never persisted. **This is the largest open item.** +2. **Agent not connected** -- the loop is mic -> STT -> TTS (an echo/effects toy), not yet + STT -> agent reasoning -> TTS. Wiring the agent is what turns it from a parrot into something + that can actually take spoken commands. +3. **Not validated on real robot (Go2 Pro / Jetson).** macOS -> Jetson has real gaps: audio + device enumeration, TTS provider (`macos-say` is unavailable off macOS -> use `openai`/`pyttsx3`), + and Whisper backend (needs an ARM-friendly backend, e.g. `whisper.cpp`). +4. **`FunVoiceEffectsModule` is smoke-tested only** -- the DSP chain runs and logs VU, but + parameter ranges and stability across sample rates are not characterised. +5. **`RawAudio` stand-in** -- decide whether to add a native `Header`-bearing LCM audio type + before multi-source / cross-machine audio is needed. + +--- + +## 7. Suggested Next Steps + +1. **On-robot bring-up (Go2 Pro):** device enumeration, switch TTS to `openai`/`pyttsx3`, + tune AEC/VAD thresholds in a real acoustic environment. +2. **Connect `memory2`:** persist STT text (and optionally clips) -- the actual intent of #1932. +3. 
**Connect the agent:** `STT.text -> agent -> TTS.text`, so spoken language drives behaviour. +4. **Harden `FunVoiceEffects`** and resolve the `RawAudio` vs. native-`Header` type question. + +--- + +## 8. Contact + +Questions: Zhuoran Guo -- [add WeChat / email] From 9baa8df36db335b6852475b134a50ded4c0c908a Mon Sep 17 00:00:00 2001 From: GuoZi Date: Mon, 22 Jun 2026 10:45:53 +0800 Subject: [PATCH 10/13] docs: add audio handoff contact email --- dimos/hardware/sensors/audio/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dimos/hardware/sensors/audio/README.md b/dimos/hardware/sensors/audio/README.md index 00c3c8f220..436a50dab6 100644 --- a/dimos/hardware/sensors/audio/README.md +++ b/dimos/hardware/sensors/audio/README.md @@ -151,4 +151,4 @@ carries a `std_msgs.Header`, `sample_rate`, `channels`, `sample_format` (e.g. `S ## 8. Contact -Questions: Zhuoran Guo -- [add WeChat / email] +Questions: Zhuoran Guo -- zhuoran122623@gmail.com From 7a2ea2b521d36a7aa2cb60dcdbb0beb3410f50cc Mon Sep 17 00:00:00 2001 From: GuoZi Date: Mon, 22 Jun 2026 13:37:43 +0800 Subject: [PATCH 11/13] feat(audio): wire AgentTextModule between STT and TTS MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Inserts AgentTextModule into the audio_speech_loopback pipeline so spoken input is routed through a LangChain chat LLM (default gpt-4o-mini) before being handed to TTS, turning the parrot loop into an actual spoken-command interface. 
- AgentTextConfig: model, system_prompt, api_key (env fallback to OPENAI_API_KEY with startup warning), queue_max_texts, history_max_turns - AgentTextModule: daemon thread + bounded queue (same pattern as TTS/STT), rolling conversation history trimmed to history_max_turns pairs - Blueprint: STT.text → speech_text → AgentTextModule.text_in; AgentTextModule.text_out → agent_response → TTS.text - README: module table, data flow diagram, Known Gaps, Next Steps updated All changed lines marked # [AGENT-WIRE] for easy grep. Co-Authored-By: Claude Sonnet 4.6 --- dimos/hardware/sensors/audio/README.md | 18 ++- dimos/hardware/sensors/audio/module.py | 156 ++++++++++++++++++++++++- 2 files changed, 165 insertions(+), 9 deletions(-) diff --git a/dimos/hardware/sensors/audio/README.md b/dimos/hardware/sensors/audio/README.md index 436a50dab6..9285558b21 100644 --- a/dimos/hardware/sensors/audio/README.md +++ b/dimos/hardware/sensors/audio/README.md @@ -23,6 +23,7 @@ All five modules and both blueprints live in a single file: |---|---|---|---| | `AudioModule` | Mic capture -> `AudioStamped` (one msg per `frame_ms` chunk). Real path via `sounddevice`/PortAudio; `synthetic=True` sine-tone fallback needs no mic. | done | macOS, real mic + synthetic (50 Hz / 20 ms / 16 kHz mono) | | `SpeechToTextModule` | VAD + AEC + segmentation + Whisper transcription, with multi-layer self-echo suppression. 3-backend fallback: `whisper.cpp` -> `faster-whisper` -> `openai-whisper`. | works | macOS loopback | +| `AgentTextModule` | **[AGENT-WIRE]** Routes STT text through a LangChain chat LLM (default `gpt-4o-mini`) and publishes the reply. Keeps a rolling conversation history (default 20 turns). Daemon thread + bounded queue so LLM latency never blocks capture/STT. | wired, untested | — | | `TextToSpeechModule` | Text -> speech. 3 providers: `openai` (default), `macos-say`, `pyttsx3`. Resamples to a common rate, chunks to frames, emits `tts_active` / `spoken_text` / `tts_reference_audio`. 
| works | macOS | | `FunVoiceEffectsModule` | Real-time DSP chain: noise gate -> phase-vocoder pitch shift -> ring-mod ("robotize") -> bitcrush -> echo, via overlap-add STFT framing. | smoke-tested only | not fully validated - first thing to harden | | `SpeakerModule` | Plays `AudioStamped` to the output device; (re)opens the stream on format change; emits `speaker_playing` for barge-in. | works | macOS | @@ -73,6 +74,10 @@ synthesis, DSP) runs on daemon threads behind bounded queues with drop-oldest ba ```text AudioModule.audio ─┬─────────────────────────────► SpeechToTextModule.audio (mic_audio) │ │ text (speech_text) + │ ▼ + │ AgentTextModule [AGENT-WIRE] + │ (gpt-4o-mini, rolling history) + │ │ text_out (agent_response) │ ▼ │ TextToSpeechModule │ tts_active ◄──── tts_active_signal ───┤ @@ -126,9 +131,9 @@ carries a `std_msgs.Header`, `sample_rate`, `channels`, `sample_format` (e.g. `S 1. **`memory2` not connected** -- *the original endpoint of Issue #1932.* Audio is currently "hear and forget": STT text / clips are never persisted. **This is the largest open item.** -2. **Agent not connected** -- the loop is mic -> STT -> TTS (an echo/effects toy), not yet - STT -> agent reasoning -> TTS. Wiring the agent is what turns it from a parrot into something - that can actually take spoken commands. +2. ~~**Agent not connected**~~ -- **Done (2026-06-22).** `AgentTextModule` is wired between STT and + TTS. The pipeline is now mic -> STT -> LLM agent -> TTS. Needs end-to-end validation on + macOS before robot bring-up. 3. **Not validated on real robot (Go2 Pro / Jetson).** macOS -> Jetson has real gaps: audio device enumeration, TTS provider (`macos-say` is unavailable off macOS -> use `openai`/`pyttsx3`), and Whisper backend (needs an ARM-friendly backend, e.g. `whisper.cpp`). @@ -141,10 +146,11 @@ carries a `std_msgs.Header`, `sample_rate`, `channels`, `sample_format` (e.g. `S ## 7. Suggested Next Steps -1. 
**On-robot bring-up (Go2 Pro):** device enumeration, switch TTS to `openai`/`pyttsx3`, - tune AEC/VAD thresholds in a real acoustic environment. +1. **Validate agent wiring end-to-end on macOS** (`dimos run audio-speech-loopback`): confirm + spoken input reaches the LLM and the reply is spoken back correctly. 2. **Connect `memory2`:** persist STT text (and optionally clips) -- the actual intent of #1932. -3. **Connect the agent:** `STT.text -> agent -> TTS.text`, so spoken language drives behaviour. +3. **On-robot bring-up (Go2 Pro):** device enumeration, switch TTS to `openai`/`pyttsx3`, + tune AEC/VAD thresholds in a real acoustic environment. 4. **Harden `FunVoiceEffects`** and resolve the `RawAudio` vs. native-`Header` type question. --- diff --git a/dimos/hardware/sensors/audio/module.py b/dimos/hardware/sensors/audio/module.py index aec22e8275..d05f1d75fc 100644 --- a/dimos/hardware/sensors/audio/module.py +++ b/dimos/hardware/sensors/audio/module.py @@ -967,6 +967,151 @@ def _transcribe_with_whisper_cpp( pass +# [AGENT-WIRE] AgentTextConfig + AgentTextModule — sits between STT and TTS +class AgentTextConfig(ModuleConfig): # [AGENT-WIRE] + model: str = "gpt-4o-mini" # [AGENT-WIRE] + system_prompt: str = ( # [AGENT-WIRE] + "You are Daneel, an AI agent created by Dimensional to control a Unitree Go2 " # [AGENT-WIRE] + "quadruped robot. The user speaks to you; you reply in one or two concise " # [AGENT-WIRE] + "sentences. Prioritise safety above all else." # [AGENT-WIRE] + ) # [AGENT-WIRE] + api_key: str | None = Field(default=None) # [AGENT-WIRE] falls back to OPENAI_API_KEY env var + queue_max_texts: int = 16 # [AGENT-WIRE] + history_max_turns: int = 20 # [AGENT-WIRE] pairs of human+AI messages kept in context + + +class AgentTextModule(Module): # [AGENT-WIRE] + """Routes transcribed speech through a chat LLM and publishes the reply. 
+ + text_in ← raw STT transcript from SpeechToTextModule + text_out → agent reply forwarded to TextToSpeechModule + + Keeps a bounded rolling conversation history (history_max_turns pairs). + Uses the same daemon-thread + bounded-queue pattern as the other audio modules + so LLM latency never blocks the capture/STT pipeline. + """ # [AGENT-WIRE] + + config: AgentTextConfig # [AGENT-WIRE] + text_in: In[str] # [AGENT-WIRE] + text_out: Out[str] # [AGENT-WIRE] + + _queue: queue.Queue[str] | None = None # [AGENT-WIRE] + _running: threading.Event | None = None # [AGENT-WIRE] + _thread: threading.Thread | None = None # [AGENT-WIRE] + _unsub: Callable[[], None] | None = None # [AGENT-WIRE] + + async def main(self) -> None: # type: ignore[override] # [AGENT-WIRE] + self._queue = queue.Queue(maxsize=self.config.queue_max_texts) # [AGENT-WIRE] + self._running = threading.Event() # [AGENT-WIRE] + self._running.set() # [AGENT-WIRE] + self._thread = threading.Thread(target=self._worker_loop, daemon=True) # [AGENT-WIRE] + self._thread.start() # [AGENT-WIRE] + yield # [AGENT-WIRE] + if self._unsub is not None: # [AGENT-WIRE] + _unsubscribe_safely(self._unsub, label="AgentTextModule") # [AGENT-WIRE] + self._unsub = None # [AGENT-WIRE] + if self._running is not None: # [AGENT-WIRE] + self._running.clear() # [AGENT-WIRE] + if self._thread is not None: # [AGENT-WIRE] + self._thread.join(timeout=5.0) # [AGENT-WIRE] longer timeout; LLM call may be in-flight + self._thread = None # [AGENT-WIRE] + self._queue = None # [AGENT-WIRE] + self._running = None # [AGENT-WIRE] + + @rpc # [AGENT-WIRE] + def start(self) -> None: # [AGENT-WIRE] + _unsubscribe_safely(self._unsub, label="AgentTextModule") # [AGENT-WIRE] + self._unsub = None # [AGENT-WIRE] + super().start() # [AGENT-WIRE] + self._unsub = self.text_in.subscribe(self._on_text) # [AGENT-WIRE] + + @rpc # [AGENT-WIRE] + def stop(self) -> None: # [AGENT-WIRE] + super().stop() # [AGENT-WIRE] + + def _on_text(self, text: str) -> None: # 
[AGENT-WIRE] + if not text.strip(): # [AGENT-WIRE] + return # [AGENT-WIRE] + q = self._queue # [AGENT-WIRE] + if q is None: # [AGENT-WIRE] + return # [AGENT-WIRE] + try: # [AGENT-WIRE] + q.put_nowait(text) # [AGENT-WIRE] + except queue.Full: # [AGENT-WIRE] + try: # [AGENT-WIRE] + _ = q.get_nowait() # [AGENT-WIRE] drop oldest if queue full + except queue.Empty: # [AGENT-WIRE] + return # [AGENT-WIRE] + try: # [AGENT-WIRE] + q.put_nowait(text) # [AGENT-WIRE] + except queue.Full: # [AGENT-WIRE] + return # [AGENT-WIRE] + + def _worker_loop(self) -> None: # [AGENT-WIRE] + running = self._running # [AGENT-WIRE] + q = self._queue # [AGENT-WIRE] + if running is None or q is None: # [AGENT-WIRE] + return # [AGENT-WIRE] + + try: # [AGENT-WIRE] + from langchain.chat_models import init_chat_model # [AGENT-WIRE] + from langchain_core.messages import AIMessage, HumanMessage, SystemMessage # [AGENT-WIRE] + + api_key = self.config.api_key or os.environ.get("OPENAI_API_KEY") # [AGENT-WIRE] + if not api_key: # [AGENT-WIRE] + logger.warning( # [AGENT-WIRE] + "AgentTextModule: OPENAI_API_KEY not set and api_key not configured; " # [AGENT-WIRE] + "LLM calls will fail unless the model provider reads the key another way" # [AGENT-WIRE] + ) # [AGENT-WIRE] + llm_kwargs = {"openai_api_key": api_key} if api_key else {} # [AGENT-WIRE] + llm = init_chat_model(self.config.model, **llm_kwargs) # [AGENT-WIRE] + except Exception: # [AGENT-WIRE] + logger.exception("AgentTextModule: failed to initialise LLM; dropping all text") # [AGENT-WIRE] + while running.is_set(): # [AGENT-WIRE] + try: # [AGENT-WIRE] + _ = q.get(timeout=0.25) # [AGENT-WIRE] + except queue.Empty: # [AGENT-WIRE] + continue # [AGENT-WIRE] + return # [AGENT-WIRE] + + system_message = SystemMessage(self.config.system_prompt) # [AGENT-WIRE] + history: list = [] # [AGENT-WIRE] alternating HumanMessage / AIMessage + max_history_msgs = max(2, int(self.config.history_max_turns) * 2) # [AGENT-WIRE] + logger.info(f"AgentTextModule: ready 
(model={self.config.model})") # [AGENT-WIRE] + + while running.is_set(): # [AGENT-WIRE] + try: # [AGENT-WIRE] + text = q.get(timeout=0.25) # [AGENT-WIRE] + except queue.Empty: # [AGENT-WIRE] + continue # [AGENT-WIRE] + + preview = text if len(text) <= 120 else (text[:120] + "…") # [AGENT-WIRE] + logger.info(f"AgentTextModule: invoking LLM with: {preview}") # [AGENT-WIRE] + + from langchain_core.messages import HumanMessage # [AGENT-WIRE] + + human_msg = HumanMessage(content=text) # [AGENT-WIRE] + messages = [system_message, *history, human_msg] # [AGENT-WIRE] + try: # [AGENT-WIRE] + response = llm.invoke(messages) # [AGENT-WIRE] + except Exception: # [AGENT-WIRE] + logger.exception("AgentTextModule: LLM invocation failed") # [AGENT-WIRE] + continue # [AGENT-WIRE] + + reply = str(response.content).strip() # [AGENT-WIRE] + if not reply: # [AGENT-WIRE] + continue # [AGENT-WIRE] + + history.extend([human_msg, response]) # [AGENT-WIRE] + if len(history) > max_history_msgs: # [AGENT-WIRE] + history = history[-max_history_msgs:] # [AGENT-WIRE] trim oldest turns + + preview_reply = reply if len(reply) <= 120 else (reply[:120] + "…") # [AGENT-WIRE] + logger.info(f"AgentTextModule: reply: {preview_reply}") # [AGENT-WIRE] + self.text_out.publish(reply) # [AGENT-WIRE] +# [AGENT-WIRE] end AgentTextModule + + class TextToSpeechConfig(ModuleConfig): provider: Literal["openai", "macos-say", "pyttsx3"] = "openai" voice: str = "echo" @@ -1667,9 +1812,10 @@ def _worker_loop(self) -> None: ) -audio_speech_loopback = autoconnect( +audio_speech_loopback = autoconnect( # [AGENT-WIRE] AgentTextModule added to pipeline AudioModule.blueprint(), SpeechToTextModule.blueprint(), + AgentTextModule.blueprint(), # [AGENT-WIRE] inserted between STT and TTS TextToSpeechModule.blueprint(), FunVoiceEffectsModule.blueprint(), SpeakerModule.blueprint(), @@ -1685,8 +1831,12 @@ def _worker_loop(self) -> None: (SpeechToTextModule, "recent_tts_text", "recent_tts_text"), (TextToSpeechModule, 
"tts_reference_audio", "tts_reference_audio"), (SpeechToTextModule, "tts_reference_audio", "tts_reference_audio"), - (SpeechToTextModule, "text", "speech_text"), - (TextToSpeechModule, "text", "speech_text"), + # [AGENT-WIRE] STT → agent (was STT → TTS directly via "speech_text") + (SpeechToTextModule, "text", "speech_text"), # [AGENT-WIRE] + (AgentTextModule, "text_in", "speech_text"), # [AGENT-WIRE] + # [AGENT-WIRE] agent → TTS (new channel "agent_response") + (AgentTextModule, "text_out", "agent_response"), # [AGENT-WIRE] + (TextToSpeechModule, "text", "agent_response"), # [AGENT-WIRE] was "speech_text" (TextToSpeechModule, "audio", "tts_audio_raw"), (FunVoiceEffectsModule, "audio_in", "tts_audio_raw"), (FunVoiceEffectsModule, "audio_out", "tts_audio"), From 91ef6bd3a8a537fd9130670d04635e04401a5628 Mon Sep 17 00:00:00 2001 From: GuoZi Date: Mon, 22 Jun 2026 14:00:43 +0800 Subject: [PATCH 12/13] docs: update audio handoff status --- dimos/hardware/sensors/audio/README.md | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/dimos/hardware/sensors/audio/README.md b/dimos/hardware/sensors/audio/README.md index 9285558b21..806146f4fe 100644 --- a/dimos/hardware/sensors/audio/README.md +++ b/dimos/hardware/sensors/audio/README.md @@ -131,15 +131,12 @@ carries a `std_msgs.Header`, `sample_rate`, `channels`, `sample_format` (e.g. `S 1. **`memory2` not connected** -- *the original endpoint of Issue #1932.* Audio is currently "hear and forget": STT text / clips are never persisted. **This is the largest open item.** -2. ~~**Agent not connected**~~ -- **Done (2026-06-22).** `AgentTextModule` is wired between STT and - TTS. The pipeline is now mic -> STT -> LLM agent -> TTS. Needs end-to-end validation on - macOS before robot bring-up. -3. **Not validated on real robot (Go2 Pro / Jetson).** macOS -> Jetson has real gaps: audio +2. 
**Not validated on real robot (Go2 Pro / Jetson).** macOS -> Jetson has real gaps: audio device enumeration, TTS provider (`macos-say` is unavailable off macOS -> use `openai`/`pyttsx3`), and Whisper backend (needs an ARM-friendly backend, e.g. `whisper.cpp`). -4. **`FunVoiceEffectsModule` is smoke-tested only** -- the DSP chain runs and logs VU, but +3. **`FunVoiceEffectsModule` is smoke-tested only** -- the DSP chain runs and logs VU, but parameter ranges and stability across sample rates are not characterised. -5. **`RawAudio` stand-in** -- decide whether to add a native `Header`-bearing LCM audio type +4. **`RawAudio` stand-in** -- decide whether to add a native `Header`-bearing LCM audio type before multi-source / cross-machine audio is needed. --- From 164991ffd5af175a54518e5c981ffb36d3d9ea12 Mon Sep 17 00:00:00 2001 From: GuoZi Date: Mon, 22 Jun 2026 14:03:47 +0800 Subject: [PATCH 13/13] docs: add audio startup environment notes --- dimos/hardware/sensors/audio/README.md | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/dimos/hardware/sensors/audio/README.md b/dimos/hardware/sensors/audio/README.md index 806146f4fe..b47cadd150 100644 --- a/dimos/hardware/sensors/audio/README.md +++ b/dimos/hardware/sensors/audio/README.md @@ -47,6 +47,19 @@ pip install sounddevice numpy # openai TTS additionally needs: pip install openai soundfile # grant microphone permission on first real run +# Environment (example; keep secrets local and do not commit real keys) +export OPENAI_API_KEY="" +export SPEECHTOTEXTMODULE__BACKEND_PREFERENCE="whisper.cpp" +export SPEECHTOTEXTMODULE__WHISPER_CPP_MODEL_PATH="/absolute/path/to/dimos/dimos/models/ggml-small.en.bin" +export SPEECHTOTEXTMODULE__LANGUAGE="en" +export TEXTTOSPEECHMODULE__PROVIDER="openai" +export TEXTTOSPEECHMODULE__API_KEY="$OPENAI_API_KEY" +export SPEECHTOTEXTMODULE__DROP_DURING_TTS="true" +export SPEECHTOTEXTMODULE__TTS_GUARD_SECONDS="0.8" +export SPEECHTOTEXTMODULE__VAD_ENABLED="true" +export 
SPEECHTOTEXTMODULE__VAD_FLUSH_ON_SILENCE="true" +export FUNVOICEEFFECTSMODULE__ENABLED="false" + # Validate AudioModule in isolation (LCM round-trip + live capture-rate check) python examples/audio/validate_audio_module.py # synthetic (default, no mic) python examples/audio/validate_audio_module.py --real-mic # real microphone