Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 42 additions & 3 deletions agents/openclaw_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
import json
import logging
import os
from pathlib import Path
import shlex
import subprocess
import time
import uuid
from typing import Any
from urllib.parse import urlparse

from core.config import cfg
from core import perf
Expand All @@ -28,7 +30,7 @@
_LAST_CHAT_RUN_BY_SESSION: dict[str, dict[str, Any]] = {}
_GATEWAY_RPC_CLIENT: "OpenClawGatewayRpcClient | None" = None
_GATEWAY_RPC_CLIENT_KEY: tuple[str, str | None] | None = None
_PROTOCOL_VERSION = 3
_PROTOCOL_VERSION = 4


def build_openclaw_base_command(raw: Any) -> list[str]:
Expand Down Expand Up @@ -105,7 +107,7 @@ def _resolve_gateway_url() -> str:
return "ws://127.0.0.1:18789"


def _resolve_gateway_token() -> str | None:
def _resolve_gateway_token(gateway_url: str | None = None) -> str | None:
token = getattr(cfg.openclaw, "gateway_token", None)
if token:
return str(token)
Expand All @@ -114,6 +116,42 @@ def _resolve_gateway_token() -> str | None:
speaker_token = getattr(speaker_gateway, "token", None)
if speaker_token:
return str(speaker_token)
env_token = os.environ.get("OPENCLAW_GATEWAY_TOKEN")
if env_token:
return env_token
if _is_loopback_gateway_url(gateway_url or _resolve_gateway_url()):
return _read_openclaw_gateway_token()
return None


def _is_loopback_gateway_url(gateway_url: str) -> bool:
    """Return True when *gateway_url* targets this machine's loopback interface.

    The host part of the URL is accepted when it is the name ``localhost``
    or an IP literal that ``ipaddress`` classifies as loopback, which covers
    all of ``127.0.0.0/8`` and every spelling of ``::1``.

    Args:
        gateway_url: Gateway URL to inspect; it is coerced with ``str()``.

    Returns:
        ``True`` for a loopback host, ``False`` for any other host, for a
        URL without a host, or for a URL that cannot be parsed.
    """
    import ipaddress  # local import so the block does not rely on a new file-level import

    try:
        host = (urlparse(str(gateway_url)).hostname or "").strip().lower()
    except Exception:
        # Best-effort check: any parse failure means "not loopback".
        return False
    if host == "localhost":
        return True
    try:
        # Comparing against literal strings would miss 127.0.0.2 or
        # 0:0:0:0:0:0:0:1, so let ipaddress normalise and classify the host.
        return ipaddress.ip_address(host).is_loopback
    except ValueError:
        # Empty host or a DNS name other than "localhost".
        return False


def _read_openclaw_gateway_token() -> str | None:
config_path = Path(os.environ.get("OPENCLAW_CONFIG") or "~/.openclaw/openclaw.json").expanduser()
try:
data = json.loads(config_path.read_text(encoding="utf-8-sig"))
except (OSError, json.JSONDecodeError):
return None
if not isinstance(data, dict):
return None
gateway = data.get("gateway")
if not isinstance(gateway, dict):
return None
auth = gateway.get("auth")
if not isinstance(auth, dict):
return None
if auth.get("mode") != "token":
return None
token = auth.get("token")
if isinstance(token, str) and token.strip():
return token.strip()
return None


Expand Down Expand Up @@ -296,7 +334,8 @@ def _format_gateway_error(error: Any) -> str:

async def _get_gateway_rpc_client() -> OpenClawGatewayRpcClient:
global _GATEWAY_RPC_CLIENT, _GATEWAY_RPC_CLIENT_KEY
key = (_resolve_gateway_url(), _resolve_gateway_token())
gateway_url = _resolve_gateway_url()
key = (gateway_url, _resolve_gateway_token(gateway_url))
if _GATEWAY_RPC_CLIENT is None or _GATEWAY_RPC_CLIENT_KEY != key:
if _GATEWAY_RPC_CLIENT is not None:
await _GATEWAY_RPC_CLIENT.close()
Expand Down
41 changes: 31 additions & 10 deletions agents/speaker_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ def __init__(
self._interrupt_all_generation = 0
self._interrupted_run_ids: set[str] = set()

def _emoji_display_enabled(self) -> bool:
    """Report whether the attached emoji display exists and is switched on.

    Looks up ``self._emoji_display.config.enabled``; a missing display, a
    missing ``config`` or a missing ``enabled`` attribute all count as off.
    """
    # getattr() with a default on None simply yields the default, so a
    # display of None falls through to False without a separate check.
    display_config = getattr(self._emoji_display, "config", None)
    return bool(getattr(display_config, "enabled", False))

def _accepts_segments(self) -> bool:
    """Tell whether queued segments should be processed at all.

    Segments are accepted when speech is enabled, or otherwise when the
    emoji display is enabled.
    """
    speech_on = self._enabled
    if speech_on:
        return speech_on
    # The emoji display is only consulted when speech itself is off.
    return self._emoji_display_enabled()

async def start(self) -> None:
if self._worker_task and not self._worker_task.done():
return
Expand All @@ -77,7 +86,7 @@ async def set_enabled(self, enabled: bool, *, reason: str = "") -> dict:
return {**self.get_status(), "dropped": dropped}

def enqueue(self, segment: SpeechSegment) -> bool:
if not self._enabled:
if not self._accepts_segments():
return False
if self._is_run_interrupted(segment.run_id):
log.debug(
Expand Down Expand Up @@ -171,18 +180,20 @@ async def _worker(self) -> None:
if segment is None:
self._queue.task_done()
break
if not self._enabled:
if not self._accepts_segments():
self._queue.task_done()
continue
self._current_segment = segment
interrupt_generation = self._interrupt_all_generation
if self._is_segment_interrupted(segment, interrupt_generation):
await self._skip_interrupted_segment(segment)
continue
await self._ensure_run_ducking(segment.run_id)
if self._is_segment_interrupted(segment, interrupt_generation):
await self._skip_interrupted_segment(segment)
continue
should_speak = self._enabled
if should_speak:
await self._ensure_run_ducking(segment.run_id)
if self._is_segment_interrupted(segment, interrupt_generation):
await self._skip_interrupted_segment(segment)
continue
parsed = extract_emoji_for_speech(segment.text)
if parsed.tokens:
log.debug(
Expand All @@ -200,6 +211,10 @@ async def _worker(self) -> None:
if self._is_segment_interrupted(segment, interrupt_generation):
await self._skip_interrupted_segment(segment)
continue
if not should_speak:
self._queue.task_done()
self._current_segment = None
continue
if not parsed.speech_text:
log.info(
"SpeakerAgent: skipped speech for emoji-only segment id=%s run_id=%s",
Expand Down Expand Up @@ -355,12 +370,15 @@ def __init__(
self._last_error = ""
self._seen_delta_runs: set[str] = set()

def _should_listen_to_gateway(self) -> bool:
    """Decide whether the gateway listener is needed.

    The listener is needed when the speaker itself is enabled in the config
    or, failing that, when the emoji display is enabled.
    """
    config = self._config
    if config.enabled:
        return True
    # Only consulted when the speaker is disabled.
    return bool(config.emoji_display.enabled)

async def start(self) -> None:
if self._running:
return
self._running = True
await self._playback.start()
if self._config.enabled:
if self._should_listen_to_gateway():
self._ensure_gateway_task()
log.info("SpeakerAgent: started (enabled=%s)", self._config.enabled)

Expand Down Expand Up @@ -392,8 +410,11 @@ async def set_enabled(
if self._running:
self._ensure_gateway_task()
else:
await self._stop_gateway_task()
await self._playback.set_enabled(False, reason=reason or source)
if not self._config.emoji_display.enabled:
await self._stop_gateway_task()
elif self._running:
self._ensure_gateway_task()
self._enabled_source = str(source or "api")
self._enabled_reason = str(reason or "")
self._enabled_changed_at = time.time()
Expand Down Expand Up @@ -468,7 +489,7 @@ async def _stop_gateway_task(self) -> None:

async def _run_forever(self) -> None:
backoff_s = 1.0
while self._running and self._config.enabled:
while self._running and self._should_listen_to_gateway():
try:
await self._run_until_disconnect()
backoff_s = 1.0
Expand All @@ -493,7 +514,7 @@ async def _run_until_disconnect(self) -> None:
if not self._config.speaker.speak_existing_on_start:
await self._mark_current_message_seen(gateway, deduper)
async for event in gateway.events():
if not self._running or not self._config.enabled:
if not self._running or not self._should_listen_to_gateway():
break
await self._handle_event(event, gateway, deduper, router)
finally:
Expand Down
1 change: 1 addition & 0 deletions deploy/systemd/listener.service
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Wants=network-online.target
Type=simple
WorkingDirectory=/home/re/src/Listener
Environment=PYTHONUNBUFFERED=1
Environment=PATH=/home/re/.local/bin:/usr/local/bin:/usr/bin:/bin
ExecStart=/home/re/src/Listener/.venv/bin/python /home/re/src/Listener/main.py
ExecReload=/home/re/src/Listener/.venv/bin/python /home/re/src/Listener/utils/listenerctl.py speech_gate_reset --reason systemd-reload
ExecStop=-/home/re/src/Listener/.venv/bin/python /home/re/src/Listener/utils/listenerctl.py stop --reason systemd
Expand Down
2 changes: 1 addition & 1 deletion speaker/gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from .config import GatewayConfig

log = logging.getLogger(__name__)
PROTOCOL_VERSION = 3
PROTOCOL_VERSION = 4


class GatewayError(RuntimeError):
Expand Down
33 changes: 33 additions & 0 deletions tests/test_openclaw_gateway.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import json
from pathlib import Path
import subprocess
import sys
Expand All @@ -17,6 +18,7 @@
remember_openclaw_chat_run,
steer_openclaw_chat_session,
)
import agents.openclaw_gateway as openclaw_gateway_module # noqa: E402
from agents.openclaw_input_agent import OpenClawInputAgent # noqa: E402
from core.bus import Event # noqa: E402
from core.config import cfg # noqa: E402
Expand Down Expand Up @@ -47,10 +49,41 @@ async def _runner() -> None:

frame = json.loads(ws.sent[0])
assert frame["params"]["client"]["id"] == "gateway-client"
assert frame["params"]["minProtocol"] == 4
assert frame["params"]["maxProtocol"] == 4

asyncio.run(_runner())


def test_gateway_token_falls_back_to_local_openclaw_config(tmp_path, monkeypatch):
    """The resolver reads the local OpenClaw config only for loopback gateways."""
    openclaw_config = tmp_path / ".openclaw" / "openclaw.json"
    openclaw_config.parent.mkdir()
    openclaw_config.write_text(
        json.dumps({"gateway": {"auth": {"mode": "token", "token": "local-token"}}}),
        encoding="utf-8",
    )

    # Clear every token source that is checked before the config file.
    # monkeypatch undoes each change at teardown, so no manual restore is needed.
    monkeypatch.setattr(cfg.openclaw, "gateway_token", None)
    speaker_gateway = getattr(getattr(cfg, "speaker", None), "gateway", None)
    if speaker_gateway is not None:
        monkeypatch.setattr(speaker_gateway, "token", None)
    monkeypatch.delenv("OPENCLAW_CONFIG", raising=False)
    monkeypatch.delenv("OPENCLAW_GATEWAY_TOKEN", raising=False)

    # Point "~" at the temporary directory. Path.expanduser() consults HOME on
    # POSIX and USERPROFILE on Windows, so both are redirected.
    monkeypatch.setenv("HOME", str(tmp_path))
    monkeypatch.setenv("USERPROFILE", str(tmp_path))

    resolve = openclaw_gateway_module._resolve_gateway_token
    assert resolve("ws://127.0.0.1:18789") == "local-token"
    assert resolve("wss://example.invalid") is None


def test_abort_openclaw_chat_session_uses_remembered_run_id_first(monkeypatch):
async def _runner() -> None:
calls: list[tuple[str, dict]] = []
Expand Down
Loading
Loading