diff --git a/agents/openclaw_gateway.py b/agents/openclaw_gateway.py index 6df6191..7ac956f 100644 --- a/agents/openclaw_gateway.py +++ b/agents/openclaw_gateway.py @@ -7,11 +7,13 @@ import json import logging import os +from pathlib import Path import shlex import subprocess import time import uuid from typing import Any +from urllib.parse import urlparse from core.config import cfg from core import perf @@ -28,7 +30,7 @@ _LAST_CHAT_RUN_BY_SESSION: dict[str, dict[str, Any]] = {} _GATEWAY_RPC_CLIENT: "OpenClawGatewayRpcClient | None" = None _GATEWAY_RPC_CLIENT_KEY: tuple[str, str | None] | None = None -_PROTOCOL_VERSION = 3 +_PROTOCOL_VERSION = 4 def build_openclaw_base_command(raw: Any) -> list[str]: @@ -105,7 +107,7 @@ def _resolve_gateway_url() -> str: return "ws://127.0.0.1:18789" -def _resolve_gateway_token() -> str | None: +def _resolve_gateway_token(gateway_url: str | None = None) -> str | None: token = getattr(cfg.openclaw, "gateway_token", None) if token: return str(token) @@ -114,6 +116,42 @@ def _resolve_gateway_token() -> str | None: speaker_token = getattr(speaker_gateway, "token", None) if speaker_token: return str(speaker_token) + env_token = os.environ.get("OPENCLAW_GATEWAY_TOKEN") + if env_token: + return env_token + if _is_loopback_gateway_url(gateway_url or _resolve_gateway_url()): + return _read_openclaw_gateway_token() + return None + + +def _is_loopback_gateway_url(gateway_url: str) -> bool: + try: + parsed = urlparse(str(gateway_url)) + except Exception: + return False + host = (parsed.hostname or "").strip().lower() + return host in {"localhost", "127.0.0.1", "::1"} + + +def _read_openclaw_gateway_token() -> str | None: + config_path = Path(os.environ.get("OPENCLAW_CONFIG") or "~/.openclaw/openclaw.json").expanduser() + try: + data = json.loads(config_path.read_text(encoding="utf-8-sig")) + except (OSError, json.JSONDecodeError): + return None + if not isinstance(data, dict): + return None + gateway = data.get("gateway") + if not isinstance(gateway, dict): + return None + auth = gateway.get("auth") + if not isinstance(auth, dict): + return None + if auth.get("mode") != "token": + return None + token = auth.get("token") + if isinstance(token, str) and token.strip(): + return token.strip() return None @@ -296,7 +334,8 @@ def _format_gateway_error(error: Any) -> str: async def _get_gateway_rpc_client() -> OpenClawGatewayRpcClient: global _GATEWAY_RPC_CLIENT, _GATEWAY_RPC_CLIENT_KEY - key = (_resolve_gateway_url(), _resolve_gateway_token()) + gateway_url = _resolve_gateway_url() + key = (gateway_url, _resolve_gateway_token(gateway_url)) if _GATEWAY_RPC_CLIENT is None or _GATEWAY_RPC_CLIENT_KEY != key: if _GATEWAY_RPC_CLIENT is not None: await _GATEWAY_RPC_CLIENT.close() diff --git a/agents/speaker_agent.py b/agents/speaker_agent.py index bb58911..34ee100 100644 --- a/agents/speaker_agent.py +++ b/agents/speaker_agent.py @@ -51,6 +51,15 @@ def __init__( self._interrupt_all_generation = 0 self._interrupted_run_ids: set[str] = set() + def _emoji_display_enabled(self) -> bool: + emoji_display = self._emoji_display + if emoji_display is None: + return False + return bool(getattr(getattr(emoji_display, "config", None), "enabled", False)) + + def _accepts_segments(self) -> bool: + return self._enabled or self._emoji_display_enabled() + async def start(self) -> None: if self._worker_task and not self._worker_task.done(): return @@ -77,7 +86,7 @@ async def set_enabled(self, enabled: bool, *, reason: str = "") -> dict: return {**self.get_status(), "dropped": dropped} def enqueue(self, segment: SpeechSegment) -> bool: - if not self._enabled: + if not self._accepts_segments(): return False if self._is_run_interrupted(segment.run_id): log.debug( @@ -171,7 +180,7 @@ async def _worker(self) -> None: if segment is None: self._queue.task_done() break - if not self._enabled: + if not self._accepts_segments(): self._queue.task_done() continue self._current_segment = segment @@ -179,10 +188,12 @@ async def _worker(self) -> None: if self._is_segment_interrupted(segment, interrupt_generation): await self._skip_interrupted_segment(segment) continue - await self._ensure_run_ducking(segment.run_id) - if self._is_segment_interrupted(segment, interrupt_generation): - await self._skip_interrupted_segment(segment) - continue + should_speak = self._enabled + if should_speak: + await self._ensure_run_ducking(segment.run_id) + if self._is_segment_interrupted(segment, interrupt_generation): + await self._skip_interrupted_segment(segment) + continue parsed = extract_emoji_for_speech(segment.text) if parsed.tokens: log.debug( @@ -200,6 +211,10 @@ async def _worker(self) -> None: if self._is_segment_interrupted(segment, interrupt_generation): await self._skip_interrupted_segment(segment) continue + if not should_speak: + self._queue.task_done() + self._current_segment = None + continue if not parsed.speech_text: log.info( "SpeakerAgent: skipped speech for emoji-only segment id=%s run_id=%s", @@ -355,12 +370,15 @@ def __init__( self._last_error = "" self._seen_delta_runs: set[str] = set() + def _should_listen_to_gateway(self) -> bool: + return bool(self._config.enabled or self._config.emoji_display.enabled) + async def start(self) -> None: if self._running: return self._running = True await self._playback.start() - if self._config.enabled: + if self._should_listen_to_gateway(): self._ensure_gateway_task() log.info("SpeakerAgent: started (enabled=%s)", self._config.enabled) @@ -392,8 +410,11 @@ async def set_enabled( if self._running: self._ensure_gateway_task() else: - await self._stop_gateway_task() await self._playback.set_enabled(False, reason=reason or source) + if not self._config.emoji_display.enabled: + await self._stop_gateway_task() + elif self._running: + self._ensure_gateway_task() self._enabled_source = str(source or "api") self._enabled_reason = str(reason or "") self._enabled_changed_at = time.time() @@ -468,7 +489,7 @@ async def _stop_gateway_task(self) -> None: async def _run_forever(self) -> None: backoff_s = 1.0 - while self._running and self._config.enabled: + while self._running and self._should_listen_to_gateway(): try: await self._run_until_disconnect() backoff_s = 1.0 @@ -493,7 +514,7 @@ async def _run_until_disconnect(self) -> None: if not self._config.speaker.speak_existing_on_start: await self._mark_current_message_seen(gateway, deduper) async for event in gateway.events(): - if not self._running or not self._config.enabled: + if not self._running or not self._should_listen_to_gateway(): break await self._handle_event(event, gateway, deduper, router) finally: diff --git a/deploy/systemd/listener.service b/deploy/systemd/listener.service index de5b4e1..e0f0f8e 100644 --- a/deploy/systemd/listener.service +++ b/deploy/systemd/listener.service @@ -7,6 +7,7 @@ Wants=network-online.target Type=simple WorkingDirectory=/home/re/src/Listener Environment=PYTHONUNBUFFERED=1 +Environment=PATH=/home/re/.local/bin:/usr/local/bin:/usr/bin:/bin ExecStart=/home/re/src/Listener/.venv/bin/python /home/re/src/Listener/main.py ExecReload=/home/re/src/Listener/.venv/bin/python /home/re/src/Listener/utils/listenerctl.py speech_gate_reset --reason systemd-reload ExecStop=-/home/re/src/Listener/.venv/bin/python /home/re/src/Listener/utils/listenerctl.py stop --reason systemd diff --git a/speaker/gateway.py b/speaker/gateway.py index be884bd..21ae742 100644 --- a/speaker/gateway.py +++ b/speaker/gateway.py @@ -19,7 +19,7 @@ from .config import GatewayConfig log = logging.getLogger(__name__) -PROTOCOL_VERSION = 3 +PROTOCOL_VERSION = 4 class GatewayError(RuntimeError): diff --git a/tests/test_openclaw_gateway.py b/tests/test_openclaw_gateway.py index 7ca84e6..ff60a1a 100644 --- a/tests/test_openclaw_gateway.py +++ b/tests/test_openclaw_gateway.py @@ -1,4 +1,5 @@ import asyncio +import json from pathlib import Path import subprocess import sys @@ -17,6 +18,7 @@ remember_openclaw_chat_run, steer_openclaw_chat_session, ) +import agents.openclaw_gateway as openclaw_gateway_module # noqa: E402 from agents.openclaw_input_agent import OpenClawInputAgent # noqa: E402 from core.bus import Event # noqa: E402 from core.config import cfg # noqa: E402 @@ -47,10 +49,41 @@ async def _runner() -> None: frame = json.loads(ws.sent[0]) assert frame["params"]["client"]["id"] == "gateway-client" + assert frame["params"]["minProtocol"] == 4 + assert frame["params"]["maxProtocol"] == 4 asyncio.run(_runner()) +def test_gateway_token_falls_back_to_local_openclaw_config(tmp_path, monkeypatch): + openclaw_config = tmp_path / ".openclaw" / "openclaw.json" + openclaw_config.parent.mkdir() + openclaw_config.write_text( + json.dumps({"gateway": {"auth": {"mode": "token", "token": "local-token"}}}), + encoding="utf-8", + ) + + old_openclaw_token = cfg.openclaw.gateway_token + speaker_gateway = getattr(getattr(cfg, "speaker", None), "gateway", None) + old_speaker_token = getattr(speaker_gateway, "token", None) + monkeypatch.setattr(cfg.openclaw, "gateway_token", None) + if speaker_gateway is not None: + monkeypatch.setattr(speaker_gateway, "token", None) + monkeypatch.setenv("HOME", str(tmp_path)) + monkeypatch.delenv("OPENCLAW_CONFIG", raising=False) + monkeypatch.delenv("OPENCLAW_GATEWAY_TOKEN", raising=False) + try: + assert ( + openclaw_gateway_module._resolve_gateway_token("ws://127.0.0.1:18789") + == "local-token" + ) + assert openclaw_gateway_module._resolve_gateway_token("wss://example.invalid") is None + finally: + cfg.openclaw.gateway_token = old_openclaw_token + if speaker_gateway is not None: + speaker_gateway.token = old_speaker_token + + def test_abort_openclaw_chat_session_uses_remembered_run_id_first(monkeypatch): async def _runner() -> None: calls: list[tuple[str, dict]] = [] diff --git a/tests/test_speaker_agent.py b/tests/test_speaker_agent.py index 6642e54..883d009 100644 --- a/tests/test_speaker_agent.py +++ b/tests/test_speaker_agent.py @@ -13,6 +13,7 @@ from core.runtime_state import RuntimeStateStore # noqa: E402 from speaker.config import SpeakerConfig # noqa: E402 from speaker.events import ChatSpeechRouter, SpeechSegment # noqa: E402 +from speaker.gateway import GatewayClient # noqa: E402 from speaker.messages import MessageDeduper # noqa: E402 @@ -31,9 +32,10 @@ async def speak(self, text: str) -> None: class RecordingEmojiDisplay: - def __init__(self) -> None: + def __init__(self, *, enabled: bool = True) -> None: self.shown = [] self.cleared = [] + self.config = type("EmojiDisplayConfig", (), {"enabled": enabled})() async def show_tokens(self, tokens, *, run_id, segment_id): self.shown.append(([token.symbol for token in tokens], run_id, segment_id)) @@ -107,6 +109,36 @@ async def _runner() -> None: asyncio.run(_runner()) +def test_speaker_gateway_client_uses_protocol_v4(): + async def _runner() -> None: + class FakeWebSocket: + def __init__(self) -> None: + self.sent: list[str] = [] + + async def send(self, payload: str) -> None: + self.sent.append(payload) + + async def recv(self) -> str: + import json + + request = json.loads(self.sent[-1]) + return json.dumps({"type": "res", "id": request["id"], "ok": True, "payload": {}}) + + import json + + client = GatewayClient(SpeakerConfig().gateway) + client._ws = FakeWebSocket() # pylint: disable=protected-access + + await client._send_connect() # pylint: disable=protected-access + + frame = json.loads(client._ws.sent[0]) # type: ignore[union-attr] + assert frame["params"]["client"]["id"] == "gateway-client" + assert frame["params"]["minProtocol"] == 4 + assert frame["params"]["maxProtocol"] == 4 + + asyncio.run(_runner()) + + def test_speech_playback_controller_restores_ducking_when_interrupted_before_tts( monkeypatch, ): @@ -270,6 +302,39 @@ async def speak(self, text: str) -> None: asyncio.run(_runner()) +def test_speech_playback_controller_shows_emoji_when_speech_disabled(): + async def _runner() -> None: + class RecordingSpeech: + def __init__(self) -> None: + self.spoken = [] + + async def speak(self, text: str) -> None: + self.spoken.append(text) + + speech = RecordingSpeech() + display = RecordingEmojiDisplay(enabled=True) + controller = SpeechPlaybackController( + speech=speech, + queue_size=4, + enabled=False, + emoji_display=display, + ) + await controller.start() + try: + assert controller.enqueue(SpeechSegment("seg-emoji", "Привет 🙂", "run-1")) is True + for _ in range(10): + if display.shown: + break + await asyncio.sleep(0.01) + + assert speech.spoken == [] + assert display.shown == [(["🙂"], "run-1", "seg-emoji")] + finally: + await controller.close() + + asyncio.run(_runner()) + + def test_speaker_agent_restores_persisted_enabled_state(tmp_path): async def _runner() -> None: class RecordingSpeech: @@ -333,3 +398,48 @@ async def speak(self, text: str) -> None: assert queued == ["Первое предложение.", "Второе предложение."] asyncio.run(_runner()) + + +def test_speaker_agent_keeps_gateway_running_for_emoji_display_when_speech_disabled(): + async def _runner() -> None: + class RecordingSpeech: + async def speak(self, text: str) -> None: + return None + + class IdleGateway: + def __init__(self) -> None: + self.connected = asyncio.Event() + self.closed = asyncio.Event() + + async def connect(self) -> None: + self.connected.set() + + async def close(self) -> None: + self.closed.set() + + async def events(self): + while not self.closed.is_set(): + await asyncio.sleep(0.01) + if False: + yield None + + config = SpeakerConfig() + config.enabled = False + config.emoji_display.enabled = True + gateway = IdleGateway() + agent = SpeakerAgent( + config=config, + speech=RecordingSpeech(), + gateway_factory=lambda _cfg: gateway, + state_store=RuntimeStateStore(None), + ) + try: + await agent.start() + await asyncio.wait_for(gateway.connected.wait(), timeout=1.0) + + assert agent.get_status()["enabled"] is False + assert agent._gateway_task is not None + finally: + await agent.close() + + asyncio.run(_runner())