Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions pipeline/tool_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ def _github_check() -> bool:
return is_authenticated()


def _whatsapp_check() -> bool:
# No OAuth — "connected" means the local GoWA REST server has a paired device.
# Short timeout so an absent/idle server doesn't stall schema building.
from pipeline.tools_whatsapp import has_paired_device
return has_paired_device(timeout=3.0)


# ── toolset 정의 ──────────────────────────────────────────────────────────────
# hermes-agent toolsets.py의 TOOLSETS 패턴 — 서비스(toolset) 단위로 도구를 묶고
# check_fn 하나로 가용성을 판정한다.
Expand Down Expand Up @@ -118,6 +125,14 @@ def _github_check() -> bool:
"check_fn": _github_check,
"connect_hint": "설정 → 워크스페이스에서 GitHub PAT 를 연결하라 (GITHUB_PERSONAL_ACCESS_TOKEN).",
},
"whatsapp": {
"description": "WhatsApp (채팅 목록/메시지 읽기·전송, 로컬 GoWA 서버 경유)",
"tools": [
"whatsapp_list_chats", "whatsapp_read_messages", "whatsapp_send_message",
],
"check_fn": _whatsapp_check,
"connect_hint": "GoWA 서버 기동 + QR 페어링 필요 (whatsapp rest --port 3777, VEGA_WHATSAPP_GOWA_URL).",
},
}

_TOOL_TO_TOOLSET: dict[str, str] = {
Expand Down
10 changes: 10 additions & 0 deletions pipeline/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ def set_ce_mode(active: bool) -> None:
# GitHub
"github_list_issues", "github_get_issue", "github_list_pulls",
"github_get_pull", "github_search_code", "github_read_file",
# WhatsApp (read only — send is write, excluded)
"whatsapp_list_chats", "whatsapp_read_messages",
})


Expand Down Expand Up @@ -1098,6 +1100,12 @@ def get_schemas_for_mode(
from pipeline.tools_superthread import SUPERTHREAD_TOOL_SCHEMAS, SUPERTHREAD_TOOL_FUNCTIONS
from pipeline.tools_airtable import AIRTABLE_TOOL_SCHEMAS, AIRTABLE_TOOL_FUNCTIONS
from pipeline.tools_github import GITHUB_TOOL_SCHEMAS, GITHUB_TOOL_FUNCTIONS
# WhatsApp wraps a local GoWA REST server — optional (module skipped if unavailable).
try:
from pipeline.tools_whatsapp import WHATSAPP_TOOL_SCHEMAS, WHATSAPP_TOOL_FUNCTIONS
except Exception:
WHATSAPP_TOOL_SCHEMAS: list[dict] = []
WHATSAPP_TOOL_FUNCTIONS: dict[str, Any] = {}

TOOL_SCHEMAS.extend(MEMORY_TOOL_SCHEMAS)
TOOL_SCHEMAS.extend(SESSION_TOOL_SCHEMAS)
Expand All @@ -1107,6 +1115,7 @@ def get_schemas_for_mode(
TOOL_SCHEMAS.extend(SUPERTHREAD_TOOL_SCHEMAS)
TOOL_SCHEMAS.extend(AIRTABLE_TOOL_SCHEMAS)
TOOL_SCHEMAS.extend(GITHUB_TOOL_SCHEMAS)
TOOL_SCHEMAS.extend(WHATSAPP_TOOL_SCHEMAS)

# vega-agent: 네이티브 linear_* 도구는 pipeline.linear_client(개인 VEGA 전용, 여기 없음)에
# 의존한다. 모듈이 없으면 호출 시 무조건 실패하고 self_improve 가 폭주하므로,
Expand Down Expand Up @@ -1909,6 +1918,7 @@ def mcp_reload() -> dict:
TOOL_FUNCTIONS.update(SUPERTHREAD_TOOL_FUNCTIONS)
TOOL_FUNCTIONS.update(AIRTABLE_TOOL_FUNCTIONS)
TOOL_FUNCTIONS.update(GITHUB_TOOL_FUNCTIONS)
TOOL_FUNCTIONS.update(WHATSAPP_TOOL_FUNCTIONS)


def dispatch_tool(name: str, arguments: dict) -> str:
Expand Down
220 changes: 220 additions & 0 deletions pipeline/tools_whatsapp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
# Created: 2026-07-02
# Purpose: WhatsApp native tools — wraps a local GoWA (go-whatsapp-web-multidevice) REST server (INT-2323)
# Dependencies: stdlib only (urllib). No OAuth — auth is the locally paired GoWA device.
# Test Status: tests/test_whatsapp_int2323.py

from __future__ import annotations

import json
import os
import urllib.parse
import urllib.request
from typing import Any

_DEFAULT_URL = "http://localhost:3777"

# Resolved GoWA device_id cache (process lifetime). v8 multi-device requires X-Device-Id
# on every request. Reset via reset_device_cache() (used by tests / after re-pairing).
_device_cache: str | None = None


def _base_url() -> str:
return (os.getenv("VEGA_WHATSAPP_GOWA_URL") or _DEFAULT_URL).rstrip("/")


def reset_device_cache() -> None:
"""Drop the cached device_id — forces re-resolution on next call."""
global _device_cache
_device_cache = None


# ── HTTP (stdlib urllib) ──────────────────────────────────────────────────────

def _get(path: str, params: dict | None = None, *, device_id: str | None = None,
timeout: float = 30.0) -> dict:
url = _base_url() + path
if params:
url += "?" + urllib.parse.urlencode(
{k: v for k, v in params.items() if v not in (None, "")}
)
headers = {}
if device_id:
headers["X-Device-Id"] = device_id
req = urllib.request.Request(url, headers=headers)
with urllib.request.urlopen(req, timeout=timeout) as r:
return json.loads(r.read().decode())


def _post_json(path: str, body: dict, *, device_id: str | None = None,
timeout: float = 30.0) -> dict:
data = json.dumps(body).encode()
headers = {"Content-Type": "application/json"}
if device_id:
headers["X-Device-Id"] = device_id
req = urllib.request.Request(_base_url() + path, data=data, method="POST", headers=headers)
with urllib.request.urlopen(req, timeout=timeout) as r:
return json.loads(r.read().decode())


# ── Defensive parsing ─────────────────────────────────────────────────────────
# GoWA responses are inconsistent: `results` is a dict for chats (results.data)
# but a plain list for /app/devices. Handle both shapes.

def _rows(results: Any) -> list:
if isinstance(results, dict):
data = results.get("data")
return data if isinstance(data, list) else []
if isinstance(results, list):
return results
return []


def _devices() -> list:
data = _get("/app/devices", timeout=30.0)
return _rows(data.get("results"))


def _resolve_device() -> str:
"""Return the GoWA device_id (cached). Raise if the server is unreachable or unpaired."""
global _device_cache
if _device_cache:
return _device_cache
try:
devices = _devices()
except Exception as e:
raise RuntimeError(
"WhatsApp GoWA 서버에 연결할 수 없습니다 — GoWA REST 서버를 기동하세요 "
f"(whatsapp rest --port …, {_base_url()}). 원인: {e}"
) from e
if not devices:
raise RuntimeError("WhatsApp GoWA 미연결 — QR 페어링 필요")
dev = devices[0].get("device") or devices[0].get("jid")
if not dev:
raise RuntimeError("WhatsApp GoWA 미연결 — QR 페어링 필요")
_device_cache = str(dev)
return _device_cache


def has_paired_device(timeout: float = 3.0) -> bool:
"""Connectivity/pairing probe for the toolset gate. Never raises."""
try:
data = _get("/app/devices", timeout=timeout)
except Exception:
return False
return bool(_rows(data.get("results")))


def _msg_text(m: dict) -> str:
"""Best-effort text extraction — GoWA message shape varies by message type."""
for k in ("text", "message", "body", "content", "caption"):
v = m.get(k)
if isinstance(v, str) and v:
return v[:4000]
if isinstance(v, dict):
for kk in ("text", "conversation", "caption", "body"):
vv = v.get(kk)
if isinstance(vv, str) and vv:
return vv[:4000]
return ""


# ── Tools ─────────────────────────────────────────────────────────────────────

def whatsapp_list_chats(limit: int = 20) -> list[dict]:
"""List recent WhatsApp chats (jid/name/last_message_time)."""
dev = _resolve_device()
data = _get("/chats", {"limit": int(limit)}, device_id=dev)
out = []
for c in _rows(data.get("results")):
out.append({
"jid": c.get("jid"),
"name": c.get("name"),
"last_message_time": c.get("last_message_time"),
"archived": c.get("archived"),
})
return out


def whatsapp_read_messages(chat_jid: str, limit: int = 20) -> list[dict]:
"""Read recent messages of a specific chat. chat_jid from whatsapp_list_chats."""
dev = _resolve_device()
data = _get(f"/chat/{urllib.parse.quote(chat_jid, safe='@.')}/messages",
{"limit": int(limit)}, device_id=dev)
out = []
for m in _rows(data.get("results")):
out.append({
"id": m.get("id") or m.get("message_id"),
"sender": m.get("sender") or m.get("from") or m.get("pushname"),
"timestamp": m.get("timestamp") or m.get("message_time"),
"from_me": m.get("from_me", m.get("is_from_me")),
"text": _msg_text(m),
})
return out


def whatsapp_send_message(phone: str, message: str) -> dict:
"""Send a WhatsApp message. phone is a number (auto-suffixed @s.whatsapp.net)
or a full jid (individual @s.whatsapp.net / group @g.us — kept as-is)."""
dev = _resolve_device()
target = phone if "@" in phone else f"{phone}@s.whatsapp.net"
data = _post_json("/send/message", {"phone": target, "message": message}, device_id=dev)
return {
"ok": data.get("code") == "SUCCESS",
"code": data.get("code"),
"phone": target,
"results": data.get("results"),
}


WHATSAPP_TOOL_SCHEMAS: list[dict] = [
{
"type": "function",
"name": "whatsapp_send_message",
"description": (
"WhatsApp 메시지를 직접 전송한다(사용자 명의, 로컬 GoWA 서버 경유). "
"정리한 내용을 복붙 안내하지 말고 이 도구로 바로 보낸다. "
"phone 은 국가코드 포함 번호(예: 821012345678) — @s.whatsapp.net 은 자동 부착된다. "
"그룹은 whatsapp_list_chats 의 jid(…@g.us)를 그대로 넣는다. "
"(GoWA 서버 미연결/미페어링 시 재연결 안내.)"
),
"parameters": {
"type": "object",
"properties": {
"phone": {"type": "string", "description": "수신 번호(국가코드 포함) 또는 전체 jid(개인 …@s.whatsapp.net / 그룹 …@g.us)"},
"message": {"type": "string", "description": "보낼 메시지 본문"},
},
"required": ["phone", "message"],
},
},
{
"type": "function",
"name": "whatsapp_list_chats",
"description": "WhatsApp 최근 채팅 목록을 조회한다. 각 항목의 jid 로 whatsapp_read_messages·whatsapp_send_message 를 호출한다.",
"parameters": {
"type": "object",
"properties": {
"limit": {"type": "integer", "default": 20, "description": "최대 채팅 수"},
},
"required": [],
},
},
{
"type": "function",
"name": "whatsapp_read_messages",
"description": "특정 WhatsApp 채팅의 최근 메시지를 읽는다. chat_jid 는 whatsapp_list_chats 결과의 jid.",
"parameters": {
"type": "object",
"properties": {
"chat_jid": {"type": "string", "description": "채팅 jid (…@s.whatsapp.net / …@g.us)"},
"limit": {"type": "integer", "default": 20, "description": "메시지 수"},
},
"required": ["chat_jid"],
},
},
]

WHATSAPP_TOOL_FUNCTIONS: dict[str, Any] = {
"whatsapp_list_chats": whatsapp_list_chats,
"whatsapp_read_messages": whatsapp_read_messages,
"whatsapp_send_message": whatsapp_send_message,
}
103 changes: 103 additions & 0 deletions pipeline/whatsapp_sidecar.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# Created: 2026-07-02
# Purpose: Manage the GoWA (go-whatsapp-web-multidevice) REST server as an opt-in
# VEGA-managed sidecar process. Started/stopped from web/server.py lifespan.
# The whatsapp_* tools (pipeline/tools_whatsapp.py) talk to this server over REST.
# Dependencies: stdlib only (subprocess, urllib), pipeline.data_paths
# Test Status: tests/test_whatsapp_sidecar.py

from __future__ import annotations

import os
import subprocess
import urllib.request
from pathlib import Path

# Opt-in: sidecar only starts when this env flag is truthy. Off by default so the
# backend never spawns an unofficial-protocol WhatsApp server unless the user asks.
_OPT_IN_ENV = "VEGA_WHATSAPP_SIDECAR"
# Port must match tools_whatsapp default (VEGA_WHATSAPP_GOWA_URL) — keep in sync.
_DEFAULT_PORT = 3777

_proc: subprocess.Popen | None = None


def is_enabled() -> bool:
"""True if the GoWA sidecar is opted in via env (1/true/yes/on)."""
return os.getenv(_OPT_IN_ENV, "").strip().lower() in ("1", "true", "yes", "on")


def _binary_path() -> Path:
"""GoWA binary location. Override with VEGA_WHATSAPP_GOWA_BIN; else data_dir default."""
override = os.getenv("VEGA_WHATSAPP_GOWA_BIN")
if override:
return Path(override).expanduser()
try:
from pipeline.data_paths import data_dir
base = Path(data_dir())
except Exception:
base = Path(os.getenv("VEGA_DATA_DIR", ".vega"))
return base / "whatsapp-gowa" / "whatsapp"


def _port() -> int:
try:
return int(os.getenv("VEGA_WHATSAPP_GOWA_PORT", str(_DEFAULT_PORT)))
except ValueError:
return _DEFAULT_PORT


def _already_serving(port: int, timeout: float = 1.5) -> bool:
"""True if something already answers on the GoWA port (manual start / prior run)."""
try:
with urllib.request.urlopen(f"http://localhost:{port}/", timeout=timeout):
return True
except Exception:
return False


def start() -> dict:
"""Start the GoWA sidecar if opted in and not already running.

Returns a status dict — never raises (lifespan must not fail on sidecar issues).
"""
global _proc
if not is_enabled():
return {"started": False, "reason": "not opted in"}
port = _port()
if _already_serving(port):
return {"started": False, "reason": f"already serving on {port}"}
binary = _binary_path()
if not binary.is_file():
return {"started": False, "reason": f"binary not found: {binary}"}
if not os.access(binary, os.X_OK):
return {"started": False, "reason": f"binary not executable: {binary}"}
try:
# rest mode on the shared port; own process group so we can clean it up.
_proc = subprocess.Popen(
[str(binary), "rest", "--port", str(port)],
cwd=str(binary.parent),
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
start_new_session=True,
)
return {"started": True, "pid": _proc.pid, "port": port}
except Exception as e:
_proc = None
return {"started": False, "reason": f"spawn failed: {e}"}


def stop() -> None:
"""Terminate the sidecar if VEGA started it (no-op for manually-run servers)."""
global _proc
if _proc is None:
return
try:
_proc.terminate()
try:
_proc.wait(timeout=5)
except subprocess.TimeoutExpired:
_proc.kill()
except Exception:
pass
finally:
_proc = None
Loading
Loading