diff --git a/.env.example b/.env.example index 2182733..74d900f 100644 --- a/.env.example +++ b/.env.example @@ -11,6 +11,15 @@ INKBOX_SIGNING_KEY=whsec_xxxxxxxxxxxx # INKBOX_ALLOWED_USERS=+15551234567,me@example.com # optional local allowlist # INKBOX_REQUIRE_SIGNATURE=true # INKBOX_BRIDGE_PORT=8767 +# INKBOX_CODEX_AUTO_APPROVE_INKBOX_TOOLS=true # skip per-call prompts for Inkbox MCP tools only + +# --- Realtime voice (optional; requires INKBOX_REALTIME_ENABLED=true) --- +# INKBOX_REALTIME_ENABLED=true +# INKBOX_REALTIME_API_KEY=sk-realtime +# OPENAI_API_KEY=sk-openai-fallback +# INKBOX_REALTIME_MODEL=gpt-realtime-2 +# INKBOX_REALTIME_VOICE=cedar +# INKBOX_REALTIME_FALLBACK_TO_INKBOX_STT_TTS=true # --- Codex --- CODEX_PROJECT_DIR=/path/to/the/repo/codex/should/work/in diff --git a/README.md b/README.md index 6f0e3bf..40722b4 100644 --- a/README.md +++ b/README.md @@ -82,7 +82,7 @@ inkbox-codex doctor inkbox-codex run ``` -`inkbox-codex setup` walks you through everything and writes `.env`: create a fresh Inkbox agent via self-signup (or bring an existing API key), pick or create the identity, attach the Codex avatar to the agent's contact card (auto for a new self-signup agent; offered for an existing one with no avatar), provision a phone number, wait for your `START` opt-in, optionally enable OpenAI Realtime voice (validating your key), connect iMessage, mint a webhook signing key, choose the project directory, and set up autostart. Rerun it anytime to reconfigure. Prefer to wire `.env` by hand? Copy `.env.example` to `.env` and fill in `INKBOX_API_KEY`, `INKBOX_IDENTITY`, `INKBOX_SIGNING_KEY`, and `CODEX_PROJECT_DIR` yourself. +`inkbox-codex setup` walks you through everything and writes `.env`: create a fresh Inkbox agent via self-signup (or bring an existing API key), pick or create the identity, attach the Codex avatar to the agent's contact card (auto for a new self-signup agent; offered for an existing one with no avatar), provision a phone number, wait for your `START` opt-in, optionally enable OpenAI Realtime voice (validating your key), connect iMessage, mint a webhook signing key, choose the project directory, choose whether to trust Inkbox MCP tools without repeated allow prompts, and set up autostart. Rerun it anytime to reconfigure. Prefer to wire `.env` by hand? Copy `.env.example` to `.env` and fill in `INKBOX_API_KEY`, `INKBOX_IDENTITY`, `INKBOX_SIGNING_KEY`, and `CODEX_PROJECT_DIR` yourself. On startup the bridge opens an Inkbox tunnel, wires mail/text/iMessage webhook subscriptions and the incoming-call channel to it, and routes everything into Codex sessions. @@ -140,7 +140,7 @@ Codex never silently runs anything destructive. The bridge starts `codex app-ser ## Sessions -Sessions are keyed by Inkbox contact, so one person = one conversation across channels. Codex session ids are persisted in `~/.inkbox-codex/sessions.json` and resumed across bridge restarts — your conversation picks up where it left off. Replies go out on the channel you last used (call replies fall back to SMS if you hang up before Codex finishes). +Sessions are keyed by Inkbox contact, so one person = one conversation across channels. Codex session ids are persisted in `~/.inkbox-codex/sessions.json` and resumed across bridge restarts — your conversation picks up where it left off. Replies go out on the channel you last used. If a voice call ends before Codex finishes a voice reply, that late voice reply is dropped instead of silently switching to SMS or email. **Typing indicator.** While Codex works on a turn, the bridge keeps a typing indicator alive on your iMessage thread (refreshed every few seconds, since it expires) so you can see it's busy. SMS, email, and voice have no typing indicator, so this is iMessage-only. @@ -192,13 +192,14 @@ Calls have two modes, chosen per call: | `CODEX_PROJECT_DIR` | yes | cwd | Directory Codex works in. | | `CODEX_MODEL` | no | CLI default | Model override for bridged sessions. | | `INKBOX_REQUIRE_SIGNATURE` | no | `true` | Refuse unsigned inbound webhooks unless `false`. | -| `INKBOX_BASE_URL` | no | `https://inkbox.ai` | Override the Inkbox API base URL. | +| `INKBOX_BASE_URL` | no | SDK default | Override the Inkbox API base URL. | | `INKBOX_PUBLIC_URL` | no | - | Public bridge URL. Omit to use an Inkbox tunnel. | | `INKBOX_TUNNEL_NAME` | no | identity handle | Tunnel name override. | | `INKBOX_ALLOWED_USERS` | no | - | Local allowlist (emails / E.164 numbers). Usually leave empty and use Inkbox contact rules. | | `INKBOX_ALLOW_ALL_USERS` | no | `false` | Allow all senders admitted by Inkbox contact rules. | | `INKBOX_BRIDGE_PORT` | no | `8767` | Local webhook server port. | | `INKBOX_PERMISSION_TIMEOUT_S` | no | `600` | Seconds to wait for a permission/poll reply. | +| `INKBOX_CODEX_AUTO_APPROVE_INKBOX_TOOLS` | no | `false` | Auto-accept Codex MCP prompts for Inkbox tools only. The setup wizard writes `true` when you trust the agent to send through Inkbox without per-call approval. | | `CODEX_BIN` | no | `codex` | Codex CLI executable to run. | | `CODEX_SANDBOX` | no | `workspace-write` | App-server thread sandbox (`read-only`, `workspace-write`, `danger-full-access`). | | `CODEX_APPROVAL_POLICY` | no | `on-request` | Codex approval policy for bridged turns. | @@ -219,7 +220,7 @@ The agent reaches you (or third parties) through an in-process MCP server: - `inkbox_list_text_conversations` · `inkbox_get_text_conversation` — browse SMS threads and history. - `inkbox_list_imessage_conversations` · `inkbox_get_imessage_conversation` — browse iMessage threads and history (find the `conversation_id` to send into). - `inkbox_lookup_contact` · `inkbox_list_contacts` · `inkbox_get_contact` — resolve and read address-book contacts (reverse-lookup by email/phone, free-text search, or full record by id). -- `inkbox_create_contact` · `inkbox_update_contact` · `inkbox_export_contact_vcard` — save, edit, and export contacts (vCard 4.0). Reads and writes are filtered server-side to what this identity may see. +- `inkbox_create_contact` · `inkbox_update_contact` · `inkbox_delete_contact` — save, edit, and remove contacts. Reads and writes are filtered server-side to what this identity may see. vCard export/import is not exposed. On a live call, the OpenAI Realtime voice agent additionally gets `consult_agent`, `register_post_call_action` / `edit_post_call_action` / `delete_post_call_action`, and `hang_up_call` — see [Voice](#voice). @@ -230,7 +231,7 @@ On a live call, the OpenAI Realtime voice agent additionally gets `consult_agent 3. Ask it to do something requiring a command (e.g. "run the tests") and verify you get a permission text; reply `1` and verify the result comes back. 4. Ask it something open-ended enough to trigger a poll; reply with a number. 5. Email the agent; verify the reply lands as an email on the same thread. -6. Call the number, ask what it's working on, hang up mid-answer, and verify the tail arrives as a text. +6. Call the number, ask what it's working on, hang up mid-answer, and verify the late voice tail is not silently sent as SMS or email. ## Development diff --git a/inkbox_codex/cli.py b/inkbox_codex/cli.py index f41ca8a..a3d6a65 100644 --- a/inkbox_codex/cli.py +++ b/inkbox_codex/cli.py @@ -7,12 +7,12 @@ try: from . import daemon - from .config import read_config + from .config import inkbox_client_kwargs, read_config from .doctor import print_doctor from .setup_wizard import interactive_setup except ImportError: # pragma: no cover - direct local import/test fallback import daemon - from config import read_config + from config import inkbox_client_kwargs, read_config from doctor import print_doctor from setup_wizard import interactive_setup @@ -24,7 +24,7 @@ def _cmd_whoami() -> int: return 1 from inkbox import Inkbox - identity = Inkbox(api_key=cfg.api_key, base_url=cfg.base_url).get_identity(cfg.identity) + identity = Inkbox(**inkbox_client_kwargs(cfg.api_key, cfg.base_url)).get_identity(cfg.identity) mailbox = getattr(identity, "mailbox", None) phone = getattr(identity, "phone_number", None) print(f"handle: {identity.agent_handle}") diff --git a/inkbox_codex/config.py b/inkbox_codex/config.py index 899f8ac..d718db5 100644 --- a/inkbox_codex/config.py +++ b/inkbox_codex/config.py @@ -13,7 +13,8 @@ RealtimeConfig, ) -INKBOX_BASE_URL_DEFAULT = "https://inkbox.ai" +# Empty means "do not override"; the Inkbox SDK owns its API default. +INKBOX_BASE_URL_DEFAULT = "" INKBOX_WS_PATH = "/phone/media/ws" DEFAULT_HOST = "0.0.0.0" DEFAULT_PORT = 8767 @@ -67,11 +68,21 @@ class BridgeConfig: codex_bin: str = "codex" codex_sandbox: str = "workspace-write" codex_approval_policy: str = "on-request" + auto_approve_inkbox_tools: bool = False permission_timeout_s: float = 600.0 # OpenAI Realtime voice (off unless the wizard validated a key) realtime: RealtimeConfig = field(default_factory=RealtimeConfig) +def inkbox_base_url_kwargs(base_url: str | None = None) -> Dict[str, str]: + normalized = str(base_url or "").strip() + return {"base_url": normalized} if normalized else {} + + +def inkbox_client_kwargs(api_key: str, base_url: str | None = None) -> Dict[str, str]: + return {"api_key": api_key, **inkbox_base_url_kwargs(base_url)} + + def _read_realtime_config() -> RealtimeConfig: """Build the Realtime voice config from the env. @@ -122,6 +133,7 @@ def read_config(extra: Dict[str, Any] | None = None) -> BridgeConfig: or extra.get("codex_approval_policy") or "on-request" ).strip(), + auto_approve_inkbox_tools=env_flag("INKBOX_CODEX_AUTO_APPROVE_INKBOX_TOOLS", False), permission_timeout_s=float(os.getenv("INKBOX_PERMISSION_TIMEOUT_S") or 600.0), realtime=_read_realtime_config(), ) diff --git a/inkbox_codex/doctor.py b/inkbox_codex/doctor.py index b67f958..cbf574f 100644 --- a/inkbox_codex/doctor.py +++ b/inkbox_codex/doctor.py @@ -7,9 +7,9 @@ from typing import List, Tuple try: - from .config import read_config + from .config import inkbox_client_kwargs, read_config except ImportError: # pragma: no cover - direct local import/test fallback - from config import read_config + from config import inkbox_client_kwargs, read_config def run_doctor() -> List[Tuple[str, bool, str]]: @@ -68,7 +68,7 @@ def run_doctor() -> List[Tuple[str, bool, str]]: try: from inkbox import Inkbox - identity = Inkbox(api_key=cfg.api_key, base_url=cfg.base_url).get_identity(cfg.identity) + identity = Inkbox(**inkbox_client_kwargs(cfg.api_key, cfg.base_url)).get_identity(cfg.identity) mailbox = getattr(identity, "mailbox", None) phone = getattr(identity, "phone_number", None) detail = ", ".join(filter(None, [ diff --git a/inkbox_codex/gateway.py b/inkbox_codex/gateway.py index 0f04450..9e14a9e 100644 --- a/inkbox_codex/gateway.py +++ b/inkbox_codex/gateway.py @@ -6,8 +6,9 @@ 1. On startup, bring up the identity's Inkbox tunnel (or use ``INKBOX_PUBLIC_URL``), reconcile webhook subscriptions for the identity's mailbox (``message.received``), phone number - (``text.received``), and — when iMessage-enabled — the identity - itself (``imessage.received``), and patch the phone number's + (``text.received``), and - when iMessage-enabled - the identity + itself (``imessage.received`` and ``imessage.reaction_received``), + and patch the phone number's incoming-call channel to auto-accept onto our call WebSocket. 2. Serve ``POST /webhook`` (HMAC-verified) and ``WS /phone/media/ws``. 3. Map every inbound event to a contact-keyed Codex session: @@ -26,7 +27,7 @@ import threading import time from pathlib import Path -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Tuple try: from aiohttp import WSMsgType, web @@ -53,9 +54,15 @@ INKBOX_TUNNEL_AVAILABLE = False try: - from .config import DEFAULT_WEBHOOK_PATH, INKBOX_WS_PATH, BridgeConfig, call_contexts_dir + from .config import ( + DEFAULT_WEBHOOK_PATH, + INKBOX_WS_PATH, + BridgeConfig, + call_contexts_dir, + inkbox_client_kwargs, + ) from .media import download_media, inbound_media_note - from .prompts import strip_markdown + from .prompts import contact_marker, strip_markdown from .realtime import ( RealtimeBridgeConnectError, RealtimeCallMeta, @@ -64,9 +71,9 @@ from .sessions import SessionManager from .tools import build_inkbox_mcp_server_config except ImportError: # pragma: no cover - direct local import/test fallback - from config import DEFAULT_WEBHOOK_PATH, INKBOX_WS_PATH, BridgeConfig, call_contexts_dir + from config import DEFAULT_WEBHOOK_PATH, INKBOX_WS_PATH, BridgeConfig, call_contexts_dir, inkbox_client_kwargs from media import download_media, inbound_media_note - from prompts import strip_markdown + from prompts import contact_marker, strip_markdown from realtime import ( RealtimeBridgeConnectError, RealtimeCallMeta, @@ -84,7 +91,18 @@ def _format_transcript(transcript: Any, limit: int = 30) -> str: return "\n".join(f" {role}: {text}" for role, text in rows) -def _post_call_prompt(actions: List[Dict[str, str]], transcript: Any) -> str: +def _format_realtime_consult_results(results: Any) -> str: + lines = [] + for index, result in enumerate(list(results or []), start=1): + request = getattr(result, "request", "") or "" + answer = getattr(result, "result", "") or "" + lines.append(f"{index}. Request: {request}\nResult: {answer}") + return "\n\n".join(lines) + + +def _post_call_prompt( + actions: List[Dict[str, str]], transcript: Any, consult_results: Any = None +) -> str: """Build the Codex prompt that executes queued after-call work.""" action_lines = "\n".join( f" {i}. {a.get('action', '')}" @@ -92,6 +110,7 @@ def _post_call_prompt(actions: List[Dict[str, str]], transcript: Any) -> str: for i, a in enumerate(actions or [], start=1) ) convo = _format_transcript(transcript) + consults = _format_realtime_consult_results(consult_results) parts = [ "[voice call ended] You were just on a phone call with your operator and " "agreed to do this work after the call. Do the actions that are still needed:", @@ -103,6 +122,13 @@ def _post_call_prompt(actions: List[Dict[str, str]], transcript: Any) -> str: ] if convo: parts += ["", "Recent call transcript:", convo] + if consults: + parts += [ + "", + "Realtime consults already completed during this call:", + consults, + "Do not repeat work that was already completed or queued unless the caller explicitly asked for another, repeat, or different action.", + ] return "\n".join(parts) @@ -139,18 +165,85 @@ def _call_ended_prompt(transcript: Any) -> str: parts = [ "[voice call ended] Your phone call with the operator just ended. If you " "committed to anything during it (open a PR, run a task, send a summary), " - "do that now with your tools. If there's nothing to do, do nothing.", + "do that now with your tools. First reconcile against the transcript: do " + "not redo work that was already completed, queued, canceled, or superseded " + "during the call. If there's nothing still needed, do nothing.", ] if convo: parts += ["", "Recent call transcript:", convo] return "\n".join(parts) +def _voice_consult_prompt( + *, + query: str, + transcript: Any, + outbound: Optional[Dict[str, Any]], + contact: Optional[Dict[str, Any]], + direction: str, + post_call_actions: Optional[List[Dict[str, str]]] = None, + consult_results: Any = None, +) -> str: + """Wrap a realtime consult so Codex stays grounded in the live call.""" + parts = [ + "Voice call consult from the Inkbox Realtime agent.", + "Answer only the current live-call request. Do not continue unrelated prior text/session work.", + "Do not run commands, run tests, edit files, or inspect git unless the consult request explicitly asks for project/coding work.", + "If the request is ordinary conversation, buying advice, brainstorming, or call-topic discussion, answer directly and briefly.", + f"Call direction: {direction or 'unknown'}.", + ] + outbound = outbound or {} + if outbound.get("purpose"): + parts.append(f"Outbound call purpose: {outbound['purpose']}") + if outbound.get("context"): + parts.append(f"Outbound call context: {outbound['context']}") + contact = contact or {} + if contact.get("name"): + parts.append(f"Caller/contact: {contact['name']}") + + if post_call_actions: + parts.append("Pending after-call actions already queued by the realtime call agent:") + for index, action in enumerate(post_call_actions, start=1): + details = f" - {action.get('details')}" if action.get("details") else "" + parts.append(f"{index}. {action.get('action', '')}{details}") + + prior_consults = _format_realtime_consult_results(consult_results) + if prior_consults: + parts += [ + "", + "Previous Codex consult results during this same live call:", + prior_consults, + "Do not repeat work that was already completed or queued unless the caller explicitly asked for another, repeat, or different action.", + ] + + recent = _format_transcript(transcript, limit=8) + if recent: + parts += ["", "Recent live-call transcript:", recent] + parts += [ + "", + f"Consult request: {query.strip()}", + "Return a concise spoken-friendly answer for the realtime agent to say on this call.", + ] + return "\n".join(parts) + + WEBHOOK_DEDUP_TTL_SECONDS = 300 +CONTACT_CACHE_TTL_SECONDS = 300 SMS_MAX_LENGTH = 1600 # Inkbox SMS hard cap +IMESSAGE_MAX_LENGTH = 18995 # Sendblue-compatible iMessage text cap # Inbound SMS carrier keywords handled entirely by the Inkbox server; # never wake the agent for them. SMS_CONTROL_WORDS = {"stop", "start", "help", "unstop", "unsubscribe", "cancel", "end", "quit"} +TEXT_EVENTS = ["text.received"] +IMESSAGE_EVENTS = ["imessage.received", "imessage.reaction_received"] + + +def _message_too_long_reason(channel: str, content: str, max_chars: int) -> str: + char_count = len(content or "") + return ( + f"{channel} text is {char_count} characters; maximum is {max_chars}. " + f"Shorten it or split it into smaller {channel} messages." + ) def _codex_health() -> str: @@ -189,8 +282,12 @@ def __init__(self, cfg: BridgeConfig): self._self_addresses: set[str] = set() self._recent_request_ids: Dict[str, float] = {} + self._inflight_request_ids: Dict[str, float] = {} self._active_call_ws: Dict[str, Any] = {} self._call_meta_by_id: Dict[str, Dict[str, Any]] = {} + # ((kind, value) -> (contact summary, expires_at)); mirrors Hermes' + # per-inbound lookup cache for repeated remote phone/email events. + self._contact_cache: Dict[Tuple[str, str], Tuple[Optional[Dict[str, Any]], float]] = {} # Failed outbound message ids we've already told the agent about, so a # webhook retry (or a second failure event for the same message) doesn't # re-notify and spin the agent in a loop. @@ -213,7 +310,7 @@ async def run(self) -> None: if not self.cfg.api_key or not self.cfg.identity: raise RuntimeError("INKBOX_API_KEY and INKBOX_IDENTITY must be set (see README)") - self._inkbox = Inkbox(api_key=self.cfg.api_key, base_url=self.cfg.base_url) + self._inkbox = Inkbox(**inkbox_client_kwargs(self.cfg.api_key, self.cfg.base_url)) self._identity = await asyncio.to_thread(self._inkbox.get_identity, self.cfg.identity) mailbox = getattr(self._identity, "mailbox", None) @@ -321,7 +418,7 @@ def _reconcile(owner_kw: Dict[str, Any], event_types: List[str]) -> None: _reconcile({"mailbox_id": identity.mailbox.id}, ["message.received"]) logger.info("[bridge] mailbox %s → %s", identity.mailbox.email_address, webhook_url) if identity.phone_number is not None: - _reconcile({"phone_number_id": identity.phone_number.id}, ["text.received"]) + _reconcile({"phone_number_id": identity.phone_number.id}, TEXT_EVENTS) # auto_accept: Inkbox answers and opens the call WS directly. self._inkbox.phone_numbers.update( identity.phone_number.id, @@ -331,7 +428,7 @@ def _reconcile(owner_kw: Dict[str, Any], event_types: List[str]) -> None: ) logger.info("[bridge] phone %s → %s + %s", identity.phone_number.number, webhook_url, ws_url) if getattr(identity, "imessage_enabled", False): - _reconcile({"agent_identity_id": identity.id}, ["imessage.received"]) + _reconcile({"agent_identity_id": identity.id}, IMESSAGE_EVENTS) logger.info("[bridge] iMessage for %s → %s", self.cfg.identity, webhook_url) async def _cleanup(self) -> None: @@ -352,16 +449,43 @@ async def _cleanup(self) -> None: async def _handle_health(self, request: "web.Request") -> "web.Response": return web.json_response({"ok": True, "identity": self.cfg.identity}) - def _is_duplicate(self, request_id: str) -> bool: + def _prune_dedup_ids(self) -> None: now = time.time() - # Opportunistic TTL sweep keeps the dict bounded. - for key, seen_at in list(self._recent_request_ids.items()): - if now - seen_at > WEBHOOK_DEDUP_TTL_SECONDS: + for store in (self._recent_request_ids, self._inflight_request_ids): + for key, seen_at in list(store.items()): + if now - seen_at > WEBHOOK_DEDUP_TTL_SECONDS: + store.pop(key, None) + if len(self._recent_request_ids) > 2000: + oldest = sorted(self._recent_request_ids.items(), key=lambda item: item[1]) + for key, _seen_at in oldest[: len(self._recent_request_ids) - 2000]: self._recent_request_ids.pop(key, None) + + def _dedup_begin(self, request_id: str) -> bool: + if not request_id: + return False + self._prune_dedup_ids() if request_id and request_id in self._recent_request_ids: return True + if request_id and request_id in self._inflight_request_ids: + return True + self._inflight_request_ids[request_id] = time.time() + return False + + def _dedup_commit(self, request_id: str) -> None: + if not request_id: + return + self._prune_dedup_ids() + self._inflight_request_ids.pop(request_id, None) + self._recent_request_ids[request_id] = time.time() + + def _dedup_rollback(self, request_id: str) -> None: if request_id: - self._recent_request_ids[request_id] = now + self._inflight_request_ids.pop(request_id, None) + + def _is_duplicate(self, request_id: str) -> bool: + if self._dedup_begin(request_id): + return True + self._dedup_commit(request_id) return False def _sender_allowed(self, *candidates: str) -> bool: @@ -382,48 +506,292 @@ async def _handle_webhook(self, request: "web.Request") -> "web.Response": if not ok: return web.Response(status=401, text="invalid signature") - if self._is_duplicate(request.headers.get("X-Inkbox-Request-Id", "")): + request_id = request.headers.get("X-Inkbox-Request-Id", "") + if self._dedup_begin(request_id): return web.json_response({"ok": True, "deduped": True}) try: envelope = json.loads(body) except json.JSONDecodeError: + self._dedup_rollback(request_id) return web.Response(status=400, text="invalid json") - event_type = str(envelope.get("event_type") or "") - if not event_type and envelope.get("direction") == "inbound" and envelope.get("local_phone_number"): - # Incoming-call payloads are flat (no envelope); with - # auto_accept this is informational — the WS is the channel. - return web.json_response({"ok": True}) - - if event_type == "message.received": - return await self._on_mail_received(envelope) - if event_type == "text.received": - return await self._on_text_received(envelope) - if event_type == "imessage.received": - return await self._on_imessage_received(envelope) - # Outbound delivery failures: tell the agent its message didn't land so - # it can retry or reach the human another way. - if event_type in ("text.delivery_failed", "text.delivery_unconfirmed"): - return await self._on_text_delivery_failed(envelope, event_type) - if event_type == "imessage.delivery_failed": - return await self._on_imessage_delivery_failed(envelope) - if event_type in ("message.bounced", "message.failed"): - return await self._on_mail_delivery_failed(envelope, event_type) - # Other delivery lifecycle (text.sent/delivered, imessage.sent/...) is - # logged without waking the agent, matching the hermes plugin. - logger.debug("[bridge] lifecycle event %s", event_type) - return web.json_response({"ok": True, "ignored": event_type}) + try: + event_type = str(envelope.get("event_type") or "") + if not event_type and ( + self._call_context_id(envelope) + or (envelope.get("direction") == "inbound" and envelope.get("local_phone_number")) + ): + # Incoming-call payloads are flat (no envelope); with + # auto_accept this is informational, but it can carry resolved + # contact context before the WS starts. + call_id = self._call_context_id(envelope) + if call_id: + self._call_meta_by_id[call_id] = envelope + if len(self._call_meta_by_id) > 100: + self._call_meta_by_id.pop(next(iter(self._call_meta_by_id)), None) + response = web.json_response({"ok": True}) + elif event_type == "message.received": + response = await self._on_mail_received(envelope) + elif event_type == "text.received": + response = await self._on_text_received(envelope) + elif event_type == "imessage.received": + response = await self._on_imessage_received(envelope) + elif event_type == "imessage.reaction_received": + response = await self._on_imessage_reaction_received(envelope) + # Outbound delivery failures: tell the agent its message didn't land so + # it can retry or reach the human another way. + elif event_type in ("text.delivery_failed", "text.delivery_unconfirmed"): + response = await self._on_text_delivery_failed(envelope, event_type) + elif event_type == "imessage.delivery_failed": + response = await self._on_imessage_delivery_failed(envelope) + elif event_type in ("message.bounced", "message.failed"): + response = await self._on_mail_delivery_failed(envelope, event_type) + else: + # Other delivery lifecycle (text.sent/delivered, imessage.sent/...) is + # logged without waking the agent, matching the hermes plugin. + logger.debug("[bridge] lifecycle event %s", event_type) + response = web.json_response({"ok": True, "ignored": event_type}) + except Exception: + self._dedup_rollback(request_id) + raise + self._dedup_commit(request_id) + return response @staticmethod - def _chat_key(data: Dict[str, Any], fallback: str) -> str: + def _thread_key(prefix: str, value: Any) -> Optional[str]: + raw = str(value or "").strip() + return f"{prefix}:{raw}" if raw else None + + @staticmethod + def _chat_key( + data: Dict[str, Any], + fallback: str, + thread_key: Optional[str] = None, + contact: Optional[Dict[str, Any]] = None, + *, + allow_webhook_contact: bool = True, + ) -> str: # Webhook payloads carry resolved contacts — key the session by - # contact id so email/SMS/iMessage/voice converge on one session. - contacts = data.get("contacts") or [] - if len(contacts) == 1 and contacts[0].get("id"): - return str(contacts[0]["id"]) + # contact id so email/SMS/iMessage/voice converge on one session. If + # Inkbox cannot resolve a contact, keep channel conversations stable + # before falling back to the raw address/number. + if contact and contact.get("id"): + return str(contact["id"]) + if allow_webhook_contact: + contacts = data.get("contacts") or [] + if len(contacts) == 1: + contact_id = ( + contacts[0].get("id") + or contacts[0].get("contact_id") + or contacts[0].get("contactId") + ) + if contact_id: + return str(contact_id) + if thread_key: + return thread_key return fallback + @staticmethod + def _field(obj: Any, *names: str) -> Any: + """Read a field from either an SDK object or webhook dict.""" + if obj is None: + return None + for name in names: + if isinstance(obj, dict): + value = obj.get(name) + else: + value = getattr(obj, name, None) + if value not in (None, ""): + return value + return None + + @classmethod + def _webhook_list(cls, obj: Any, *names: str) -> List[Any]: + if obj is None: + return [] + for name in names: + value = obj.get(name) if isinstance(obj, dict) else getattr(obj, name, None) + if isinstance(value, (list, tuple)): + return list(value) + return [] + + @classmethod + def _string_list_field(cls, obj: Any, *names: str) -> List[str]: + values = cls._webhook_list(obj, *names) + return [str(value).strip() for value in values if str(value).strip()] + + @classmethod + def _conversation_summary_is_group(cls, summary: Any) -> bool: + return bool(cls._field(summary, "isGroup", "is_group", "is_group_conversation")) + + @classmethod + def _call_context_id(cls, call_context: Dict[str, Any]) -> str: + return str(cls._field(call_context, "id", "call_id", "callId") or "").strip() + + @classmethod + def _merge_call_context( + cls, primary: Dict[str, Any], fallback: Optional[Dict[str, Any]] + ) -> Dict[str, Any]: + merged = dict(fallback or {}) + for key, value in (primary or {}).items(): + if value not in (None, "", [], {}): + merged[key] = value + return merged + + @classmethod + def _contact_values(cls, entries: Any) -> List[str]: + if not entries: + return [] + if isinstance(entries, str): + rows = [entries] + elif isinstance(entries, (list, tuple)): + rows = list(entries) + else: + rows = [entries] + rows.sort( + key=lambda item: not bool(cls._field(item, "is_primary", "isPrimary")), + ) + values: List[str] = [] + for item in rows: + value = item if isinstance(item, str) else cls._field(item, "value", "address", "email", "phone") + if value: + values.append(str(value)) + return values + + @classmethod + def _contact_summary(cls, contact: Any) -> Optional[Dict[str, Any]]: + if not contact: + return None + given = cls._field(contact, "given_name", "givenName") + family = cls._field(contact, "family_name", "familyName") + full_name = " ".join(str(part) for part in (given, family) if part).strip() + name = ( + cls._field(contact, "preferred_name", "preferredName") + or cls._field(contact, "name", "display_name", "displayName") + or full_name + or None + ) + summary = { + "id": str(cls._field(contact, "id", "contact_id", "contactId") or ""), + "name": str(name) if name else None, + "emails": cls._contact_values( + cls._field( + contact, + "emails", + "email_addresses", + "emailAddresses", + "email", + "email_address", + "emailAddress", + ) + ), + "phones": cls._contact_values( + cls._field( + contact, + "phones", + "phone_numbers", + "phoneNumbers", + "phone", + "phone_number", + "phoneNumber", + ) + ), + "company": cls._field(contact, "company_name", "companyName", "company"), + "job_title": cls._field(contact, "job_title", "jobTitle", "title"), + "notes": ((str(cls._field(contact, "notes") or "")[:200]).strip() or None), + } + if any(summary.get(key) for key in ("id", "name", "emails", "phones")): + return summary + return None + + async def _hydrate_contact(self, contact: Any) -> Optional[Dict[str, Any]]: + summary = self._contact_summary(contact) + contact_id = (summary or {}).get("id") + if not contact_id or self._inkbox is None: + return summary + try: + return self._contact_summary(await asyncio.to_thread(self._inkbox.contacts.get, contact_id)) or summary + except Exception: + return summary + + async def _resolve_contact_full( + self, *, kind: str, value: str + ) -> Optional[Dict[str, Any]]: + if not value: + return None + cache_key = (kind, value.lower()) + now = time.time() + cached = self._contact_cache.get(cache_key) + if cached and cached[1] > now: + return cached[0] + + if self._inkbox is None: + return None + try: + matches = await asyncio.to_thread(self._inkbox.contacts.lookup, **{kind: value}) + except Exception: + logger.debug("[bridge] contacts.lookup(%s=%s) failed", kind, value, exc_info=True) + self._contact_cache[cache_key] = (None, now + CONTACT_CACHE_TTL_SECONDS) + return None + if len(matches) != 1: + self._contact_cache[cache_key] = (None, now + CONTACT_CACHE_TTL_SECONDS) + return None + contact = self._contact_summary(matches[0]) + self._contact_cache[cache_key] = (contact, now + CONTACT_CACHE_TTL_SECONDS) + return contact + + async def _resolve_call_contact( + self, call_context: Dict[str, Any], remote: str + ) -> Optional[Dict[str, Any]]: + """Resolve the call's remote party before Realtime greets.""" + direct = ( + call_context.get("contact") + or call_context.get("remote_contact") + or call_context.get("remoteContact") + ) + if direct: + return await self._hydrate_contact(direct) + + contact_id = self._field( + call_context, "contact_id", "contactId", "remote_contact_id", "remoteContactId" + ) + if contact_id: + return await self._hydrate_contact({ + "id": contact_id, + "name": self._field( + call_context, "contact_name", "contactName", "remote_name", "remoteName" + ), + }) + + contacts = ( + call_context.get("contacts") + or call_context.get("contact_list") + or call_context.get("contactList") + or [] + ) + if isinstance(contacts, dict): + contacts = [contacts] + if len(contacts) == 1: + return await self._hydrate_contact(contacts[0]) + for entry in contacts: + bucket = str(self._field(entry, "bucket", "role", "type") or "").lower() + if bucket in {"from", "remote", "caller", "callee", "to"} and self._field( + entry, "id", "contact_id", "contactId" + ): + return await self._hydrate_contact(entry) + + if not remote or self._inkbox is None: + return None + try: + matches = await asyncio.to_thread(self._inkbox.contacts.lookup, phone=remote) + except Exception: + logger.debug("[bridge] contacts.lookup(phone=%s) failed for call", remote, exc_info=True) + return None + if len(matches) != 1: + return None + return self._contact_summary(matches[0]) + async def _on_mail_received(self, envelope: Dict[str, Any]) -> "web.Response": data = envelope.get("data") or {} message = data.get("message") or {} @@ -438,12 +806,21 @@ async def _on_mail_received(self, envelope: Dict[str, Any]) -> "web.Response": if message.get("has_attachments"): saved = await self._fetch_mail_attachments(message) body_text = (body_text + inbound_media_note(saved)).strip() - chat_id = self._chat_key(data, sender) + thread_key = self._thread_key("email", message.get("thread_id")) + contact = await self._resolve_contact_full(kind="email", value=sender) + chat_id = self._chat_key( + data, + sender, + thread_key, + contact=contact, + allow_webhook_contact=False, + ) meta = { "to": sender, "sender": sender, "subject": subject, "thread_id": message.get("thread_id"), + "contact": contact, } # The channel tag (Subject included) is added by frame_inbound. await self.sessions.get(chat_id).handle_inbound(body_text, "email", meta) @@ -504,7 +881,113 @@ def _fetch_mail_body(self, message: Dict[str, Any]) -> str: logger.debug("[bridge] full-body fetch failed; using snippet", exc_info=True) return str(message.get("snippet") or "") + async def _lookup_text_conversation_summary(self, conversation_id: str) -> Any: + if not conversation_id: + return None + + def _lookup() -> Any: + identity = self._identity + if identity is None and self._inkbox is not None: + identity = self._inkbox.get_identity(self.cfg.identity) + if identity is None: + return None + method = getattr(identity, "list_text_conversations", None) + if callable(method): + try: + conversations = method(limit=200, offset=0, include_groups=True) + except TypeError: + conversations = method({"limit": 200, "offset": 0, "includeGroups": True}) + else: + method = getattr(identity, "listTextConversations", None) + if not callable(method): + return None + conversations = method({"limit": 200, "offset": 0, "includeGroups": True}) + for entry in conversations or []: + if str(self._field(entry, "id", "conversation_id", "conversationId") or "") == conversation_id: + return entry + return None + + try: + return await asyncio.to_thread(_lookup) + except Exception: + logger.debug( + "[bridge] text conversation summary lookup failed for %s", + conversation_id, + exc_info=True, + ) + return None + + @classmethod + def _group_sms_prompt( + cls, + body: str, + *, + sender: str, + conversation_id: str, + local_phone: str, + participants: List[str], + contact: Optional[Dict[str, Any]] = None, + ) -> str: + marker_parts = [ + f"[inkbox:group_sms conversation_id={conversation_id or 'unknown'}", + f"from={sender}", + f"local={local_phone}" if local_phone else None, + f"participants={','.join(participants)}" if participants else None, + "reply_mode=conversation_id", + f"| {contact_marker(contact)}]", + ] + marker = " ".join(part for part in marker_parts if part) + policy = "\n".join([ + "Group SMS response policy: you receive every message in this group so you can track context.", + "Reply only when the latest message clearly addresses this Inkbox agent, asks it to act, or a visible answer would be expected from the agent.", + "Treat ordinary group chatter as context only.", + "If no visible reply is warranted, return exactly [SILENT].", + ]) + return "\n".join(part for part in [marker, policy, body] if part) + + @classmethod + def _imessage_reaction_prompt( + cls, + *, + sender: str, + conversation_id: str, + target_message_id: str, + reaction_label: str, + contact: Optional[Dict[str, Any]] = None, + ) -> str: + conversation_part = f" conversation_id={conversation_id}" if conversation_id else "" + target_part = f" target_message_id={target_message_id}" if target_message_id else "" + marker = ( + f"[inkbox:imessage_reaction from={sender} reaction={reaction_label}" + f"{conversation_part}{target_part} | {contact_marker(contact)}]" + ) + policy = "\n".join([ + f"{sender} reacted with a '{reaction_label}' tapback to your message.", + "A reaction is a lightweight signal, not always a request for a reply.", + "Reply only when the reaction plausibly warrants one - e.g. a 'question' " + "tapback usually asks for clarification or a follow-up, 'emphasize' may " + "invite one, while 'love'/'like'/'laugh'/'dislike' are usually just " + "acknowledgements that need no response.", + "If no visible reply is warranted, return exactly [SILENT].", + ]) + return f"{marker}\n{policy}" + async def _on_text_received(self, envelope: Dict[str, Any]) -> "web.Response": + data = envelope.get("data") or {} + message = data.get("text_message") or {} + message_id = str(message.get("id") or "").strip() + event_key = f"text:{message_id}" if message_id else "" + if self._dedup_begin(event_key): + return web.json_response({"ok": True, "deduped": True}) + try: + response = await self._on_text_received_once(envelope) + except Exception: + self._dedup_rollback(event_key) + raise + self._dedup_commit(event_key) + return response + + async def _on_text_received_once(self, envelope: Dict[str, Any]) -> "web.Response": data = envelope.get("data") or {} message = data.get("text_message") or {} if message.get("direction") == "outbound": @@ -524,16 +1007,78 @@ async def _on_text_received(self, envelope: Dict[str, Any]) -> "web.Response": return web.json_response({"ok": True, "ignored": "sender-not-allowed"}) body = await self._with_media(text, media, prefix=f"sms-{message.get('id', '')}") - chat_id = self._chat_key(data, sender) + conversation_id = str( + message.get("conversation_id") or message.get("conversationId") or "" + ).strip() + local_phone = str( + message.get("local_phone_number") or message.get("localPhoneNumber") or "" + ).strip() + conversation_summary = await self._lookup_text_conversation_summary(conversation_id) + participants: List[str] = [] + for entry in ( + self._string_list_field(conversation_summary, "participants") + + self._string_list_field(message, "participants") + ): + if entry not in participants: + participants.append(entry) + contacts = self._webhook_list(data, "contacts", "contact_list") + agent_identities = self._webhook_list( + data, + "agent_identities", + "agentIdentities", + "identity_agents", + ) + is_group = ( + self._conversation_summary_is_group(conversation_summary) + or bool(self._field(message, "isGroup", "is_group")) + or len(participants) > 1 + or len(contacts) > 1 + or len(agent_identities) > 1 + ) + contact = await self._resolve_contact_full(kind="phone", value=sender) + if is_group: + body = self._group_sms_prompt( + body, + sender=sender, + conversation_id=conversation_id, + local_phone=local_phone, + participants=participants, + contact=contact, + ) + thread_key = self._thread_key("sms", conversation_id) + chat_id = self._chat_key( + data, + sender, + thread_key, + contact=contact, + allow_webhook_contact=False, + ) meta = { - "conversation_id": message.get("conversation_id"), + "conversation_id": conversation_id or None, "to": sender, "sender": sender, + "conversation_kind": "group" if is_group else "direct", + "contact": contact, } await self.sessions.get(chat_id).handle_inbound(body, "sms", meta) return web.json_response({"ok": True}) async def _on_imessage_received(self, envelope: Dict[str, Any]) -> "web.Response": + data = envelope.get("data") or {} + message = data.get("message") or {} + message_id = str(message.get("id") or "").strip() + event_key = f"imessage:{message_id}" if message_id else "" + if self._dedup_begin(event_key): + return web.json_response({"ok": True, "deduped": True}) + try: + response = await self._on_imessage_received_once(envelope) + except Exception: + self._dedup_rollback(event_key) + raise + self._dedup_commit(event_key) + return response + + async def _on_imessage_received_once(self, envelope: Dict[str, Any]) -> "web.Response": data = envelope.get("data") or {} message = data.get("message") or {} if not message or message.get("direction") == "outbound": @@ -547,11 +1092,78 @@ async def _on_imessage_received(self, envelope: Dict[str, Any]) -> "web.Response return web.json_response({"ok": True, "ignored": "sender-not-allowed"}) body = await self._with_media(text, media, prefix=f"imsg-{message.get('id', '')}") - chat_id = self._chat_key(data, sender) - meta = {"conversation_id": message.get("conversation_id"), "sender": sender} + conversation_id = str(message.get("conversation_id") or "").strip() + contact = await self._resolve_contact_full(kind="phone", value=sender) + chat_id = self._chat_key( + data, + sender, + self._thread_key("imessage", conversation_id), + contact=contact, + allow_webhook_contact=False, + ) + meta = {"conversation_id": conversation_id or None, "sender": sender, "contact": contact} await self.sessions.get(chat_id).handle_inbound(body, "imessage", meta) return web.json_response({"ok": True}) + async def _on_imessage_reaction_received(self, envelope: Dict[str, Any]) -> "web.Response": + data = envelope.get("data") or {} + reaction = data.get("reaction") or {} + reaction_id = str(reaction.get("id") or "").strip() + event_key = f"imessage_reaction:{reaction_id}" if reaction_id else "" + if self._dedup_begin(event_key): + return web.json_response({"ok": True, "deduped": True}) + try: + direction = str(reaction.get("direction") or "").strip().lower() + if direction and direction != "inbound": + response = web.json_response({"ok": True, "ignored": "outbound-reaction"}) + else: + sender = str(reaction.get("remote_number") or "").strip() + if not sender: + response = web.json_response({"ok": True, "ignored": "empty"}) + elif not self._sender_allowed(sender): + response = web.json_response({"ok": True, "ignored": "sender-not-allowed"}) + else: + conversation_id = str(reaction.get("conversation_id") or "").strip() + target_message_id = str(reaction.get("target_message_id") or "").strip() + reaction_type = str(reaction.get("reaction") or "").strip().lower() + custom_emoji = str(reaction.get("custom_emoji") or "").strip() + reaction_label = ( + f"{reaction_type}:{custom_emoji}" + if reaction_type == "custom" and custom_emoji + else reaction_type + ) or "unknown" + contact = await self._resolve_contact_full(kind="phone", value=sender) + body = self._imessage_reaction_prompt( + sender=sender, + conversation_id=conversation_id, + target_message_id=target_message_id, + reaction_label=reaction_label, + contact=contact, + ) + chat_id = self._chat_key( + data, + sender, + self._thread_key("imessage", conversation_id), + contact=contact, + allow_webhook_contact=False, + ) + meta = { + "conversation_id": conversation_id or None, + "sender": sender, + "message_id": reaction_id or target_message_id, + "reply_to_id": target_message_id or reaction_id, + "reaction": reaction_label, + "typing": reaction_label == "question", + "contact": contact, + } + await self.sessions.get(chat_id).handle_inbound(body, "imessage", meta) + response = web.json_response({"ok": True}) + except Exception: + self._dedup_rollback(event_key) + raise + self._dedup_commit(event_key) + return response + async def _with_media(self, text: str, media: List[Dict[str, Any]], *, prefix: str) -> str: """Download inbound media and append a note pointing Codex at the files. @@ -631,7 +1243,8 @@ async def _on_text_delivery_failed(self, envelope: Dict[str, Any], event_type: s reason = str(message.get("error_detail") or message.get("error_code") or "").strip() if event_type == "text.delivery_unconfirmed" and not reason: reason = "carrier could not confirm delivery" - chat_id = self._chat_key(data, recipient) + conversation_id = str(message.get("conversation_id") or message.get("conversationId") or "").strip() + chat_id = self._chat_key(data, recipient, self._thread_key("sms", conversation_id)) logger.info("[bridge] SMS delivery failed to %s: %s", recipient, reason or event_type) return await self._notify_delivery_failure(chat_id, "SMS", recipient, body, reason or event_type) @@ -650,7 +1263,8 @@ async def _on_imessage_delivery_failed(self, envelope: Dict[str, Any]) -> "web.R or message.get("status") or "" ).strip() - chat_id = self._chat_key(data, recipient) + conversation_id = str(message.get("conversation_id") or message.get("conversationId") or "").strip() + chat_id = self._chat_key(data, recipient, self._thread_key("imessage", conversation_id)) logger.info("[bridge] iMessage delivery failed to %s: %s", recipient, reason) return await self._notify_delivery_failure(chat_id, "iMessage", recipient, body, reason) @@ -664,7 +1278,7 @@ async def _on_mail_delivery_failed(self, envelope: Dict[str, Any], event_type: s recipient = str(to_addresses[0] if to_addresses else "").strip() subject = str(message.get("subject") or "").strip() reason = "bounced" if event_type == "message.bounced" else "permanent send failure" - chat_id = self._chat_key(data, recipient) + chat_id = self._chat_key(data, recipient, self._thread_key("email", message.get("thread_id"))) logger.info("[bridge] email %s to %s (subject: %s)", reason, recipient, subject) body = f"(email, subject: {subject})" if subject else "" return await self._notify_delivery_failure(chat_id, "email", recipient, body, reason) @@ -674,7 +1288,12 @@ async def _on_mail_delivery_failed(self, envelope: Dict[str, Any], event_type: s # ------------------------------------------------------------------ async def _open_realtime_bridge( - self, remote: str, call_id: str, outbound: Optional[Dict[str, Any]] = None + self, + remote: str, + call_id: str, + outbound: Optional[Dict[str, Any]] = None, + contact: Optional[Dict[str, Any]] = None, + direction: str = "inbound", ) -> Any: """Preflight an OpenAI Realtime session for an incoming call. @@ -686,18 +1305,54 @@ async def _open_realtime_bridge( Any: An OpenedRealtimeBridge on success, or None if the connect failed (the caller then falls back to Inkbox STT/TTS). """ - phone = getattr(self._identity, "phone_number", None) + identity = self._identity + mailbox = getattr(identity, "mailbox", None) + phone = getattr(identity, "phone_number", None) oc = outbound or {} + contact = contact or {} meta = RealtimeCallMeta( call_id=call_id or "unknown", remote_phone_number=remote or None, - agent_identity_phone=getattr(phone, "number", None), + direction=direction or "inbound", + agent_identity_handle=( + getattr(identity, "agent_handle", None) + or getattr(identity, "handle", None) + or self.cfg.identity + or None + ), + agent_identity_email=( + getattr(mailbox, "email_address", None) + or getattr(identity, "email_address", None) + ), + agent_identity_phone=( + getattr(phone, "number", None) + if not isinstance(phone, str) + else phone + ), project_dir=self.cfg.project_dir, + contact_known=bool(contact.get("id")), + contact_id=contact.get("id"), + contact_name=contact.get("name"), + contact_emails=list(contact.get("emails") or []), + contact_phones=list(contact.get("phones") or []), + contact_company=contact.get("company"), + contact_job_title=contact.get("job_title"), + contact_notes=contact.get("notes"), outbound_purpose=(oc.get("purpose") or None), outbound_opening=(oc.get("opening_message") or None), outbound_context=(oc.get("context") or None), + outbound_reason=(oc.get("reason") or None), + outbound_scheduled_by=(oc.get("scheduled_by") or None), + outbound_conversation_summary=(oc.get("conversation_summary") or None), ) try: + logger.info( + "[bridge] opening realtime call call_id=%s direction=%s outbound_purpose=%s opening=%s", + meta.call_id, + meta.direction, + str(meta.outbound_purpose or "")[:120], + bool(meta.outbound_opening), + ) return await open_inkbox_realtime_bridge(config=self.cfg.realtime, meta=meta) except RealtimeBridgeConnectError as exc: logger.warning( @@ -716,10 +1371,18 @@ def _load_outbound_context(token: Optional[str]) -> Optional[Dict[str, Any]]: return None path = call_contexts_dir() / f"{token}.json" if not path.exists(): + logger.warning("[bridge] outbound call context token %s not found at %s", token, path) return None try: - return json.loads(path.read_text()) + data = json.loads(path.read_text()) + logger.info( + "[bridge] loaded outbound call context token=%s purpose=%s", + token, + str(data.get("purpose") or "")[:120], + ) + return data except (OSError, json.JSONDecodeError): + logger.warning("[bridge] failed to load outbound call context token=%s", token, exc_info=True) return None async def _handle_call_ws(self, request: "web.Request") -> Any: @@ -739,10 +1402,32 @@ async def _handle_call_ws(self, request: "web.Request") -> Any: call_context = json.loads(call_context_raw) if call_context_raw else {} except json.JSONDecodeError: call_context = {} - remote = str(call_context.get("remote_phone_number") or "").strip() - call_id = str(call_context.get("id") or call_context.get("call_id") or "") - chat_id = remote or f"call:{call_id}" + call_id = self._call_context_id(call_context) or str(request.query.get("call_id") or "").strip() + stored_call_context = self._call_meta_by_id.pop(call_id, None) if call_id else None + if stored_call_context: + call_context = self._merge_call_context(call_context, stored_call_context) + if call_id and not self._call_context_id(call_context): + call_context["id"] = call_id + call_id = self._call_context_id(call_context) or call_id outbound = self._load_outbound_context(request.query.get("context_token")) + remote = str( + self._field( + call_context, + "remote_phone_number", + "remotePhoneNumber", + "from_number", + "fromNumber", + "to_number", + "toNumber", + ) + or (outbound or {}).get("to_number") + or "" + ).strip() + direction = str( + self._field(call_context, "direction") or ("outbound" if outbound else "inbound") + ).strip().lower() or "inbound" + contact = await self._resolve_call_contact(call_context, remote) + chat_id = (contact or {}).get("id") or remote or f"call:{call_id}" ws = web.WebSocketResponse() @@ -752,7 +1437,7 @@ async def _handle_call_ws(self, request: "web.Request") -> Any: # via run_consult. If the preflight fails, fall through to Inkbox # STT/TTS below (unless fallback is disabled, then refuse the call). if self.cfg.realtime.enabled: - bridge = await self._open_realtime_bridge(remote, call_id, outbound) + bridge = await self._open_realtime_bridge(remote, call_id, outbound, contact, direction) if bridge is None and not self.cfg.realtime.fallback_to_inkbox_stt_tts: return web.Response(status=503, text="realtime bridge unavailable") if bridge is not None: @@ -764,18 +1449,39 @@ async def _handle_call_ws(self, request: "web.Request") -> Any: self._active_call_ws[chat_id] = ws logger.info("[bridge] realtime call connected: %s", chat_id or call_id) - async def _consult(query: str, _transcript: Any) -> str: + async def _consult( + _meta: RealtimeCallMeta, + query: str, + _transcript: Any, + post_call_actions: List[Dict[str, str]], + consult_results: Any, + ) -> str: # Route the model's request into the caller's shared session. - return await self.sessions.get(chat_id).run_consult(query) - - async def _post_call(actions: List[Dict[str, str]], transcript: Any) -> None: + logger.info("[bridge] realtime consult for %s: %s", chat_id, query) + prompt = _voice_consult_prompt( + query=query, + transcript=_transcript, + outbound=outbound, + contact=contact, + direction=direction, + post_call_actions=post_call_actions, + consult_results=consult_results, + ) + return await self.sessions.get(chat_id).run_consult(prompt) + + async def _post_call( + _meta: RealtimeCallMeta, + actions: List[Dict[str, str]], + transcript: Any, + consult_results: Any, + ) -> None: # Run the queued after-call work in the caller's session. The # text reply is discarded; side effects (emails, edits, PRs) # happen via Codex's tools during the turn. - prompt = _post_call_prompt(actions, transcript) + prompt = _post_call_prompt(actions, transcript, consult_results) await self.sessions.get(chat_id).run_consult(prompt) - async def _call_ended(transcript: Any) -> None: + async def _call_ended(_meta: RealtimeCallMeta, transcript: Any) -> None: # No queued actions: let Codex reflect and do any follow-up # it committed to on the call. Stays silent if nothing to do. prompt = _call_ended_prompt(transcript) @@ -807,6 +1513,7 @@ async def _call_ended(transcript: Any) -> None: await ws.prepare(request) self._active_call_ws[chat_id] = ws logger.info("[bridge] call connected: %s", chat_id or call_id) + transcript: List[Tuple[str, str]] = [] try: async for msg in ws: @@ -823,13 +1530,22 @@ async def _call_ended(transcript: Any) -> None: text = str(payload.get("text") or "").strip() if not text: continue - meta = {"call_id": call_id, "sender": remote} + transcript.append(("user", text)) + meta = { + "call_id": call_id, + "sender": remote, + "contact": contact, + "direction": direction, + } session = self.sessions.get(chat_id) await session.handle_inbound(text, "voice", meta) elif event == "stop": break finally: self._active_call_ws.pop(chat_id, None) + if transcript: + prompt = _call_ended_prompt(transcript) + await self.sessions.get(chat_id).run_consult(prompt) logger.info("[bridge] call ended: %s", chat_id or call_id) return ws @@ -917,34 +1633,51 @@ async def send_to_contact( None """ meta = meta or {} + if content.strip() == "[SILENT]": + logger.debug("[bridge] suppressing exact [SILENT] reply for %s", chat_id) + return if mode == "voice": ws = self._active_call_ws.get(chat_id) if ws is not None: await self._speak(ws, strip_markdown(content), str(meta.get("call_id") or "")) return - # Call ended while Codex was thinking — fall back to SMS so - # the answer isn't lost. - mode = "sms" if str(meta.get("to") or chat_id).startswith("+") else "email" - - identity = await asyncio.to_thread(self._inkbox.get_identity, self.cfg.identity) + logger.info( + "[bridge] dropped late voice reply after call ended: %s", + chat_id, + ) + return if mode == "sms": text = strip_markdown(content) if len(text) > SMS_MAX_LENGTH: - text = text[: SMS_MAX_LENGTH - 1] + "…" + raise ValueError(_message_too_long_reason("SMS", text, SMS_MAX_LENGTH)) + identity = await asyncio.to_thread(self._inkbox.get_identity, self.cfg.identity) kwargs: Dict[str, Any] = {"text": text} - if meta.get("conversation_id"): - kwargs["conversation_id"] = str(meta["conversation_id"]) + conversation_id = str(meta.get("conversation_id") or "").strip() + if not conversation_id and str(chat_id).startswith("sms:"): + conversation_id = str(chat_id).split(":", 1)[1] + if conversation_id: + kwargs["conversation_id"] = conversation_id else: kwargs["to"] = str(meta.get("to") or chat_id) await asyncio.to_thread(identity.send_text, **kwargs) elif mode == "imessage": + text = strip_markdown(content) + if len(text) > IMESSAGE_MAX_LENGTH: + raise ValueError(_message_too_long_reason("iMessage", text, IMESSAGE_MAX_LENGTH)) + identity = await asyncio.to_thread(self._inkbox.get_identity, self.cfg.identity) + conversation_id = str(meta.get("conversation_id") or "").strip() + if not conversation_id and str(chat_id).startswith("imessage:"): + conversation_id = str(chat_id).split(":", 1)[1] + if not conversation_id: + raise ValueError(f"No iMessage conversation id for chat {chat_id}") await asyncio.to_thread( identity.send_imessage, - conversation_id=str(meta.get("conversation_id") or ""), - text=strip_markdown(content), + conversation_id=conversation_id, + text=text, ) else: # email + identity = await asyncio.to_thread(self._inkbox.get_identity, self.cfg.identity) subject = str(meta.get("subject") or "").strip() reply_subject = subject if subject.lower().startswith("re:") else f"Re: {subject}" if subject else "From your Codex agent" await asyncio.to_thread( diff --git a/inkbox_codex/mcp_stdio.py b/inkbox_codex/mcp_stdio.py index ae73fd8..4e838fd 100644 --- a/inkbox_codex/mcp_stdio.py +++ b/inkbox_codex/mcp_stdio.py @@ -14,8 +14,10 @@ Inkbox = None # type: ignore try: + from .config import inkbox_client_kwargs from .tools import call_inkbox_tool, mcp_tool_list except ImportError: # pragma: no cover - direct local import/test fallback + from config import inkbox_client_kwargs from tools import call_inkbox_tool, mcp_tool_list @@ -31,7 +33,7 @@ class InkboxMcpServer: def __init__(self) -> None: self.api_key = os.getenv("INKBOX_API_KEY", "") self.identity = os.getenv("INKBOX_IDENTITY", "") - self.base_url = os.getenv("INKBOX_BASE_URL") or "https://inkbox.ai" + self.base_url = os.getenv("INKBOX_BASE_URL", "").strip() self._client: Any = None def _inkbox(self) -> Any: @@ -40,7 +42,7 @@ def _inkbox(self) -> Any: if not self.api_key or not self.identity: raise RuntimeError("INKBOX_API_KEY and INKBOX_IDENTITY are required") if self._client is None: - self._client = Inkbox(api_key=self.api_key, base_url=self.base_url) + self._client = Inkbox(**inkbox_client_kwargs(self.api_key, self.base_url)) return self._client async def handle(self, message: Dict[str, Any]) -> Dict[str, Any] | None: diff --git a/inkbox_codex/prompts.py b/inkbox_codex/prompts.py index c751c48..6e804a6 100644 --- a/inkbox_codex/prompts.py +++ b/inkbox_codex/prompts.py @@ -3,7 +3,7 @@ from __future__ import annotations import re -from typing import Any, Dict +from typing import Any, Dict, Optional # Appended to the codex system prompt preset for every bridged # session. The agent is a full Codex instance with tool access — @@ -15,10 +15,10 @@ human is talking to you over {channels}. Your replies are delivered to their phone or inbox, so: -- Each incoming message starts with a small bracketed tag showing how it - reached you and from whom — e.g. [iMessage from +15551234567] or - [Spoken live on a phone call]. Read it to know which channel you're on - right now, but never repeat the tag back in your reply. +- Each incoming message starts with a small [inkbox:...] metadata tag showing + how it reached you, the remote phone/email, and any resolved Inkbox contact. + Read it to know who you are talking to and which channel you're on right now, + but never repeat the tag back in your reply. - Plain text only. No markdown — no **bold**, no backticks, no headers, no bullet lists, no code blocks unless they explicitly ask for code. - Keep it short and conversational. Think texts, not essays. Lead with @@ -54,6 +54,19 @@ proactively — e.g. "email me the full report" or a cron-style ping. Replies on the channel you were messaged on are sent automatically; only use these tools for a *different* channel or recipient. + +# Inkbox contacts + +Codex can read and write Inkbox contacts visible to this configured identity. + +- Use inkbox_list_contacts for name-based searches like "who is Alex?". +- Use inkbox_lookup_contact when you have an exact or partial email/phone filter. +- Use inkbox_get_contact to fetch a full contact by UUID after list/lookup returns one. +- Use inkbox_create_contact when the user asks you to save a new person or contact card. +- Use inkbox_update_contact when the user asks you to change an existing contact; look up the contact first if you do not already have its UUID. +- Use inkbox_delete_contact only after the target contact is explicit and confirmed. +- There is no vCard export/import, contact access, or contact rule tool in this harness. +- Contact tools operate only on contacts visible/writable to the configured identity. """.strip() @@ -85,6 +98,22 @@ def build_channel_prompt( ) +def contact_marker(details: Optional[Dict[str, Any]]) -> str: + """Render a one-line Inkbox contact summary for inbound turn tags.""" + if not details or not details.get("id"): + return "contact=unknown_in_inkbox" + parts = [f"contact_id={details['id']}"] + if details.get("name"): + parts.append(f"contact_name={details['name']!r}") + if details.get("company"): + parts.append(f"contact_company={details['company']!r}") + if details.get("emails"): + parts.append(f"contact_emails={details['emails']}") + if details.get("phones"): + parts.append(f"contact_phones={details['phones']}") + return " ".join(parts) + + def frame_inbound(mode: str, meta: Dict[str, Any], text: str) -> str: """Prefix an inbound message with a tag naming its channel and sender. @@ -100,23 +129,33 @@ def frame_inbound(mode: str, meta: Dict[str, Any], text: str) -> str: Returns: str: ``text`` prefixed with a one-line bracketed channel tag. """ + if text.lstrip().startswith("[inkbox:"): + return text + meta = meta or {} sender = str(meta.get("sender") or "").strip() - from_part = f" from {sender}" if sender else "" + from_part = f" from={sender}" if sender else "" + marker = contact_marker(meta.get("contact")) if mode == "email": - header = f"[Email{from_part}]" subject = str(meta.get("subject") or "").strip() - if subject: - header += f"\nSubject: {subject}" + subject_part = f" subject={subject!r}" if subject else "" + header = f"[inkbox:email{from_part}{subject_part} | {marker}]" elif mode == "sms": - header = f"[Text message (SMS){from_part}]" + conversation_id = str(meta.get("conversation_id") or "").strip() + conversation_part = f" conversation_id={conversation_id}" if conversation_id else "" + label = "group_sms" if meta.get("conversation_kind") == "group" else "sms" + header = f"[inkbox:{label}{from_part}{conversation_part} | {marker}]" elif mode == "imessage": - header = f"[iMessage{from_part}]" + conversation_id = str(meta.get("conversation_id") or "").strip() + conversation_part = f" conversation_id={conversation_id}" if conversation_id else "" + header = f"[inkbox:imessage{from_part}{conversation_part} | {marker}]" elif mode == "voice": - header = "[Spoken live on a phone call — keep the reply short and speech-friendly]" + call_id = str(meta.get("call_id") or "").strip() + call_part = f" call_id={call_id}" if call_id else "" + header = f"[inkbox:voice_call{call_part} | {marker}]" else: - header = f"[Message via {mode}{from_part}]" - return f"{header}\n\n{text}" + header = f"[inkbox:{mode}{from_part} | {marker}]" + return f"{header}\n{text}" _MD_PATTERNS = [ diff --git a/inkbox_codex/realtime.py b/inkbox_codex/realtime.py index fbd778d..ae8ad5a 100644 --- a/inkbox_codex/realtime.py +++ b/inkbox_codex/realtime.py @@ -1,6 +1,7 @@ """Inkbox ↔ OpenAI Realtime API voice bridge for live phone calls. -Ported from hermes-agent-plugin's ``realtime.py``, trimmed to one tool. +Ported from Hermes' Inkbox realtime bridge, with the coding-agent tool tier +kept intact. When Realtime is configured, the gateway pre-opens an OpenAI Realtime WebSocket *before* accepting the Inkbox call in raw-media mode, then runs @@ -12,8 +13,8 @@ model's own voice is what the caller hears. The Realtime model runs the spoken conversation itself. It only reaches -back to Codex through the single ``consult_agent`` tool — and -only when the caller asks for real work. The consult runs in the caller's +back to Codex through the ``consult_agent`` tool — and only when the caller +asks for real work or account/contact context. The consult runs in the caller's shared :class:`~inkbox_codex.sessions.ContactSession` and its text answer is handed back to the model, which speaks it. If OpenAI can't be reached the gateway falls back to Inkbox STT/TTS (see ``_handle_call_ws``). @@ -48,7 +49,7 @@ DEFAULT_VOICE = "cedar" # μ-law telephony audio, matching the codec Inkbox bridges from the carrier. AUDIO_FORMAT_TELEPHONY = {"type": "audio/pcmu"} -INPUT_TRANSCRIPTION_MODEL = "gpt-4o-mini-transcribe" +INPUT_TRANSCRIPTION_MODEL = "whisper-1" CONSULT_TOOL_NAME = "consult_agent" POST_CALL_ACTION_TOOL_NAME = "register_post_call_action" @@ -62,15 +63,30 @@ HANGUP_CONFIRM_WINDOW_S = 60.0 # Brief grace so the model's spoken goodbye reaches the caller before we drop. HANGUP_CLOSE_DELAY_S = 2.0 - - -# A consult takes (query, recent_transcript) and returns Codex's spoken- -# friendly answer. The gateway wires this to the caller's ContactSession. -AgentConsultCallback = Callable[[str, List[Tuple[str, str]]], Awaitable[str]] -# After the call ends with queued actions: (actions, transcript) → run them. -PostCallActionsCallback = Callable[[List[Dict[str, str]], List[Tuple[str, str]]], Awaitable[None]] -# After a call with no queued actions: (transcript) → reflect / follow up. -CallEndedCallback = Callable[[List[Tuple[str, str]]], Awaitable[None]] +# Never let a cancelled consult/task hold the call WebSocket cleanup forever. +TASK_CANCEL_TIMEOUT_S = 2.0 + + +# A consult takes live-call context plus the realtime model's request and +# returns Codex's spoken-friendly answer. The gateway wires this to the +# caller's ContactSession. +AgentConsultCallback = Callable[ + [ + "RealtimeCallMeta", + str, + List[Tuple[str, str]], + List[Dict[str, str]], + List["RealtimeConsultResult"], + ], + Awaitable[str], +] +# After the call ends with queued actions: (meta, actions, transcript, consults) → run them. +PostCallActionsCallback = Callable[ + ["RealtimeCallMeta", List[Dict[str, str]], List[Tuple[str, str]], List["RealtimeConsultResult"]], + Awaitable[None], +] +# After a call with no queued actions: (meta, transcript) → reflect / follow up. +CallEndedCallback = Callable[["RealtimeCallMeta", List[Tuple[str, str]]], Awaitable[None]] # ---------------------------------------------------------------------- @@ -103,13 +119,36 @@ class RealtimeCallMeta: call_id: str remote_phone_number: Optional[str] + direction: str = "inbound" + agent_identity_handle: Optional[str] = None + agent_identity_email: Optional[str] = None agent_identity_phone: Optional[str] = None project_dir: Optional[str] = None + contact_known: bool = False + contact_id: Optional[str] = None + contact_name: Optional[str] = None + contact_emails: List[str] = field(default_factory=list) + contact_phones: List[str] = field(default_factory=list) + contact_company: Optional[str] = None + contact_job_title: Optional[str] = None + contact_notes: Optional[str] = None # Outbound calls only: why this agent placed the call, threaded from # ``inkbox_place_call`` so the live session opens with context, not cold. outbound_purpose: Optional[str] = None outbound_opening: Optional[str] = None outbound_context: Optional[str] = None + outbound_reason: Optional[str] = None + outbound_scheduled_by: Optional[str] = None + outbound_conversation_summary: Optional[str] = None + + +@dataclass +class RealtimeConsultResult: + id: str + request: str + result: str + created_at: float + dedupe_key: Optional[str] = None @dataclass @@ -117,6 +156,7 @@ class _BridgeState: transcript: List[Tuple[str, str]] = field(default_factory=list) # Work the model asked to run after the call: [{"action", "details"}]. post_call_actions: List[Dict[str, str]] = field(default_factory=list) + consult_results: List[RealtimeConsultResult] = field(default_factory=list) closed: bool = False greeting_triggered: bool = False # Inkbox-assigned stream id from the `start` event; echoed on outbound @@ -148,15 +188,73 @@ def build_realtime_instructions(meta: RealtimeCallMeta, additional: str = "") -> str: The instruction string for the ``session.update``. """ lines = [ - "You are a Codex agent speaking with your operator on a live phone call.", - "Use natural, concise spoken replies — usually one or two short sentences.", + "You are the configured Codex Inkbox agent speaking on a live Inkbox phone call.", + "Use natural, concise spoken replies. Keep most answers to one or two short sentences.", "You are a voice; do not read out code, file paths, diffs, or logs verbatim.", - "", - f"To do real work NOW in the project ({meta.project_dir or 'the working directory'}) " - f"— read or edit files, run commands or tests, check git, search the codebase — " - f"call {CONSULT_TOOL_NAME} with a plain-English request. It runs the Codex " - "agent in the caller's ongoing conversation and returns a spoken-friendly answer; " - "read that answer back in your own voice.", + "Do not mention implementation details unless the caller asks.", + ] + if meta.agent_identity_handle: + lines.append(f"Your Inkbox identity handle: {meta.agent_identity_handle}.") + if meta.agent_identity_email: + lines.append(f"Your Inkbox agent email address: {meta.agent_identity_email}.") + if meta.agent_identity_phone: + lines.append(f"Your Inkbox agent phone number: {meta.agent_identity_phone}.") + if meta.remote_phone_number: + lines.append(f"Remote phone number: {meta.remote_phone_number}.") + if meta.contact_known: + lines.append( + "Known Inkbox contact info is already loaded; do not look them up or ask for details you already have.", + ) + if meta.contact_name: + lines.append(f"Contact name: {meta.contact_name}.") + if meta.contact_id: + lines.append(f"Inkbox contact id: {meta.contact_id}.") + if meta.contact_company: + lines.append(f"Contact company: {meta.contact_company}.") + if meta.contact_job_title: + lines.append(f"Contact title: {meta.contact_job_title}.") + if meta.contact_emails: + lines.append(f"Contact email(s): {', '.join(meta.contact_emails)}.") + if meta.contact_phones: + lines.append(f"Contact phone(s): {', '.join(meta.contact_phones)}.") + if meta.contact_notes: + lines.append(f"Contact notes: {meta.contact_notes}") + else: + lines.append( + "No matching Inkbox contact record is loaded; use the phone number or a neutral greeting.", + ) + if meta.direction == "outbound": + if meta.outbound_purpose: + lines.append(f"This is an outbound call you placed. Purpose: {meta.outbound_purpose}") + if meta.outbound_reason: + lines.append(f"Reason for the call: {meta.outbound_reason}") + if meta.outbound_scheduled_by: + lines.append(f"This call was scheduled by: {meta.outbound_scheduled_by}") + if meta.outbound_conversation_summary: + lines.append( + f"Summary of the prior conversation that led to this call:\n{meta.outbound_conversation_summary}", + ) + if meta.outbound_context: + lines.append(f"Relevant outbound-call context:\n{meta.outbound_context}") + if meta.outbound_opening: + lines.append( + f"Preferred opening message (say this naturally as your first turn): {meta.outbound_opening}", + ) + lines.append( + "For outbound calls, do not open with a generic offer to help. Start by explaining why you are calling, then ask the next specific question or give the requested update.", + ) + lines.append( + "If the caller asks why you called or whether you know why you are calling, answer from the loaded outbound purpose/context. Never say you only have contact or call info when outbound purpose/context is present.", + ) + lines.extend([ + "Do not perform a context lookup before greeting the caller. Do not say you are waiting on a lookup or checking context.", + "Stay anchored to this live call's loaded purpose and contact context. Do not switch to unrelated prior text-session work.", + f"Call {CONSULT_TOOL_NAME} only when the caller asks for work the voice model cannot do by itself: " + f"real project work in {meta.project_dir or 'the working directory'}, Inkbox account/tool lookups, " + "contact lookup or edits, text/call/email inspection, file edits, commands, tests, git, or code search.", + "Do not use consult_agent for ordinary conversation, shopping advice, brainstorming, greetings, hangups, or questions you can answer from the loaded call context.", + f"When you do call {CONSULT_TOOL_NAME}, use a plain-English request. It runs the Codex " + "agent in the caller's ongoing conversation and returns a spoken-friendly answer; read that answer back in your own voice.", f"If the caller wants work done AFTER the call (or accepts a deferral), call " f"{POST_CALL_ACTION_TOOL_NAME} to queue it. Tell them it's queued for after the " "call; do not claim it is already done.", @@ -168,21 +266,11 @@ def build_realtime_instructions(meta: RealtimeCallMeta, additional: str = "") -> f"{HANG_UP_CALL_TOOL_NAME}: the first call arms hangup and asks you to say a short " "goodbye; after the goodbye, call it once more to actually end the call.", f"Do NOT call {CONSULT_TOOL_NAME} for greetings, small talk, or questions you " - "can answer directly. Use it whenever the caller wants something done in the code.", + "can answer directly from the loaded call context. Use it whenever the caller wants " + "something done in code, asks for contact/account context you do not already have, " + "or needs an Inkbox tool lookup. Do not call it for ordinary advice or brainstorming.", "While a tool runs you may say a brief 'one moment' so the caller isn't left in silence.", - ] - if meta.outbound_purpose: - lines += [ - "", - "This is an OUTBOUND call you placed; the callee did not call you. " - f"You are calling because: {meta.outbound_purpose}", - ] - if meta.outbound_context: - lines.append(f"Background: {meta.outbound_context}") - lines.append( - "Open by greeting them, saying who you are, and stating why you're " - "calling in one short sentence, then let them respond." - ) + ]) if additional.strip(): lines += ["", additional.strip()] return "\n".join(lines) @@ -190,20 +278,21 @@ def build_realtime_instructions(meta: RealtimeCallMeta, additional: str = "") -> def build_realtime_greeting(meta: RealtimeCallMeta) -> str: """Instructions for the proactive opening line spoken at pickup.""" - if meta.outbound_opening: + first_name = meta.contact_name.split()[0] if meta.contact_known and meta.contact_name else "there" + if meta.direction == "outbound" and meta.outbound_opening: return ( - "Open the call by saying, naturally and in one short sentence: " - f"\"{meta.outbound_opening}\" Then stop and let them respond." + "Say exactly this as the very first thing, with no greeting before it and no extra words:\n" + f"{meta.outbound_opening}" ) - if meta.outbound_purpose: + if meta.direction == "outbound" and meta.outbound_purpose: return ( - "You placed this call. Open by greeting them, saying you're their " - f"Codex agent, and stating why you're calling: {meta.outbound_purpose}. " - "Keep it to one short sentence, then stop." + f"Greet {first_name} briefly, then immediately explain that you are calling because: " + f"{meta.outbound_purpose}. Do not ask a generic how-can-I-help question." ) return ( - "Greet the caller briefly and naturally, e.g. \"Hey, it's your Codex " - "agent — what do you need?\" Keep it to one short sentence and then stop." + f"Greet the caller now as the very first thing you say. Say something like " + f"'Hi {first_name}, this is your Codex Inkbox agent - how can I help?' " + f"Keep it to one short sentence and then wait for them to respond." ) @@ -218,10 +307,13 @@ def _consult_tool_schema() -> Dict[str, Any]: "name": CONSULT_TOOL_NAME, "description": ( "Hand a request to the Codex agent working in the project, when " - "the caller wants real work done — read/edit files, run commands or " - "tests, check git status, search the codebase, etc. The request runs " - "in the caller's ongoing conversation and you get back a spoken-friendly " - "answer to read aloud. Do NOT use this for greetings or small talk." + "the caller wants real work done that the voice model cannot do itself - " + "look up contacts, inspect Inkbox texts/calls/email, read/edit files, " + "run commands or tests, check git status, search the codebase, etc. " + "The request runs in the caller's ongoing conversation and you get " + "back a spoken-friendly answer to read aloud. Do NOT use this for " + "greetings, hangups, small talk, ordinary conversation, shopping " + "advice, or brainstorming." ), "parameters": { "type": "object", @@ -401,18 +493,34 @@ async def run( ), name=f"realtime-openai-pump-{self.meta.call_id}", ) - _, pending = await asyncio.wait( + done, _pending = await asyncio.wait( {inkbox_task, openai_task}, return_when=asyncio.FIRST_COMPLETED ) - for task in pending: - task.cancel() + for task in done: + if task.cancelled(): + continue + exc = task.exception() + if exc: + logger.warning("[realtime] pump %s raised: %s", task.get_name(), exc) finally: state.closed = True - await _cancel_consult_tasks(state) + tasks = [ + task for task in ( + locals().get("inkbox_task"), + locals().get("openai_task"), + ) + if task is not None + ] + for task in tasks: + if not task.done(): + task.cancel() + await _maybe_close_ws(inkbox_ws) await self.close() + await _settle_tasks(tasks, label="pump") + await _cancel_consult_tasks(state) # After teardown: run queued after-call work, or a follow-up reflection. - await _dispatch_post_call(state, on_post_call_actions, on_call_ended) + await _dispatch_post_call(state, self.meta, on_post_call_actions, on_call_ended) async def close(self) -> None: if self._closed: @@ -479,11 +587,25 @@ async def _cancel_consult_tasks(state: _BridgeState) -> None: """Cancel in-flight consult tasks and let them settle.""" tasks = list(state.consult_tasks) state.consult_tasks.clear() + if not tasks: + return for task in tasks: task.cancel() - for task in tasks: - with suppress(asyncio.CancelledError, Exception): - await task + await _settle_tasks(tasks, label="consult") + + +async def _settle_tasks(tasks: List["asyncio.Task[Any]"], *, label: str) -> None: + """Let cancelled background tasks drain, but never block call teardown.""" + if not tasks: + return + try: + await asyncio.wait_for( + asyncio.gather(*tasks, return_exceptions=True), + timeout=TASK_CANCEL_TIMEOUT_S, + ) + except asyncio.TimeoutError: + names = ", ".join(task.get_name() for task in tasks) + logger.warning("[realtime] timed out waiting for %s task cancellation: %s", label, names) # ---------------------------------------------------------------------- @@ -549,6 +671,12 @@ async def _maybe_send_greeting( "type": "response.create", "response": {"instructions": build_realtime_greeting(meta)}, })) + logger.info( + "[realtime] greeting sent call_id=%s direction=%s outbound_context=%s", + meta.call_id, + meta.direction, + bool(meta.outbound_purpose or meta.outbound_opening or meta.outbound_context), + ) except Exception as exc: logger.debug("[realtime] greeting send failed: %s", exc) @@ -615,6 +743,11 @@ async def _finalize_fn_call(entry: Dict[str, str]) -> None: if not cid or cid in dispatched: return dispatched.add(cid) + logger.info( + "[realtime] dispatching tool call name=%s call_id=%s", + entry.get("name") or "", + cid, + ) coro = _dispatch_tool_call( openai_ws=openai_ws, inkbox_ws=inkbox_ws, @@ -623,6 +756,7 @@ async def _finalize_fn_call(entry: Dict[str, str]) -> None: arguments_json=entry.get("args") or "{}", state=state, config=config, + meta=meta, on_agent_consult=on_agent_consult, ) # The consult runs a full Codex turn (seconds). Awaiting it here @@ -631,7 +765,16 @@ async def _finalize_fn_call(entry: Dict[str, str]) -> None: # which is exactly the async-tool flow gpt-realtime expects. task = asyncio.create_task(coro, name=f"realtime-consult-{cid}") state.consult_tasks.add(task) - task.add_done_callback(state.consult_tasks.discard) + def _done(done_task: "asyncio.Task[None]") -> None: + state.consult_tasks.discard(done_task) + if done_task.cancelled(): + logger.info("[realtime] tool call cancelled call_id=%s", cid) + return + exc = done_task.exception() + if exc: + logger.warning("[realtime] tool call task failed call_id=%s: %s", cid, exc) + + task.add_done_callback(_done) async def _relay_transcript(party: str, text: str) -> None: # Realtime runs the WS in raw-media mode, so Inkbox does not create its @@ -711,23 +854,33 @@ async def _relay_transcript(party: str, text: str) -> None: elif ftype == "response.output_item.added": item = frame.get("item") or {} if item.get("type") == "function_call": - item_id = item.get("id") or "" - fn_calls[item_id] = { - "call_id": item.get("call_id") or "", - "name": item.get("name") or "", - "args": item.get("arguments") or "", - } + item_id = item.get("id") or frame.get("item_id") or "" + if item_id: + fn_calls[item_id] = { + "call_id": item.get("call_id") or "", + "name": item.get("name") or "", + "args": item.get("arguments") or "", + } elif ftype == "response.function_call_arguments.delta": - item_id = frame.get("item_id") or "" - if item_id in fn_calls: - fn_calls[item_id]["args"] += frame.get("delta") or "" + key = frame.get("item_id") or frame.get("call_id") or "" + if not key: + continue + entry = fn_calls.setdefault(key, {"call_id": "", "name": "", "args": ""}) + if not entry.get("call_id") and frame.get("call_id"): + entry["call_id"] = frame["call_id"] + if not entry.get("name") and frame.get("name"): + entry["name"] = frame["name"] + entry["args"] = (entry.get("args") or "") + (frame.get("delta") or "") elif ftype == "response.function_call_arguments.done": - item_id = frame.get("item_id") or "" - entry = fn_calls.get(item_id) - if entry is not None: - if frame.get("arguments"): - entry["args"] = frame["arguments"] - await _finalize_fn_call(entry) + key = frame.get("item_id") or frame.get("call_id") or "" + entry = fn_calls.get(key) or fn_calls.get(frame.get("call_id") or "") or {} + if frame.get("call_id"): + entry["call_id"] = frame["call_id"] + if frame.get("name"): + entry["name"] = frame["name"] + if frame.get("arguments"): + entry["args"] = frame["arguments"] + await _finalize_fn_call(entry) # Fallback: a completed function_call item. elif ftype in ("response.output_item.done", "conversation.item.done"): item = frame.get("item") or {} @@ -746,6 +899,16 @@ async def _relay_transcript(party: str, text: str) -> None: # ---------------------------------------------------------------------- +def _consult_result_text(output: Dict[str, Any]) -> str: + result = output.get("answer") or output.get("result") + if isinstance(result, str) and result.strip(): + return result.strip() + error = output.get("error") + if isinstance(error, str) and error.strip(): + return f"ERROR: {error.strip()}" + return json.dumps(output) + + async def _dispatch_tool_call( *, openai_ws: Any, @@ -755,6 +918,7 @@ async def _dispatch_tool_call( arguments_json: str, state: _BridgeState, config: RealtimeConfig, + meta: RealtimeCallMeta, on_agent_consult: AgentConsultCallback, ) -> None: """Handle a function call from the Realtime model. @@ -799,28 +963,60 @@ async def _dispatch_tool_call( try: answer = await asyncio.wait_for( - on_agent_consult(query, list(state.transcript)), + on_agent_consult( + meta, + query, + list(state.transcript), + list(state.post_call_actions), + list(state.consult_results), + ), timeout=config.consult_timeout_s, ) except asyncio.TimeoutError: - await _submit_tool_result(openai_ws, call_id, { + output = { "error": "consult timed out", "message": "Tell the caller you couldn't finish that right now; offer to follow up.", - }) + } + state.consult_results.append(RealtimeConsultResult( + id=call_id, + request=query, + result=_consult_result_text(output), + created_at=time.time(), + )) + await _submit_tool_result(openai_ws, call_id, output) return except Exception as exc: logger.warning("[realtime] consult failed: %s", exc) - await _submit_tool_result(openai_ws, call_id, { + output = { "error": f"consult error: {exc}", "message": "Apologize briefly and ask if you can help another way.", - }) + } + state.consult_results.append(RealtimeConsultResult( + id=call_id, + request=query, + result=_consult_result_text(output), + created_at=time.time(), + )) + await _submit_tool_result(openai_ws, call_id, output) return - await _submit_tool_result(openai_ws, call_id, { + output = { "status": "ok", "answer": answer, "instructions": "Read the answer back to the caller in your own voice. Keep it natural and concise.", - }) + } + if state.post_call_actions: + output["post_call_action_guidance"] = ( + "If this result completed, queued, canceled, or superseded a pending after-call action, " + "call delete_post_call_action for that action_index before the call ends." + ) + state.consult_results.append(RealtimeConsultResult( + id=call_id, + request=query, + result=_consult_result_text(output), + created_at=time.time(), + )) + await _submit_tool_result(openai_ws, call_id, output) async def _handle_register_action( @@ -943,18 +1139,24 @@ def _action_index(args: Dict[str, Any]) -> int: async def _dispatch_post_call( state: _BridgeState, + meta: RealtimeCallMeta, on_post_call_actions: PostCallActionsCallback, on_call_ended: CallEndedCallback, ) -> None: """Run exactly one follow-up after the call: queued actions, else a reflection.""" if state.post_call_actions: try: - await on_post_call_actions(list(state.post_call_actions), list(state.transcript)) + await on_post_call_actions( + meta, + list(state.post_call_actions), + list(state.transcript), + list(state.consult_results), + ) except Exception as exc: logger.warning("[realtime] post-call action dispatch failed: %s", exc) else: try: - await on_call_ended(list(state.transcript)) + await on_call_ended(meta, list(state.transcript)) except Exception as exc: logger.warning("[realtime] call-ended dispatch failed: %s", exc) diff --git a/inkbox_codex/sessions.py b/inkbox_codex/sessions.py index 67955ac..70a3733 100644 --- a/inkbox_codex/sessions.py +++ b/inkbox_codex/sessions.py @@ -53,6 +53,7 @@ HealthFn = Callable[[], Awaitable[str]] TYPING_REFRESH_SECONDS = 40.0 +TYPING_MAX_SECONDS = 600.0 @dataclass @@ -112,6 +113,22 @@ def _control_command(text: str) -> Optional[str]: return None +def _is_inkbox_mcp_tool_elicitation(params: Dict[str, Any]) -> bool: + """Return true for Codex MCP prompts asking to run Inkbox tools.""" + message = str(params.get("message") or params.get("prompt") or "").lower() + server = str( + params.get("serverName") + or params.get("server_name") + or params.get("mcpServerName") + or params.get("server") + or "" + ).lower() + tool = str(params.get("toolName") or params.get("tool_name") or params.get("tool") or "").lower() + if server == "inkbox" and tool.startswith("inkbox_"): + return True + return "run tool" in message and ("inkbox mcp server" in message or "inkbox_" in message) + + def _send_error_reason(exc: Exception) -> str: """Pull a human reason out of a send exception. @@ -648,12 +665,15 @@ async def _typing_loop(self) -> None: """Refresh the channel's typing indicator until the turn ends. Returns: - None: Runs until cancelled by :meth:`_run_turn`. + None: Runs until cancelled by :meth:`_run_turn` or the safety cap. """ if self.typing_fn is None: return + if self.reply_meta.get("typing") is False: + return + elapsed = 0.0 try: - while True: + while elapsed < TYPING_MAX_SECONDS: # Only iMessage has a typing bubble; stay quiet while an # escalation is parked waiting on the human to reply. if self.mode == "imessage" and self.pending is None: @@ -662,6 +682,7 @@ async def _typing_loop(self) -> None: except Exception: logger.debug("[session %s] typing ping failed", self.chat_id, exc_info=True) await asyncio.sleep(TYPING_REFRESH_SECONDS) + elapsed += TYPING_REFRESH_SECONDS except asyncio.CancelledError: return @@ -710,6 +731,9 @@ async def _handle_codex_request(self, method: str, params: Dict[str, Any]) -> Di if method == "mcpServer/elicitation/request": message = str(params.get("message") or params.get("prompt") or "Codex needs your input.") + if self.cfg.auto_approve_inkbox_tools and _is_inkbox_mcp_tool_elicitation(params): + logger.info("[session %s] Auto-approved Inkbox MCP tool elicitation: %s", self.chat_id, message) + return {"action": "accept", "content": {"text": "yes"}} reply = await self._escalate("poll", message) return {"action": "accept", "content": {"text": reply or ""}} diff --git a/inkbox_codex/setup_wizard.py b/inkbox_codex/setup_wizard.py index ea07298..6b62b85 100644 --- a/inkbox_codex/setup_wizard.py +++ b/inkbox_codex/setup_wizard.py @@ -27,10 +27,10 @@ from urllib.parse import urlencode try: - from .config import INKBOX_BASE_URL_DEFAULT + from .config import INKBOX_BASE_URL_DEFAULT, inkbox_base_url_kwargs, inkbox_client_kwargs from .realtime import DEFAULT_MODEL as REALTIME_MODEL, REALTIME_URL except ImportError: # pragma: no cover - direct local import/test fallback - from config import INKBOX_BASE_URL_DEFAULT + from config import INKBOX_BASE_URL_DEFAULT, inkbox_base_url_kwargs, inkbox_client_kwargs from realtime import DEFAULT_MODEL as REALTIME_MODEL, REALTIME_URL @@ -42,6 +42,7 @@ # Bundled avatar attached to the agent's Inkbox contact card during setup. _AVATAR_PATH = Path(__file__).resolve().parent / "assets" / "codex_avatar.png" +_RAW_AVATAR_BASE_URL_DEFAULT = "https://inkbox.ai" # ---------------------------------------------------------------------- @@ -497,7 +498,7 @@ def _setup_signing_key(api_key: str, base_url: str, Inkbox: Any) -> None: raise SystemExit(1) try: - new_key = Inkbox(api_key=api_key, base_url=base_url).create_signing_key() + new_key = Inkbox(**inkbox_client_kwargs(api_key, base_url)).create_signing_key() except Exception as exc: print_error(f" Failed to create signing key: {exc}") print_error(" A signing key is required; aborting setup. Retry, or paste an existing key.") @@ -550,7 +551,7 @@ def find_start(texts: Any) -> Any | None: return None try: - client = Inkbox(api_key=api_key, base_url=base_url) + client = Inkbox(**inkbox_client_kwargs(api_key, base_url)) except Exception: return @@ -597,6 +598,10 @@ def find_start(texts: Any) -> Any | None: # ---------------------------------------------------------------------- +def _avatar_base_url(base_url: str) -> str: + return (base_url or _RAW_AVATAR_BASE_URL_DEFAULT).rstrip("/") + + async def _identity_has_avatar_async(base_url: str, api_key: str, handle: str) -> bool | None: """Check whether an identity already has a contact-card avatar. @@ -610,7 +615,7 @@ async def _identity_has_avatar_async(base_url: str, api_key: str, handle: str) - """ import aiohttp - url = f"{base_url.rstrip('/')}/api/v1/identities/{handle}/avatar" + url = f"{_avatar_base_url(base_url)}/api/v1/identities/{handle}/avatar" timeout = aiohttp.ClientTimeout(total=10) try: async with aiohttp.ClientSession(timeout=timeout) as session: @@ -640,7 +645,7 @@ async def _upload_avatar_async( """ import aiohttp - url = f"{base_url.rstrip('/')}/api/v1/identities/{handle}/avatar" + url = f"{_avatar_base_url(base_url)}/api/v1/identities/{handle}/avatar" timeout = aiohttp.ClientTimeout(total=30) form = aiohttp.FormData() form.add_field("file", image, filename="codex_avatar.png", content_type="image/png") @@ -898,7 +903,7 @@ def _configure_imessage(api_key: str, base_url: str, handle: str, Inkbox: Any) - print_info(" No number to provision — you connect through the Inkbox iMessage router.") try: - client = Inkbox(api_key=api_key, base_url=base_url) + client = Inkbox(**inkbox_client_kwargs(api_key, base_url)) identity = client.get_identity(handle) except Exception as exc: print_warning(f" Could not load the identity for iMessage setup: {exc}") @@ -1137,8 +1142,8 @@ def _self_signup_flow(base_url: str, Inkbox: Any, InkboxAPIError: Any) -> tuple[ human_email=human_email, note_to_human=note, agent_handle=handle, - base_url=base_url, harness="codex", + **inkbox_base_url_kwargs(base_url), ) break except InkboxAPIError as exc: @@ -1213,7 +1218,11 @@ def _self_signup_flow(base_url: str, Inkbox: Any, InkboxAPIError: Any) -> tuple[ print_warning(" This code is dead. Type 'resend' before trying another code.") continue try: - verify = Inkbox.verify_signup(api_key=resp.api_key, verification_code=entry, base_url=base_url) + verify = Inkbox.verify_signup( + api_key=resp.api_key, + verification_code=entry, + **inkbox_base_url_kwargs(base_url), + ) print_success(f" Verified - claim status: {verify.claim_status}") verified = True break @@ -1235,7 +1244,7 @@ def _self_signup_flow(base_url: str, Inkbox: Any, InkboxAPIError: Any) -> tuple[ print_info(" We provision a local US number so SMS is supported.") if prompt_yes_no(" Provision a phone number for this agent?", True): try: - client = Inkbox(api_key=resp.api_key, base_url=base_url) + client = Inkbox(**inkbox_client_kwargs(resp.api_key, base_url)) provisioned_phone = client.phone_numbers.provision(agent_handle=resp.agent_handle, type="local") print_success(f" Provisioned: {provisioned_phone.number}") except InkboxAPIError as exc: @@ -1283,7 +1292,7 @@ def _retry_or_abort(retry_label: str, *, error_context: str = "") -> bool: def _try_resend(Inkbox: Any, InkboxAPIError: Any, api_key: str, base_url: str, human_email: str) -> bool: try: - Inkbox.resend_signup_verification(api_key=api_key, base_url=base_url) + Inkbox.resend_signup_verification(api_key=api_key, **inkbox_base_url_kwargs(base_url)) print_success(f" Resent. Check {human_email}.") return True except InkboxAPIError as exc: @@ -1334,7 +1343,7 @@ def _api_key_flow( return None, "", False try: - client = Inkbox(api_key=api_key, base_url=base_url) + client = Inkbox(**inkbox_client_kwargs(api_key, base_url)) info = client.whoami() except InkboxAPIError as exc: print_error(f" whoami failed: HTTP {_error_status(exc)} {_error_detail(exc)}") @@ -1359,9 +1368,9 @@ def _api_key_flow( if subtype == _enum_value(ADMIN_SCOPED): return _pick_admin_scoped(client, api_key, IdentityPhoneNumberCreateOptions, InkboxAPIError) - print_warning(f" Unrecognized API-key subtype: {subtype!r}.") - print_info(" Falling back to list_identities().") - return _pick_admin_scoped(client, api_key, IdentityPhoneNumberCreateOptions, InkboxAPIError) + print_error(f" Unsupported API-key subtype: {subtype!r}.") + print_info(" Use an admin-scoped or agent-scoped Inkbox API key.") + return None, "", False def _pick_agent_scoped(client: Any, api_key: str) -> tuple[Any | None, str, bool]: @@ -1598,6 +1607,25 @@ def _configure_project_dir() -> None: print_success(f" Codex will work in {chosen}") +def _configure_inkbox_tool_approvals() -> None: + """Ask whether Inkbox MCP tools should run without per-call prompts.""" + print() + print(color(" --- Inkbox tool approvals ---", Colors.CYAN)) + print_info(" Codex uses Inkbox tools to send email, SMS, and iMessage,") + print_info(" place calls, inspect call/text history, and manage contacts.") + print_info(" Trusting these tools skips repeated Inkbox allow prompts while") + print_info(" leaving normal Codex command and file approvals unchanged.") + + current = _env("INKBOX_CODEX_AUTO_APPROVE_INKBOX_TOOLS").strip().lower() + default = current not in {"0", "false", "no", "off"} if current else True + allow = prompt_yes_no(" Allow this agent to run Inkbox tools without asking each time?", default) + _save("INKBOX_CODEX_AUTO_APPROVE_INKBOX_TOOLS", "true" if allow else "false") + if allow: + print_success(" Inkbox tool prompts will be auto-approved.") + else: + print_info(" Codex will ask before each Inkbox tool call.") + + def _configure_autostart() -> None: """Offer to keep the gateway running — on boot, or just in the background. @@ -1719,6 +1747,7 @@ def interactive_setup() -> None: print() print_success(f"Inkbox is already configured for identity '{existing_identity}'.") if not prompt_yes_no(" Reconfigure Inkbox?", False): + _configure_inkbox_tool_approvals() return base_url = os.getenv("INKBOX_BASE_URL") or _env("INKBOX_BASE_URL") or INKBOX_BASE_URL_DEFAULT @@ -1777,6 +1806,8 @@ def interactive_setup() -> None: _configure_project_dir() + _configure_inkbox_tool_approvals() + _configure_autostart() print() diff --git a/inkbox_codex/tools.py b/inkbox_codex/tools.py index d66edbe..bc23349 100644 --- a/inkbox_codex/tools.py +++ b/inkbox_codex/tools.py @@ -33,6 +33,9 @@ JsonSchema = Dict[str, Any] +SMS_MAX_LENGTH = 1600 +IMESSAGE_MAX_LENGTH = 18995 + @dataclass(frozen=True) class ToolSpec: @@ -50,10 +53,12 @@ def _schema(properties: Dict[str, JsonSchema], required: List[str] | None = None } -def _str(desc: str = "") -> JsonSchema: +def _str(desc: str = "", *, max_length: int | None = None) -> JsonSchema: schema: JsonSchema = {"type": "string"} if desc: schema["description"] = desc + if max_length is not None: + schema["maxLength"] = max_length return schema @@ -96,7 +101,7 @@ def _str_list(desc: str = "") -> JsonSchema: _schema( { "to": _str("E.164 recipient number or an existing text conversation id."), - "text": _str("Message body."), + "text": _str("Message body, max 1600 chars.", max_length=SMS_MAX_LENGTH), "media_paths": _str_list("Local file paths to upload and attach."), "media_urls": _str_list("Already-hosted media URLs to attach."), }, @@ -109,7 +114,7 @@ def _str_list(desc: str = "") -> JsonSchema: _schema( { "conversation_id": _str("Existing iMessage conversation id."), - "text": _str("Message body."), + "text": _str("Message body, max 18995 chars.", max_length=IMESSAGE_MAX_LENGTH), "media_path": _str("Optional local file path to upload and attach."), }, ["conversation_id", "text"], @@ -232,8 +237,8 @@ def _str_list(desc: str = "") -> JsonSchema: }, ["contact_id"]), ), ToolSpec( - "inkbox_export_contact_vcard", - "Export one contact as a vCard 4.0 string by contact id.", + "inkbox_delete_contact", + "Remove a contact from the address book by contact id. Look it up first to confirm the target.", _schema({"contact_id": _str("Contact id.")}, ["contact_id"]), ), ] @@ -266,18 +271,27 @@ def _tool_result(data: Any) -> Dict[str, Any]: } -def _tool_error(message: str) -> Dict[str, Any]: +def _tool_error(message: str, **fields: Any) -> Dict[str, Any]: + payload = {"error": message, **fields} return { "content": [ { "type": "text", - "text": json.dumps({"error": message}, ensure_ascii=False), + "text": json.dumps(_json_safe(payload), ensure_ascii=False), } ], "isError": True, } +def _message_too_long_reason(channel: str, content: str, max_chars: int) -> str: + char_count = len(content or "") + return ( + f"{channel} text is {char_count} characters; maximum is {max_chars}. " + f"Shorten it or split it into smaller {channel} messages." + ) + + def _upload_media_url(identity: Any, path: str) -> str: resolved = Path(path).expanduser() upload = identity.upload_imessage_media( @@ -319,6 +333,26 @@ async def call_inkbox_tool(client: Any, identity_handle: str, name: str, args: D args = dict(args or {}) + if name == "inkbox_send_sms": + text = str(args.get("text") or "") + if len(text) > SMS_MAX_LENGTH: + return _tool_error( + _message_too_long_reason("SMS", text, SMS_MAX_LENGTH), + error_code="sms_too_long", + char_count=len(text), + max_chars=SMS_MAX_LENGTH, + ) + + if name == "inkbox_send_imessage": + text = str(args.get("text") or "") + if len(text) > IMESSAGE_MAX_LENGTH: + return _tool_error( + _message_too_long_reason("iMessage", text, IMESSAGE_MAX_LENGTH), + error_code="imessage_too_long", + char_count=len(text), + max_chars=IMESSAGE_MAX_LENGTH, + ) + def _identity(): return client.get_identity(identity_handle) @@ -362,10 +396,11 @@ def _run() -> Any: return {"sent": True, "id": str(getattr(msg, "id", "")), "media": len(urls)} if name == "inkbox_send_imessage": + text = str(args.get("text") or "") identity = _identity() kwargs: Dict[str, Any] = { "conversation_id": str(args["conversation_id"]), - "text": str(args.get("text") or ""), + "text": text, } media_path = str(args.get("media_path") or "").strip() if media_path: @@ -497,8 +532,9 @@ def _run() -> Any: ] return client.contacts.update(str(args["contact_id"]), **kwargs) - if name == "inkbox_export_contact_vcard": - return {"vcard": client.contacts.vcards.export_vcard(str(args["contact_id"]))} + if name == "inkbox_delete_contact": + client.contacts.delete(str(args["contact_id"])) + return {"deleted": str(args["contact_id"])} raise ValueError(f"unknown Inkbox tool: {name}") diff --git a/tests/test_config.py b/tests/test_config.py index 9033862..5b862d1 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -5,30 +5,36 @@ def test_read_config_defaults(monkeypatch): for var in ( "INKBOX_API_KEY", "INKBOX_IDENTITY", "INKBOX_ALLOW_ALL_USERS", "INKBOX_ALLOWED_USERS", "CODEX_BIN", "CODEX_SANDBOX", - "CODEX_APPROVAL_POLICY", + "CODEX_APPROVAL_POLICY", "INKBOX_CODEX_AUTO_APPROVE_INKBOX_TOOLS", + "INKBOX_BASE_URL", ): monkeypatch.delenv(var, raising=False) cfg = read_config() - assert cfg.base_url == "https://inkbox.ai" + assert cfg.base_url == "" assert cfg.require_signature is True assert cfg.codex_bin == "codex" assert cfg.codex_sandbox == "workspace-write" assert cfg.codex_approval_policy == "on-request" + assert cfg.auto_approve_inkbox_tools is False def test_read_config_env(monkeypatch): monkeypatch.setenv("INKBOX_API_KEY", "ApiKey_test") monkeypatch.setenv("INKBOX_IDENTITY", "code-agent") + monkeypatch.setenv("INKBOX_BASE_URL", "https://proxy.example") monkeypatch.setenv("INKBOX_ALLOWED_USERS", "+15551234567, me@example.com") monkeypatch.setenv("CODEX_BIN", "/opt/codex") monkeypatch.setenv("CODEX_SANDBOX", "read-only") monkeypatch.setenv("CODEX_APPROVAL_POLICY", "never") + monkeypatch.setenv("INKBOX_CODEX_AUTO_APPROVE_INKBOX_TOOLS", "true") cfg = read_config() assert cfg.api_key == "ApiKey_test" + assert cfg.base_url == "https://proxy.example" assert cfg.allowed_users == ["+15551234567", "me@example.com"] assert cfg.codex_bin == "/opt/codex" assert cfg.codex_sandbox == "read-only" assert cfg.codex_approval_policy == "never" + assert cfg.auto_approve_inkbox_tools is True def _clear_realtime_env(monkeypatch): diff --git a/tests/test_gateway_call_ws.py b/tests/test_gateway_call_ws.py index 720a730..d484e87 100644 --- a/tests/test_gateway_call_ws.py +++ b/tests/test_gateway_call_ws.py @@ -3,6 +3,7 @@ from inkbox_codex import gateway from inkbox_codex.config import BridgeConfig +from inkbox_codex.gateway import _voice_consult_prompt class _FakeWS: @@ -27,12 +28,81 @@ async def __anext__(self): raise StopAsyncIteration +class _FakeTextMsg: + def __init__(self, data): + self.type = "text" + self.data = data + + +class _ScriptedWS(_FakeWS): + def __init__(self, messages): + super().__init__() + self._messages = list(messages) + self.sent = [] + + async def __anext__(self): + if not self._messages: + raise StopAsyncIteration + return self._messages.pop(0) + + async def send_str(self, data): + self.sent.append(data) + + class _FakeRequest: def __init__(self): self.headers = {} # no X-Call-Context; signature check is off self.query = {} # no context_token; inbound (no outbound place-call ctx) +class _NoDeliveryInkbox: + def get_identity(self, _identity): + raise AssertionError("send_to_contact must not reach Inkbox delivery") + + +class _FakeIdentity: + def __init__(self): + self.sent_texts = [] + self.sent_imessages = [] + + def send_text(self, **kwargs): + self.sent_texts.append(kwargs) + + def send_imessage(self, **kwargs): + self.sent_imessages.append(kwargs) + + +class _DeliveryInkbox: + def __init__(self, identity): + self.identity = identity + + def get_identity(self, _identity): + return self.identity + + +class _FakeContactSession: + def __init__(self): + self.inbound = [] + self.consults = [] + + async def handle_inbound(self, text, mode, meta): + self.inbound.append((text, mode, meta)) + + async def run_consult(self, prompt): + self.consults.append(prompt) + return "" + + +class _FakeSessions: + def __init__(self, session): + self.session = session + self.requested_ids = [] + + def get(self, chat_id): + self.requested_ids.append(chat_id) + return self.session + + def test_call_ws_declares_inkbox_stt_tts_headers(monkeypatch): """The WS upgrade must advertise platform-side STT/TTS so Inkbox sends us transcripts and speaks our text frames — without these it defaults to raw @@ -52,13 +122,164 @@ def test_call_ws_declares_inkbox_stt_tts_headers(monkeypatch): assert fake_ws.headers.get("x-use-inkbox-text-to-speech") == "true" +def test_send_to_contact_suppresses_exact_silent_reply(): + gw = gateway.InkboxGateway(BridgeConfig(require_signature=False, identity="codex")) + gw._inkbox = _NoDeliveryInkbox() + + asyncio.run(gw.send_to_contact("contact-1", "[SILENT]", "sms", {"to": "+15551234567"})) + + +def test_send_to_contact_drops_late_voice_reply_without_channel_fallback(): + gw = gateway.InkboxGateway(BridgeConfig(require_signature=False, identity="codex")) + gw._inkbox = _NoDeliveryInkbox() + + asyncio.run( + gw.send_to_contact( + "+15551234567", + "This answer finished after hangup.", + "voice", + {"call_id": "call-1", "to": "+15551234567"}, + ) + ) + + +def test_send_to_contact_rejects_over_limit_sms_without_delivery(): + gw = gateway.InkboxGateway(BridgeConfig(require_signature=False, identity="codex")) + gw._inkbox = _NoDeliveryInkbox() + + try: + asyncio.run( + gw.send_to_contact( + "+15551234567", + "x" * (gateway.SMS_MAX_LENGTH + 1), + "sms", + {"to": "+15551234567"}, + ) + ) + except ValueError as exc: + assert "SMS text is 1601 characters" in str(exc) + else: + raise AssertionError("expected over-limit SMS reply to be rejected") + + +def test_send_to_contact_rejects_over_limit_imessage_without_delivery(): + gw = gateway.InkboxGateway(BridgeConfig(require_signature=False, identity="codex")) + gw._inkbox = _NoDeliveryInkbox() + + try: + asyncio.run( + gw.send_to_contact( + "contact-1", + "x" * (gateway.IMESSAGE_MAX_LENGTH + 1), + "imessage", + {"conversation_id": "imconv-123"}, + ) + ) + except ValueError as exc: + assert "iMessage text is 18996 characters" in str(exc) + else: + raise AssertionError("expected over-limit iMessage reply to be rejected") + + +def test_send_to_contact_uses_prefixed_sms_conversation_chat_id(): + identity = _FakeIdentity() + gw = gateway.InkboxGateway(BridgeConfig(require_signature=False, identity="codex")) + gw._inkbox = _DeliveryInkbox(identity) + + asyncio.run(gw.send_to_contact("sms:conv-123", "reply", "sms", {})) + + assert identity.sent_texts == [{"text": "reply", "conversation_id": "conv-123"}] + + +def test_send_to_contact_uses_prefixed_imessage_conversation_chat_id(): + identity = _FakeIdentity() + gw = gateway.InkboxGateway(BridgeConfig(require_signature=False, identity="codex")) + gw._inkbox = _DeliveryInkbox(identity) + + asyncio.run(gw.send_to_contact("imessage:imconv-123", "reply", "imessage", {})) + + assert identity.sent_imessages == [{"conversation_id": "imconv-123", "text": "reply"}] + + +def test_call_ws_stt_tts_runs_call_ended_reflection(monkeypatch): + fake_ws = _ScriptedWS([ + _FakeTextMsg('{"event":"start"}'), + _FakeTextMsg('{"event":"transcript","text":"Please send the summary after this.","is_final":true}'), + _FakeTextMsg('{"event":"stop"}'), + ]) + monkeypatch.setattr(gateway, "web", types.SimpleNamespace(WebSocketResponse=lambda: fake_ws)) + monkeypatch.setattr(gateway, "WSMsgType", types.SimpleNamespace(TEXT="text")) + + session = _FakeContactSession() + gw = gateway.InkboxGateway(BridgeConfig(require_signature=False)) + gw.sessions = _FakeSessions(session) + + asyncio.run(gw._handle_call_ws(_FakeRequest())) + + assert session.inbound == [ + ( + "Please send the summary after this.", + "voice", + { + "call_id": "", + "sender": "", + "contact": None, + "direction": "inbound", + }, + ) + ] + assert len(session.consults) == 1 + assert "[voice call ended]" in session.consults[0] + assert "do not redo work that was already completed" in session.consults[0] + assert "Please send the summary after this." in session.consults[0] + + +def test_call_ws_uses_stored_call_contact_session_for_stt_tts(monkeypatch): + fake_ws = _ScriptedWS([ + _FakeTextMsg('{"event":"transcript","text":"Can you see my earlier texts?","is_final":true}'), + _FakeTextMsg('{"event":"stop"}'), + ]) + monkeypatch.setattr(gateway, "web", types.SimpleNamespace(WebSocketResponse=lambda: fake_ws)) + monkeypatch.setattr(gateway, "WSMsgType", types.SimpleNamespace(TEXT="text")) + + session = _FakeContactSession() + sessions = _FakeSessions(session) + gw = gateway.InkboxGateway(BridgeConfig(require_signature=False)) + gw.sessions = sessions + gw._call_meta_by_id["call-1"] = { + "id": "call-1", + "direction": "inbound", + "remotePhoneNumber": "+15551234567", + "local_phone_number": "+15550001111", + "contacts": [{"bucket": "from", "contactId": "contact-1", "name": "Ada Lovelace"}], + } + request = _FakeRequest() + request.query = {"call_id": "call-1"} + + asyncio.run(gw._handle_call_ws(request)) + + assert sessions.requested_ids == ["contact-1", "contact-1"] + assert session.inbound[0][2]["sender"] == "+15551234567" + assert session.inbound[0][2]["contact"]["id"] == "contact-1" + assert session.inbound[0][2]["contact"]["name"] == "Ada Lovelace" + assert "call-1" not in gw._call_meta_by_id + + class _FakeBridge: def __init__(self): self.ran = False self.closed = False + self.consult_answer = None async def run(self, *, inkbox_ws, on_agent_consult, on_post_call_actions, on_call_ended): self.ran = True + self.consult_answer = await on_agent_consult( + types.SimpleNamespace(call_id="call-1"), + "help Dima choose a mountain bike", + [("assistant", "Hi Dima."), ("user", "I want to buy a mountain bike.")], + [], + [], + ) async def close(self): self.closed = True @@ -87,6 +308,138 @@ async def fake_open(*, config, meta): assert bridge.ran is True and bridge.closed is True +def test_call_ws_passes_outbound_context_to_realtime(monkeypatch, tmp_path): + fake_ws = _FakeWS() + monkeypatch.setattr(gateway, "web", types.SimpleNamespace(WebSocketResponse=lambda: fake_ws)) + monkeypatch.setenv("INKBOX_CODEX_HOME", str(tmp_path)) + bridge = _FakeBridge() + seen = {} + + context_dir = tmp_path / "call_contexts" + context_dir.mkdir() + (context_dir / "tok-123.json").write_text( + '{"purpose":"tell them the deploy is fixed","opening_message":"Hi there",' + '"context":"PR 12","to_number":"+15551234567"}' + ) + + async def fake_open(*, config, meta): + seen["meta"] = meta + return bridge + + monkeypatch.setattr(gateway, "open_inkbox_realtime_bridge", fake_open) + + from inkbox_codex.realtime import RealtimeConfig + + cfg = BridgeConfig(require_signature=False, realtime=RealtimeConfig(enabled=True, api_key="sk-x")) + gw = gateway.InkboxGateway(cfg) + request = _FakeRequest() + request.query = {"context_token": "tok-123"} + + asyncio.run(gw._handle_call_ws(request)) + + assert seen["meta"].direction == "outbound" + assert seen["meta"].remote_phone_number == "+15551234567" + assert seen["meta"].outbound_purpose == "tell them the deploy is fixed" + assert seen["meta"].outbound_opening == "Hi there" + assert seen["meta"].outbound_context == "PR 12" + + +def test_voice_consult_prompt_anchors_current_call(): + prompt = _voice_consult_prompt( + query="help Dima choose a mountain bike", + transcript=[("assistant", "Hi Dima."), ("user", "I want to buy a mountain bike.")], + outbound={ + "purpose": "Call specifically about figuring out how to buy a mountain bike.", + "context": "Discuss hardtail vs full suspension and budget.", + }, + contact={"name": "Dima"}, + direction="outbound", + ) + + assert "Do not continue unrelated prior text/session work" in prompt + assert "Do not run commands, run tests" in prompt + assert "Outbound call purpose: Call specifically about figuring out how to buy a mountain bike." in prompt + assert "user: I want to buy a mountain bike." in prompt + assert "Consult request: help Dima choose a mountain bike" in prompt + + +def test_realtime_consult_wraps_query_before_codex(monkeypatch, tmp_path): + fake_ws = _FakeWS() + monkeypatch.setattr(gateway, "web", types.SimpleNamespace(WebSocketResponse=lambda: fake_ws)) + monkeypatch.setenv("INKBOX_CODEX_HOME", str(tmp_path)) + bridge = _FakeBridge() + + context_dir = tmp_path / "call_contexts" + context_dir.mkdir() + (context_dir / "tok-bike.json").write_text( + '{"purpose":"Call about buying a mountain bike",' + '"context":"Budget and riding style","to_number":"+15551234567"}' + ) + + async def fake_open(*, config, meta): + return bridge + + monkeypatch.setattr(gateway, "open_inkbox_realtime_bridge", fake_open) + + from inkbox_codex.realtime import RealtimeConfig + + session = _FakeContactSession() + cfg = BridgeConfig(require_signature=False, realtime=RealtimeConfig(enabled=True, api_key="sk-x")) + gw = gateway.InkboxGateway(cfg) + gw.sessions = _FakeSessions(session) + request = _FakeRequest() + request.query = {"context_token": "tok-bike"} + + asyncio.run(gw._handle_call_ws(request)) + + assert bridge.consult_answer == "" + assert session.consults + prompt = session.consults[0] + assert "Voice call consult from the Inkbox Realtime agent." in prompt + assert "Outbound call purpose: Call about buying a mountain bike" in prompt + assert "Consult request: help Dima choose a mountain bike" in prompt + assert "Do not run commands, run tests" in prompt + + +def test_call_ws_passes_contact_and_identity_context_to_realtime(monkeypatch): + fake_ws = _FakeWS() + monkeypatch.setattr(gateway, "web", types.SimpleNamespace(WebSocketResponse=lambda: fake_ws)) + bridge = _FakeBridge() + seen = {} + + async def fake_open(*, config, meta): + seen["meta"] = meta + return bridge + + monkeypatch.setattr(gateway, "open_inkbox_realtime_bridge", fake_open) + + from inkbox_codex.realtime import RealtimeConfig + + cfg = BridgeConfig(require_signature=False, realtime=RealtimeConfig(enabled=True, api_key="sk-x")) + gw = gateway.InkboxGateway(cfg) + gw._identity = types.SimpleNamespace( + agent_handle="codex", + mailbox=types.SimpleNamespace(email_address="codex@example.com"), + phone_number=types.SimpleNamespace(number="+15550001111"), + ) + request = _FakeRequest() + request.headers = { + "X-Call-Context": ( + '{"id":"call-1","remote_phone_number":"+15551234567",' + '"contacts":[{"id":"contact-1","name":"Ada Lovelace"}]}' + ) + } + + asyncio.run(gw._handle_call_ws(request)) + + assert seen["meta"].agent_identity_handle == "codex" + assert seen["meta"].agent_identity_email == "codex@example.com" + assert seen["meta"].agent_identity_phone == "+15550001111" + assert seen["meta"].contact_known is True + assert seen["meta"].contact_id == "contact-1" + assert seen["meta"].contact_name == "Ada Lovelace" + + def test_call_ws_realtime_falls_back_to_stt_tts_on_connect_failure(monkeypatch): """If OpenAI can't be reached and fallback is allowed, accept the call on the Inkbox STT/TTS path (headers back to true) instead of dropping it.""" diff --git a/tests/test_gateway_dedup.py b/tests/test_gateway_dedup.py new file mode 100644 index 0000000..b80313c --- /dev/null +++ b/tests/test_gateway_dedup.py @@ -0,0 +1,65 @@ +import asyncio +import json +import types + +import pytest + +from inkbox_codex import gateway +from inkbox_codex.config import BridgeConfig + + +class _FakeResponse: + def __init__(self, *, status=200, text=""): + self.status = status + self.text = text + + +class _FakeRequest: + def __init__(self, body, *, request_id="req-1"): + self._body = body + self.headers = {"X-Inkbox-Request-Id": request_id} + + async def read(self): + return self._body + + +@pytest.fixture(autouse=True) +def fake_web(monkeypatch): + def json_response(payload): + return _FakeResponse(status=200, text=json.dumps(payload)) + + monkeypatch.setattr( + gateway, + "web", + types.SimpleNamespace(Response=_FakeResponse, json_response=json_response), + ) + + +def test_request_id_commits_after_success(): + gw = gateway.InkboxGateway(BridgeConfig(require_signature=False, allow_all_users=True)) + body = json.dumps({"event_type": "unknown.event"}).encode() + + first = asyncio.run(gw._handle_webhook(_FakeRequest(body))) + second = asyncio.run(gw._handle_webhook(_FakeRequest(body))) + + assert json.loads(first.text)["ignored"] == "unknown.event" + assert json.loads(second.text)["deduped"] is True + + +def test_request_id_rolls_back_after_dispatch_failure(monkeypatch): + gw = gateway.InkboxGateway(BridgeConfig(require_signature=False, allow_all_users=True)) + calls = {"count": 0} + + async def fail_once(_envelope): + calls["count"] += 1 + raise RuntimeError("boom") + + monkeypatch.setattr(gw, "_on_text_received", fail_once) + body = json.dumps({"event_type": "text.received", "data": {"text_message": {"id": "t1"}}}).encode() + + with pytest.raises(RuntimeError): + asyncio.run(gw._handle_webhook(_FakeRequest(body))) + with pytest.raises(RuntimeError): + asyncio.run(gw._handle_webhook(_FakeRequest(body))) + + assert calls["count"] == 2 diff --git a/tests/test_gateway_inbound_media.py b/tests/test_gateway_inbound_media.py index 2dbd5cd..f04a9e3 100644 --- a/tests/test_gateway_inbound_media.py +++ b/tests/test_gateway_inbound_media.py @@ -31,6 +31,32 @@ def get(self, chat_id): return self.by_id.setdefault(chat_id, _FakeSession()) +class _FakeContacts: + def lookup(self, **kwargs): + if kwargs in ( + {"phone": "+15167251294"}, + {"email": "dima@inkbox.ai"}, + ): + return [ + types.SimpleNamespace( + id="contact-dima", + preferred_name="Dima", + given_name="Dima", + family_name="", + company_name="Inkbox", + job_title="Cofounder", + notes="private note", + emails=[ + types.SimpleNamespace(value="dima@inkbox.ai", is_primary=True), + ], + phones=[ + types.SimpleNamespace(value="+15167251294", is_primary=True), + ], + ) + ] + return [] + + def _gw(monkeypatch, saved): async def fake_download(items, *, prefix): # Pretend each item downloaded; echo count so the prefix/threading works. @@ -41,6 +67,10 @@ async def fake_download(items, *, prefix): return gw +def _attach_fake_contacts(gw): + gw._inkbox = types.SimpleNamespace(contacts=_FakeContacts()) + + def test_inbound_mms_media_only_wakes_agent_with_note(monkeypatch): gw = _gw(monkeypatch, [{"path": "/m/sms-0.jpg", "content_type": "image/jpeg"}]) envelope = {"data": {"text_message": { @@ -57,6 +87,23 @@ def test_inbound_mms_media_only_wakes_agent_with_note(monkeypatch): assert "Read tool" in body +def test_duplicate_inbound_sms_event_id_does_not_double_enqueue(monkeypatch): + gw = _gw(monkeypatch, []) + envelope = {"data": {"text_message": { + "id": "t1", + "direction": "inbound", + "remote_phone_number": "+15551234567", + "text": "hello", + }}} + + first = asyncio.run(gw._on_text_received(envelope)) + second = asyncio.run(gw._on_text_received(envelope)) + + assert json.loads(first.text)["ok"] is True + assert json.loads(second.text)["deduped"] is True + assert len(gw.sessions.by_id["+15551234567"].inbound) == 1 + + def test_inbound_imessage_with_text_and_media_appends_note(monkeypatch): gw = _gw(monkeypatch, [{"path": "/m/imsg-0.png", "content_type": "image/png"}]) envelope = {"data": {"message": { @@ -71,6 +118,142 @@ def test_inbound_imessage_with_text_and_media_appends_note(monkeypatch): assert "/m/imsg-0.png (image/png)" in body +def test_duplicate_inbound_imessage_event_id_does_not_double_enqueue(monkeypatch): + gw = _gw(monkeypatch, []) + envelope = {"data": {"message": { + "id": "i1", + "direction": "inbound", + "remote_number": "+15551112222", + "content": "hello", + }}} + + first = asyncio.run(gw._on_imessage_received(envelope)) + second = asyncio.run(gw._on_imessage_received(envelope)) + + assert json.loads(first.text)["ok"] is True + assert json.loads(second.text)["deduped"] is True + assert len(gw.sessions.by_id["+15551112222"].inbound) == 1 + + +def test_unknown_inbound_email_uses_thread_session_key(monkeypatch): + gw = _gw(monkeypatch, []) + envelope = {"data": {"message": { + "id": "m1", + "from_address": "person@example.com", + "thread_id": "thread-123", + "subject": "Project", + "snippet": "Can you check this?", + }}} + + asyncio.run(gw._on_mail_received(envelope)) + + body, mode, meta = gw.sessions.by_id["email:thread-123"].inbound[0] + assert body == "Can you check this?" + assert mode == "email" + assert meta["to"] == "person@example.com" + assert meta["thread_id"] == "thread-123" + + +def test_inbound_email_lookup_injects_contact_without_webhook_contact(monkeypatch): + gw = _gw(monkeypatch, []) + _attach_fake_contacts(gw) + envelope = {"data": {"message": { + "id": "m-dima", + "from_address": "dima@inkbox.ai", + "thread_id": "thread-dima", + "subject": "Yo", + "snippet": "Who am I?", + }}} + + asyncio.run(gw._on_mail_received(envelope)) + + body, mode, meta = gw.sessions.by_id["contact-dima"].inbound[0] + assert body == "Who am I?" + assert mode == "email" + assert meta["contact"]["id"] == "contact-dima" + assert meta["contact"]["name"] == "Dima" + assert meta["contact"]["emails"] == ["dima@inkbox.ai"] + + +def test_unknown_direct_sms_uses_conversation_session_key(monkeypatch): + gw = _gw(monkeypatch, []) + envelope = {"data": {"text_message": { + "id": "t-direct", + "direction": "inbound", + "remote_phone_number": "+15550000000", + "conversation_id": "conv-direct", + "text": "direct text", + }}} + + asyncio.run(gw._on_text_received(envelope)) + + body, mode, meta = gw.sessions.by_id["sms:conv-direct"].inbound[0] + assert body == "direct text" + assert mode == "sms" + assert meta["conversation_id"] == "conv-direct" + assert meta["conversation_kind"] == "direct" + + +def test_inbound_sms_lookup_injects_contact_without_webhook_contact(monkeypatch): + gw = _gw(monkeypatch, []) + _attach_fake_contacts(gw) + envelope = {"data": {"text_message": { + "id": "t-dima", + "direction": "inbound", + "remote_phone_number": "+15167251294", + "conversation_id": "conv-dima", + "text": "who am I?", + }}} + + asyncio.run(gw._on_text_received(envelope)) + + body, mode, meta = gw.sessions.by_id["contact-dima"].inbound[0] + assert body == "who am I?" + assert mode == "sms" + assert meta["contact"]["id"] == "contact-dima" + assert meta["contact"]["name"] == "Dima" + assert meta["contact"]["phones"] == ["+15167251294"] + + +def test_unknown_inbound_imessage_uses_conversation_session_key(monkeypatch): + gw = _gw(monkeypatch, []) + envelope = {"data": {"message": { + "id": "i2", + "direction": "inbound", + "remote_number": "+15551112222", + "conversation_id": "imconv-123", + "content": "hello", + }}} + + asyncio.run(gw._on_imessage_received(envelope)) + + body, mode, meta = gw.sessions.by_id["imessage:imconv-123"].inbound[0] + assert body == "hello" + assert mode == "imessage" + assert meta["conversation_id"] == "imconv-123" + + +def test_inbound_imessage_lookup_injects_contact_without_webhook_contact(monkeypatch): + gw = _gw(monkeypatch, []) + _attach_fake_contacts(gw) + envelope = {"data": {"message": { + "id": "i-dima", + "direction": "inbound", + "remote_number": "+15167251294", + "conversation_id": "imconv-dima", + "content": "who am I?", + }}} + + asyncio.run(gw._on_imessage_received(envelope)) + + body, mode, meta = gw.sessions.by_id["contact-dima"].inbound[0] + assert body == "who am I?" + assert mode == "imessage" + assert meta["contact"]["id"] == "contact-dima" + assert meta["contact"]["name"] == "Dima" + assert meta["contact"]["phones"] == ["+15167251294"] + + def test_inbound_text_without_media_is_unchanged(monkeypatch): gw = _gw(monkeypatch, []) envelope = {"data": {"text_message": { @@ -82,6 +265,101 @@ def test_inbound_text_without_media_is_unchanged(monkeypatch): assert body == "just text" +def test_group_sms_injects_silent_policy(monkeypatch): + gw = _gw(monkeypatch, []) + envelope = {"data": { + "text_message": { + "id": "t-group", + "direction": "inbound", + "remote_phone_number": "+15550000000", + "local_phone_number": "+15550000001", + "conversation_id": "conv-123", + "participants": ["+15550000000", "+15550000002"], + "text": "Dinner moved to 7.", + }, + }} + + asyncio.run(gw._on_text_received(envelope)) + + session = gw.sessions.by_id["sms:conv-123"] + body, mode, meta = session.inbound[0] + assert mode == "sms" + assert body.startswith("[inkbox:group_sms conversation_id=conv-123") + assert "participants=+15550000000,+15550000002" in body + assert "Group SMS response policy" in body + assert "return exactly [SILENT]" in body + assert meta["conversation_id"] == "conv-123" + assert meta["conversation_kind"] == "group" + + +def test_imessage_reaction_injects_silent_policy(monkeypatch): + gw = _gw(monkeypatch, []) + envelope = {"data": { + "reaction": { + "id": "react-1", + "direction": "inbound", + "remote_number": "+15551112222", + "conversation_id": "imconv-123", + "target_message_id": "im-target-9", + "reaction": "question", + }, + "contacts": [{"id": "contact-9"}], + }} + + asyncio.run(gw._on_imessage_reaction_received(envelope)) + + session = gw.sessions.by_id["imessage:imconv-123"] + body, mode, meta = session.inbound[0] + assert mode == "imessage" + assert body.startswith("[inkbox:imessage_reaction from=+15551112222 reaction=question") + assert "conversation_id=imconv-123" in body + assert "target_message_id=im-target-9" in body + assert "contact=unknown_in_inkbox" in body + assert "return exactly [SILENT]" in body + assert meta["conversation_id"] == "imconv-123" + assert meta["typing"] is True + + +def test_imessage_reaction_without_contact_uses_conversation_session_key(monkeypatch): + gw = _gw(monkeypatch, []) + envelope = {"data": { + "reaction": { + "id": "react-2", + "direction": "inbound", + "remote_number": "+15551112222", + "conversation_id": "imconv-456", + "target_message_id": "im-target-10", + "reaction": "like", + }, + }} + + asyncio.run(gw._on_imessage_reaction_received(envelope)) + + body, mode, meta = gw.sessions.by_id["imessage:imconv-456"].inbound[0] + assert mode == "imessage" + assert "reaction=like" in body + assert meta["conversation_id"] == "imconv-456" + + +def test_outbound_imessage_reaction_echo_is_ignored(monkeypatch): + gw = _gw(monkeypatch, []) + envelope = {"data": {"reaction": { + "id": "react-out", + "direction": "outbound", + "remote_number": "+15551112222", + "reaction": "like", + }}} + + resp = asyncio.run(gw._on_imessage_reaction_received(envelope)) + + assert json.loads(resp.text)["ignored"] == "outbound-reaction" + assert gw.sessions.by_id == {} + + +def test_imessage_reaction_subscribed(): + assert "imessage.reaction_received" in gateway.IMESSAGE_EVENTS + + def test_empty_message_no_text_no_media_is_ignored(monkeypatch): gw = _gw(monkeypatch, []) envelope = {"data": {"text_message": { diff --git a/tests/test_prompts.py b/tests/test_prompts.py index 844aace..72b1e87 100644 --- a/tests/test_prompts.py +++ b/tests/test_prompts.py @@ -3,21 +3,48 @@ def test_frame_inbound_tags_channel_and_sender(): assert frame_inbound("imessage", {"sender": "+15551234567"}, "hi").startswith( - "[iMessage from +15551234567]" + "[inkbox:imessage from=+15551234567 | contact=unknown_in_inkbox]" ) assert frame_inbound("sms", {"sender": "+15551234567"}, "yo").startswith( - "[Text message (SMS) from +15551234567]" + "[inkbox:sms from=+15551234567 | contact=unknown_in_inkbox]" ) # Email carries its subject into the tag. framed = frame_inbound("email", {"sender": "a@b.com", "subject": "Deploy?"}, "body") - assert framed.startswith("[Email from a@b.com]") - assert "Subject: Deploy?" in framed + assert framed.startswith("[inkbox:email from=a@b.com subject='Deploy?'") # Voice has no sender tag but flags speech. - assert frame_inbound("voice", {}, "what's up").startswith("[Spoken live on a phone call") + assert frame_inbound("voice", {}, "what's up").startswith("[inkbox:voice_call") # The body always survives intact. assert frame_inbound("imessage", {"sender": "x"}, "the message").endswith("the message") +def test_frame_inbound_includes_contact_marker(): + framed = frame_inbound( + "imessage", + { + "sender": "+15167251294", + "conversation_id": "imconv-1", + "contact": { + "id": "contact-dima", + "name": "Dima", + "company": "Inkbox", + "emails": ["dima@inkbox.ai"], + "phones": ["+15167251294"], + "job_title": "ignored", + "notes": "ignored", + }, + }, + "hi", + ) + assert framed.startswith( + "[inkbox:imessage from=+15167251294 conversation_id=imconv-1 | " + "contact_id=contact-dima contact_name='Dima' contact_company='Inkbox'" + ) + assert "contact_emails=['dima@inkbox.ai']" in framed + assert "contact_phones=['+15167251294']" in framed + assert "job_title" not in framed + assert "notes" not in framed + + def test_channel_prompt_mentions_identity_and_dir(): text = build_channel_prompt( project_dir="/srv/app", @@ -29,6 +56,11 @@ def test_channel_prompt_mentions_identity_and_dir(): assert "dev-agent@inkbox.ai" in text assert "jargon" in text.lower() assert "AskUserQuestion" in text + assert "Codex can read and write Inkbox contacts" in text + assert "inkbox_create_contact" in text + assert "inkbox_update_contact" in text + assert "inkbox_delete_contact" in text + assert "vCard export/import" in text def test_strip_markdown(): diff --git a/tests/test_realtime.py b/tests/test_realtime.py index ebdb0b9..bacde4c 100644 --- a/tests/test_realtime.py +++ b/tests/test_realtime.py @@ -36,7 +36,11 @@ def types(self): def _meta(): - return RealtimeCallMeta(call_id="c1", remote_phone_number="+15551234567", project_dir="/tmp/proj") + return RealtimeCallMeta( + call_id="c1", + remote_phone_number="+15551234567", + project_dir="/tmp/proj", + ) def test_session_update_configures_telephony_audio_vad_and_all_tools(): @@ -63,16 +67,43 @@ def test_session_update_configures_telephony_audio_vad_and_all_tools(): def test_instructions_name_the_consult_tool_and_project(): - text = build_realtime_instructions(_meta()) + meta = RealtimeCallMeta( + call_id="c1", + remote_phone_number="+15551234567", + project_dir="/tmp/proj", + agent_identity_handle="codex", + agent_identity_email="codex@example.com", + agent_identity_phone="+15550001111", + contact_known=True, + contact_id="contact-1", + contact_name="Ada Lovelace", + contact_emails=["ada@example.com"], + contact_phones=["+15551234567"], + contact_company="Inkbox", + contact_job_title="Engineer", + contact_notes="Prefers calls in the morning.", + ) + text = build_realtime_instructions(meta) assert CONSULT_TOOL_NAME in text assert "/tmp/proj" in text + assert "Your Inkbox identity handle: codex." in text + assert "codex@example.com" in text + assert "Ada Lovelace" in text + assert "ada@example.com" in text + assert "Do not perform a context lookup before greeting" in text + assert "contact lookup" in text + assert "Do not use consult_agent for ordinary conversation, shopping advice" in text + assert "Never say you only have contact or call info" not in text def test_outbound_call_context_shapes_realtime_prompt_and_greeting(): meta = RealtimeCallMeta( call_id="c1", remote_phone_number="+15551234567", + direction="outbound", project_dir="/tmp/proj", + contact_known=True, + contact_name="Ada Lovelace", outbound_purpose="tell them the deployment is fixed", outbound_opening="Hi, this is Codex calling with the deployment update.", outbound_context="Deployment failed twice before the final fix.", @@ -80,9 +111,10 @@ def test_outbound_call_context_shapes_realtime_prompt_and_greeting(): text = build_realtime_instructions(meta) - assert "OUTBOUND call" in text + assert "outbound call" in text assert "tell them the deployment is fixed" in text assert "Deployment failed twice before the final fix." in text + assert "Never say you only have contact or call info" in text assert "Hi, this is Codex calling with the deployment update." in build_realtime_greeting(meta) @@ -90,8 +122,11 @@ def test_dispatch_consult_runs_agent_and_speaks_answer(): ws = _FakeWS() state = _BridgeState() - async def fake_consult(query, transcript): + async def fake_consult(_meta, query, transcript, post_call_actions, consult_results): assert query == "run the tests" + assert transcript == [] + assert post_call_actions == [] + assert consult_results == [] return "tests pass, 42 green" asyncio.run(_dispatch_tool_call( @@ -102,6 +137,7 @@ async def fake_consult(query, transcript): arguments_json=json.dumps({"query": "run the tests"}), state=state, config=RealtimeConfig(api_key="sk-x"), + meta=_meta(), on_agent_consult=fake_consult, )) @@ -114,13 +150,15 @@ async def fake_consult(query, transcript): output = json.loads(item["item"]["output"]) assert output["status"] == "ok" assert output["answer"] == "tests pass, 42 green" + assert state.consult_results[0].request == "run the tests" + assert state.consult_results[0].result == "tests pass, 42 green" assert ws.types().count("response.create") >= 1 def test_dispatch_missing_query_returns_error(): ws = _FakeWS() - async def fake_consult(query, transcript): # pragma: no cover - must not run + async def fake_consult(*_args): # pragma: no cover - must not run raise AssertionError("consult should not be called without a query") asyncio.run(_dispatch_tool_call( @@ -131,6 +169,7 @@ async def fake_consult(query, transcript): # pragma: no cover - must not run arguments_json="{}", state=_BridgeState(), config=RealtimeConfig(api_key="sk-x"), + meta=_meta(), on_agent_consult=fake_consult, )) item = next(f for f in ws.sent if f.get("type") == "conversation.item.create") @@ -140,7 +179,7 @@ async def fake_consult(query, transcript): # pragma: no cover - must not run def test_dispatch_unknown_tool_refuses(): ws = _FakeWS() - async def fake_consult(query, transcript): # pragma: no cover + async def fake_consult(*_args): # pragma: no cover raise AssertionError("not the consult tool") asyncio.run(_dispatch_tool_call( @@ -151,6 +190,7 @@ async def fake_consult(query, transcript): # pragma: no cover arguments_json="{}", state=_BridgeState(), config=RealtimeConfig(api_key="sk-x"), + meta=_meta(), on_agent_consult=fake_consult, )) item = next(f for f in ws.sent if f.get("type") == "conversation.item.create") @@ -160,7 +200,7 @@ async def fake_consult(query, transcript): # pragma: no cover def test_consult_timeout_reports_error_not_crash(): ws = _FakeWS() - async def slow_consult(query, transcript): + async def slow_consult(*_args): await asyncio.sleep(1) return "too late" @@ -173,6 +213,7 @@ async def slow_consult(query, transcript): arguments_json=json.dumps({"query": "x"}), state=_BridgeState(), config=cfg, + meta=_meta(), on_agent_consult=slow_consult, )) item = next(f for f in ws.sent if f.get("type") == "conversation.item.create") @@ -193,7 +234,8 @@ def _dispatch(ws, name, args, state, inkbox_ws=None): arguments_json=json.dumps(args), state=state, config=RealtimeConfig(api_key="sk-x"), - on_agent_consult=lambda q, t: (_ for _ in ()).throw(AssertionError("no consult")), + meta=_meta(), + on_agent_consult=lambda *_args: (_ for _ in ()).throw(AssertionError("no consult")), )) @@ -264,15 +306,19 @@ def test_post_call_dispatch_runs_actions_when_queued(): state.transcript = [("caller", "open a pr please")] seen = {} - async def on_actions(actions, transcript): + async def on_actions(meta, actions, transcript, consult_results): + seen["meta"] = meta seen["actions"] = actions seen["transcript"] = transcript + seen["consult_results"] = consult_results - async def on_ended(transcript): # pragma: no cover - must not run + async def on_ended(*_args): # pragma: no cover - must not run raise AssertionError("should not reflect when actions are queued") - asyncio.run(_dispatch_post_call(state, on_actions, on_ended)) + asyncio.run(_dispatch_post_call(state, _meta(), on_actions, on_ended)) + assert seen["meta"].call_id == "c1" assert seen["actions"] == [{"action": "open a PR", "details": ""}] + assert seen["consult_results"] == [] def test_post_call_dispatch_reflects_when_no_actions(): @@ -280,13 +326,15 @@ def test_post_call_dispatch_reflects_when_no_actions(): state.transcript = [("agent", "bye")] seen = {} - async def on_actions(actions, transcript): # pragma: no cover - must not run + async def on_actions(*_args): # pragma: no cover - must not run raise AssertionError("no actions to run") - async def on_ended(transcript): + async def on_ended(meta, transcript): + seen["meta"] = meta seen["transcript"] = transcript - asyncio.run(_dispatch_post_call(state, on_actions, on_ended)) + asyncio.run(_dispatch_post_call(state, _meta(), on_actions, on_ended)) + assert seen["meta"].call_id == "c1" assert seen["transcript"] == [("agent", "bye")] @@ -298,6 +346,10 @@ def __init__(self, frames): type("Msg", (), {"type": "TEXT", "data": json.dumps(f)})() for f in frames ] + self.sent = [] + + async def send_str(self, data): + self.sent.append(json.loads(data)) def __aiter__(self): async def gen(): @@ -339,7 +391,7 @@ def test_realtime_transcripts_are_mirrored_into_inkbox(monkeypatch): state=state, config=RealtimeConfig(api_key="sk-x"), meta=_meta(), - on_agent_consult=lambda _q, _t: (_ for _ in ()).throw(AssertionError("no consult")), + on_agent_consult=lambda *_args: (_ for _ in ()).throw(AssertionError("no consult")), )) transcripts = [frame for frame in ink.sent if frame.get("event") == "transcript"] @@ -361,3 +413,132 @@ def test_realtime_transcripts_are_mirrored_into_inkbox(monkeypatch): ("caller", "hey can you check the build"), ("agent", "sure, the build is green"), ] + + +def test_openai_pump_dispatches_call_id_keyed_consult_events(monkeypatch): + """Match Hermes: GA Realtime may key argument events by call_id.""" + monkeypatch.setattr( + realtime, + "aiohttp", + types.SimpleNamespace( + WSMsgType=types.SimpleNamespace( + TEXT="TEXT", + CLOSE="CLOSE", + CLOSED="CLOSED", + ERROR="ERROR", + ) + ), + ) + openai = _FakeOpenAIWS([ + { + "type": "response.output_item.added", + "item_id": "item-1", + "item": { + "type": "function_call", + "call_id": "call-1", + "name": CONSULT_TOOL_NAME, + }, + }, + { + "type": "response.function_call_arguments.delta", + "call_id": "call-1", + "name": CONSULT_TOOL_NAME, + "delta": '{"query":"who is Alex?"}', + }, + { + "type": "response.function_call_arguments.done", + "call_id": "call-1", + "name": CONSULT_TOOL_NAME, + }, + ]) + state = _BridgeState() + seen = {} + + async def fake_consult(meta, query, transcript, post_call_actions, consult_results): + seen["meta"] = meta + seen["query"] = query + seen["transcript"] = transcript + seen["post_call_actions"] = post_call_actions + seen["consult_results"] = consult_results + return "Alex is in the contact book." + + async def scenario(): + await _openai_to_inkbox_pump( + openai_ws=openai, + inkbox_ws=_FakeInkboxWS(), + state=state, + config=RealtimeConfig(api_key="sk-x"), + meta=_meta(), + on_agent_consult=fake_consult, + ) + if state.consult_tasks: + await asyncio.gather(*state.consult_tasks) + + asyncio.run(scenario()) + + assert seen["meta"].call_id == "c1" + assert seen["query"] == "who is Alex?" + assert seen["post_call_actions"] == [] + assert seen["consult_results"] == [] + assert state.consult_results[0].result == "Alex is in the contact book." + item = next(frame for frame in openai.sent if frame.get("type") == "conversation.item.create") + output = json.loads(item["item"]["output"]) + assert output["status"] == "ok" + assert output["answer"] == "Alex is in the contact book." + + +def test_openai_pump_uses_frame_item_id_when_item_has_no_id(monkeypatch): + """Match Hermes: output_item.added sometimes carries item_id on the frame.""" + monkeypatch.setattr( + realtime, + "aiohttp", + types.SimpleNamespace( + WSMsgType=types.SimpleNamespace( + TEXT="TEXT", + CLOSE="CLOSE", + CLOSED="CLOSED", + ERROR="ERROR", + ) + ), + ) + openai = _FakeOpenAIWS([ + { + "type": "response.output_item.added", + "item_id": "item-2", + "item": { + "type": "function_call", + "call_id": "call-2", + "name": POST_CALL_ACTION_TOOL_NAME, + }, + }, + { + "type": "response.function_call_arguments.delta", + "item_id": "item-2", + "delta": '{"action":"email Dima the summary"}', + }, + { + "type": "response.function_call_arguments.done", + "item_id": "item-2", + "call_id": "call-2", + }, + ]) + state = _BridgeState() + + async def fake_consult(*_args): # pragma: no cover - must not run + raise AssertionError("post-call action should not consult") + + async def scenario(): + await _openai_to_inkbox_pump( + openai_ws=openai, + inkbox_ws=_FakeInkboxWS(), + state=state, + config=RealtimeConfig(api_key="sk-x"), + meta=_meta(), + on_agent_consult=fake_consult, + ) + if state.consult_tasks: + await asyncio.gather(*state.consult_tasks) + + asyncio.run(scenario()) + + assert state.post_call_actions == [{"action": "email Dima the summary", "details": ""}] diff --git a/tests/test_sessions.py b/tests/test_sessions.py index f941851..d9639c3 100644 --- a/tests/test_sessions.py +++ b/tests/test_sessions.py @@ -2,6 +2,7 @@ import json from pathlib import Path +from inkbox_codex import sessions as sessions_mod from inkbox_codex.config import BridgeConfig from inkbox_codex.sessions import ( ContactSession, @@ -134,6 +135,44 @@ async def scenario(): asyncio.run(scenario()) +def test_inkbox_mcp_elicitation_auto_approves_when_trusted(): + async def scenario(): + sent = [] + session = make_session(sent) + session.cfg.auto_approve_inkbox_tools = True + + result = await session._handle_codex_request( + "mcpServer/elicitation/request", + {"message": 'Allow the inkbox MCP server to run tool "inkbox_send_email"?'}, + ) + + assert result == {"action": "accept", "content": {"text": "yes"}} + assert sent == [] + + asyncio.run(scenario()) + + +def test_non_inkbox_mcp_elicitation_still_prompts(): + async def scenario(): + sent = [] + session = make_session(sent) + session.cfg.auto_approve_inkbox_tools = True + + task = asyncio.create_task( + session._handle_codex_request( + "mcpServer/elicitation/request", + {"message": 'Allow the github MCP server to run tool "create_issue"?'}, + ) + ) + await asyncio.sleep(0.05) + assert sent and "github MCP server" in sent[0][1] + + await session.handle_inbound("yes", "sms", {"conversation_id": "c1"}) + assert await task == {"action": "accept", "content": {"text": "yes"}} + + asyncio.run(scenario()) + + def test_escalation_timeout_returns_none(): async def scenario(): sent = [] @@ -185,6 +224,37 @@ async def scenario(): asyncio.run(scenario()) +def test_typing_loop_skips_reaction_policy_without_visible_reply(): + async def scenario(): + typing = [] + session = make_session([], typing) + session.mode = "imessage" + session.reply_meta = {"conversation_id": "c1", "typing": False} + + await session._typing_loop() + + assert typing == [] + + asyncio.run(scenario()) + + +def test_typing_loop_stops_at_safety_cap(monkeypatch): + monkeypatch.setattr(sessions_mod, "TYPING_REFRESH_SECONDS", 0.01) + monkeypatch.setattr(sessions_mod, "TYPING_MAX_SECONDS", 0.025) + + async def scenario(): + typing = [] + session = make_session([], typing) + session.mode = "imessage" + session.reply_meta = {"conversation_id": "c1"} + + await asyncio.wait_for(session._typing_loop(), timeout=0.2) + + assert len(typing) == 3 + + asyncio.run(scenario()) + + def test_clear_command_starts_fresh_session(): async def scenario(): sent = [] diff --git a/tests/test_setup_wizard.py b/tests/test_setup_wizard.py index 4175cab..24adfbb 100644 --- a/tests/test_setup_wizard.py +++ b/tests/test_setup_wizard.py @@ -10,6 +10,11 @@ # ---------------------------------------------------------------------- +def test_avatar_base_url_defaults_to_public_api(): + assert setup_wizard._avatar_base_url("") == "https://inkbox.ai" + assert setup_wizard._avatar_base_url("https://proxy.example/") == "https://proxy.example" + + def test_show_qr_renders_block_chars(): # segno is a declared dependency, so a QR should render to the terminal. import io @@ -118,6 +123,190 @@ def fail_import(): assert "inkbox>=0.4.10" in out +# ---------------------------------------------------------------------- +# API key scope handling +# ---------------------------------------------------------------------- + + +def test_api_key_flow_rejects_unknown_auth_subtype(monkeypatch, capsys): + class FakeWhoamiApiKeyResponse: + auth_subtype = "future_scope" + organization_id = "org_123" + + class FakeInkbox: + def __init__(self, **_kwargs): + pass + + def whoami(self): + return FakeWhoamiApiKeyResponse() + + def list_identities(self): + raise AssertionError("unknown subtypes must not fall back to identity listing") + + monkeypatch.setattr(setup_wizard, "prompt", lambda *_args, **_kwargs: "ApiKey_test") + + result = setup_wizard._api_key_flow( + "https://inkbox.ai", + FakeInkbox, + Exception, + FakeWhoamiApiKeyResponse, + "admin_scoped", + "agent_scoped_claimed", + "agent_scoped_unclaimed", + object, + ) + + assert result == (None, "", False) + assert "Unsupported API-key subtype" in capsys.readouterr().out + + +def test_admin_api_key_flow_selects_existing_identity_and_mints_agent_key(monkeypatch): + class FakeWhoamiApiKeyResponse: + auth_subtype = "admin_scoped" + organization_id = "org_123" + + class FakeApiKeys: + def __init__(self): + self.created = [] + + def create(self, **kwargs): + self.created.append(kwargs) + return types.SimpleNamespace(api_key="ApiKey_agent_selected") + + class FakeInkbox: + instance = None + + def __init__(self, **_kwargs): + self.api_keys = FakeApiKeys() + self.phone_numbers = types.SimpleNamespace() + self.identities = [ + types.SimpleNamespace(agent_handle="first-agent", email_address=None), + types.SimpleNamespace(agent_handle="selected-agent", email_address=None), + ] + self.details = { + "first-agent": types.SimpleNamespace( + id="identity-1", + agent_handle="first-agent", + email_address="first@example.com", + phone_number=types.SimpleNamespace(number="+15550000001", type="local"), + ), + "selected-agent": types.SimpleNamespace( + id="identity-2", + agent_handle="selected-agent", + email_address="selected@example.com", + phone_number=types.SimpleNamespace(number="+15550000002", type="local"), + ), + } + FakeInkbox.instance = self + + def whoami(self): + return FakeWhoamiApiKeyResponse() + + def list_identities(self): + return self.identities + + def get_identity(self, handle): + return self.details[handle] + + monkeypatch.setattr(setup_wizard, "prompt", lambda *_args, **_kwargs: "ApiKey_admin") + monkeypatch.setattr(setup_wizard, "prompt_choice", lambda *_args, **_kwargs: 1) + + identity, agent_key, did_provision_phone = setup_wizard._api_key_flow( + "https://inkbox.ai", + FakeInkbox, + Exception, + FakeWhoamiApiKeyResponse, + "admin_scoped", + "agent_scoped_claimed", + "agent_scoped_unclaimed", + object, + ) + + assert identity.agent_handle == "selected-agent" + assert agent_key == "ApiKey_agent_selected" + assert did_provision_phone is False + assert FakeInkbox.instance.api_keys.created == [ + { + "label": "Codex bridge - selected-agent", + "description": ( + "Auto-minted by inkbox-codex setup. Scoped to one agent " + "identity so the bridge never stores the admin key." + ), + "scoped_identity_id": "identity-2", + } + ] + + +def test_admin_api_key_flow_can_create_identity_and_mint_agent_key(monkeypatch): + class FakeWhoamiApiKeyResponse: + auth_subtype = "admin_scoped" + organization_id = "org_123" + + class FakeApiKeys: + def __init__(self): + self.created = [] + + def create(self, **kwargs): + self.created.append(kwargs) + return types.SimpleNamespace(api_key="ApiKey_agent_new") + + class FakeInkbox: + instance = None + + def __init__(self, **_kwargs): + self.api_keys = FakeApiKeys() + self.phone_numbers = types.SimpleNamespace() + self.created_identities = [] + FakeInkbox.instance = self + + def whoami(self): + return FakeWhoamiApiKeyResponse() + + def list_identities(self): + return [] + + def create_identity(self, handle, **kwargs): + self.created_identities.append((handle, kwargs)) + return types.SimpleNamespace( + id="identity-new", + agent_handle=handle, + email_address=f"{handle}@example.com", + phone_number=None, + ) + + answers = iter(["ApiKey_admin", "new-agent", "New Agent"]) + monkeypatch.setattr(setup_wizard, "prompt", lambda *_args, **_kwargs: next(answers)) + monkeypatch.setattr(setup_wizard, "prompt_yes_no", lambda *_args, **_kwargs: False) + + identity, agent_key, did_provision_phone = setup_wizard._api_key_flow( + "https://inkbox.ai", + FakeInkbox, + Exception, + FakeWhoamiApiKeyResponse, + "admin_scoped", + "agent_scoped_claimed", + "agent_scoped_unclaimed", + object, + ) + + assert identity.agent_handle == "new-agent" + assert agent_key == "ApiKey_agent_new" + assert did_provision_phone is False + assert FakeInkbox.instance.created_identities == [ + ("new-agent", {"display_name": "New Agent", "phone_number": None}) + ] + assert FakeInkbox.instance.api_keys.created == [ + { + "label": "Codex bridge - new-agent", + "description": ( + "Auto-minted by inkbox-codex setup. Scoped to one agent " + "identity so the bridge never stores the admin key." + ), + "scoped_identity_id": "identity-new", + } + ] + + # ---------------------------------------------------------------------- # Project directory # ---------------------------------------------------------------------- @@ -134,6 +323,17 @@ def test_configure_project_dir_persists_choice(tmp_path, monkeypatch): assert setup_wizard._env("CODEX_PROJECT_DIR") == str(tmp_path) +def test_configure_inkbox_tool_approvals_persists_choice(tmp_path, monkeypatch): + env_file = tmp_path / ".env" + monkeypatch.setenv("INKBOX_CODEX_ENV_FILE", str(env_file)) + monkeypatch.delenv("INKBOX_CODEX_AUTO_APPROVE_INKBOX_TOOLS", raising=False) + monkeypatch.setattr(setup_wizard, "prompt_yes_no", lambda *_a, **_k: True) + + setup_wizard._configure_inkbox_tool_approvals() + + assert setup_wizard._env("INKBOX_CODEX_AUTO_APPROVE_INKBOX_TOOLS") == "true" + + # ---------------------------------------------------------------------- # Signing key # ---------------------------------------------------------------------- diff --git a/tests/test_tools.py b/tests/test_tools.py index e1b7c77..cb90371 100644 --- a/tests/test_tools.py +++ b/tests/test_tools.py @@ -49,6 +49,8 @@ def __init__(self): self.place_call_kwargs = None self.list_calls_kwargs = None self.transcript_call_id = None + self.sent_texts = [] + self.sent_imessages = [] def place_call(self, **kwargs): self.place_call_kwargs = kwargs @@ -65,10 +67,30 @@ def list_transcripts(self, call_id): _FakeTranscript("local", "sure, it's green", 2), ] + def send_imessage(self, **kwargs): + self.sent_imessages.append(kwargs) + return type("Message", (), {"id": "im-1"})() + + def send_text(self, **kwargs): + self.sent_texts.append(kwargs) + return type("Message", (), {"id": "sms-1"})() + + +class _FakeContacts: + def __init__(self): + self.deleted = [] + + def get(self, contact_id): + return {"id": contact_id, "given_name": "Ada"} + + def delete(self, contact_id): + self.deleted.append(contact_id) + class _FakeClient: def __init__(self): self.identity = _FakeIdentity() + self.contacts = _FakeContacts() def get_identity(self, _handle): return self.identity @@ -89,6 +111,42 @@ def test_call_tools_are_registered(): assert "inkbox_get_call_transcript" in names +def test_coding_agent_tool_tier_is_registered(): + names = {tool["name"] for tool in tools_mod.mcp_tool_list()} + expected = { + "inkbox_whoami", + "inkbox_send_email", + "inkbox_send_sms", + "inkbox_send_imessage", + "inkbox_place_call", + "inkbox_list_calls", + "inkbox_get_call_transcript", + "inkbox_list_text_conversations", + "inkbox_get_text_conversation", + "inkbox_list_imessage_conversations", + "inkbox_get_imessage_conversation", + "inkbox_lookup_contact", + "inkbox_list_contacts", + "inkbox_get_contact", + "inkbox_create_contact", + "inkbox_update_contact", + "inkbox_delete_contact", + } + + assert names == expected + + +def test_get_and_delete_contact_tools(): + client = _FakeClient() + + contact = _call(client, "inkbox_get_contact", {"contact_id": "contact-1"}) + deleted = _call(client, "inkbox_delete_contact", {"contact_id": "contact-1"}) + + assert contact["id"] == "contact-1" + assert deleted["deleted"] == "contact-1" + assert client.contacts.deleted == ["contact-1"] + + def test_place_call_writes_context_and_tags_websocket_url(tmp_path, monkeypatch): monkeypatch.setenv("INKBOX_CODEX_HOME", str(tmp_path)) client = _FakeClient() @@ -153,3 +211,35 @@ def test_get_call_transcript_requires_call_id(): data = _call(_FakeClient(), "inkbox_get_call_transcript", {"call_id": " "}) assert "call_id is required" in data["error"] + + +def test_send_sms_rejects_text_over_limit(): + client = _FakeClient() + data = _call( + client, + "inkbox_send_sms", + { + "to": "+15551112222", + "text": "x" * (tools_mod.SMS_MAX_LENGTH + 1), + }, + ) + + assert data["error_code"] == "sms_too_long" + assert data["char_count"] == tools_mod.SMS_MAX_LENGTH + 1 + assert client.identity.sent_texts == [] + + +def test_send_imessage_rejects_text_over_limit(): + client = _FakeClient() + data = _call( + client, + "inkbox_send_imessage", + { + "conversation_id": "imconv-123", + "text": "x" * (tools_mod.IMESSAGE_MAX_LENGTH + 1), + }, + ) + + assert data["error_code"] == "imessage_too_long" + assert data["char_count"] == tools_mod.IMESSAGE_MAX_LENGTH + 1 + assert client.identity.sent_imessages == []