diff --git a/nerve/agent/tools.py b/nerve/agent/tools.py index bca2dd0..9ffcd5e 100644 --- a/nerve/agent/tools.py +++ b/nerve/agent/tools.py @@ -1799,6 +1799,80 @@ async def _ask_user_impl(args: dict, session_id: str) -> dict: return {"content": [{"type": "text", "text": f"Failed to ask question: {e}"}]} +def _parse_action_options(raw) -> list[dict[str, str]] | None: + """Parse the ``options`` arg for propose_action. + + Accepts: + - a list of ``{"label": ..., "value": ...}`` dicts (passed through) + - a list of strings (interpreted as ``label == value``) + - a JSON-encoded string of either of the above + - falsy / empty -> None (caller falls back to dispatcher defaults) + """ + if raw is None or raw == "": + return None + if isinstance(raw, str): + try: + raw = json.loads(raw) + except (json.JSONDecodeError, ValueError): + return None + if not isinstance(raw, list) or not raw: + return None + out: list[dict[str, str]] = [] + for item in raw: + if isinstance(item, dict): + value = str(item.get("value", "")).strip() + label = str(item.get("label", value)).strip() + if not value: + continue + out.append({"label": label or value, "value": value}) + elif isinstance(item, str): + v = item.strip() + if v: + out.append({"label": v, "value": v}) + return out or None + + +async def _propose_action_impl(args: dict, session_id: str) -> dict: + """Core implementation for the propose_action tool.""" + if not _notification_service: + return {"content": [{"type": "text", "text": "Notification service not available."}]} + + target_kind = str(args.get("target_kind", "")).strip() + target_id = str(args.get("target_id", "")).strip() + title = str(args.get("title", "")).strip() + if not target_kind or not target_id or not title: + return {"content": [{"type": "text", "text": ( + "propose_action: target_kind, target_id, and title are required." + )}]} + + body = args.get("body", "") + options = _parse_action_options(args.get("options")) + priority = args.get("priority", "high") + expires_at = args.get("expires_at") or None + + try: + result = await _notification_service.propose_action( + session_id=session_id, + target_kind=target_kind, + target_id=target_id, + title=title, + body=body, + options=options, + priority=priority, + expires_at=expires_at, + ) + + nid = result["notification_id"] + return {"content": [{"type": "text", "text": ( + f"Approval requested ({nid}). When the user picks a button, " + f"the {target_kind} dispatcher acts on {target_id}; the answer " + f"is NOT injected back into this session." + )}]} + except Exception as e: + logger.error("propose_action tool failed: %s", e) + return {"content": [{"type": "text", "text": f"Failed to propose action: {e}"}]} + + async def _react_impl(args: dict, session_id: str) -> dict: """Core implementation for the react tool.""" if not _engine: @@ -2008,6 +2082,62 @@ async def mcp_reload(args: dict) -> dict: "required": ["title"], } +_PROPOSE_ACTION_SCHEMA = { + "type": "object", + "properties": { + "target_kind": { + "type": "string", + "description": ( + "Dispatcher key the user's answer routes through. " + "Currently supported: 'mechanical-action'." + ), + }, + "target_id": { + "type": "string", + "description": ( + "Dispatcher-specific identifier the chosen decision acts " + "on (e.g. a queued mechanical-action proposal id like " + "'20260519T143906Z-d2e62e')." + ), + }, + "title": { + "type": "string", + "description": "Short headline shown on the notification card.", + }, + "body": { + "type": "string", + "description": ( + "Markdown body with the justification and any details " + "the user needs to decide." + ), + "default": "", + }, + "options": { + "type": "string", + "description": ( + "JSON array of {label, value} dicts overriding the " + "dispatcher's default options. Leave empty for the " + "canonical Approve / Decline / Snooze 24h triplet." + ), + "default": "", + }, + "priority": { + "type": "string", + "description": "'low', 'normal', 'high', 'urgent'. Default: 'high'.", + "default": "high", + }, + "expires_at": { + "type": "string", + "description": ( + "ISO-8601 UTC timestamp the row expires at. Omit to use " + "the configured default expiry window." + ), + "default": "", + }, + }, + "required": ["target_kind", "target_id", "title"], +} + _REACT_SCHEMA = { "type": "object", "properties": { @@ -2077,6 +2207,21 @@ async def ask_user_tool(args: dict) -> dict: return await _ask_user_impl(args, _current_session_id) +@tool( + "propose_action", + "Ask the user to approve, decline, or snooze a queued action. " + "Unlike ask_user, the answer routes through a server-side dispatcher " + "keyed by target_kind (e.g. 'mechanical-action') and acts on target_id " + "directly. The answer is NOT injected back into this session. " + "Use for queued mechanical actions, pending plans, or any binary " + "decision the user owns and the agent has already prepared.", + _PROPOSE_ACTION_SCHEMA, +) +async def propose_action_tool(args: dict) -> dict: + """Propose an action (fallback path; uses deprecated global).""" + return await _propose_action_impl(args, _current_session_id) + + # --------------------------------------------------------------------------- # houseofagents tools (module-level — don't need session_id) # --------------------------------------------------------------------------- @@ -2227,6 +2372,20 @@ async def session_ask_user(args: dict) -> dict: # session_id captured from enclosing scope — race-free return await _ask_user_impl(args, session_id) + @tool( + "propose_action", + "Ask the user to approve, decline, or snooze a queued action. " + "Unlike ask_user, the answer routes through a server-side dispatcher " + "keyed by target_kind (e.g. 'mechanical-action') and acts on target_id " + "directly. The answer is NOT injected back into this session. " + "Use for queued mechanical actions, pending plans, or any binary " + "decision the user owns and the agent has already prepared.", + _PROPOSE_ACTION_SCHEMA, + ) + async def session_propose_action(args: dict) -> dict: + # session_id captured from enclosing scope; race-free. + return await _propose_action_impl(args, session_id) + @tool( "react", "Set an emoji reaction on the user's last message. " @@ -2357,8 +2516,8 @@ async def session_send_file(args: dict) -> dict: return await _send_file_impl(args, session_id) # Shared tools (don't need session context) + session-scoped tools - shared_tools = [t for t in ALL_TOOLS if t.name not in ("notify", "ask_user", "react", "send_sticker", "plan_propose", "plan_update")] - session_tools: list[SdkMcpTool] = [session_notify, session_ask_user, session_react, session_send_sticker, session_plan_propose, session_plan_update, session_send_file] + shared_tools = [t for t in ALL_TOOLS if t.name not in ("notify", "ask_user", "propose_action", "react", "send_sticker", "plan_propose", "plan_update")] + session_tools: list[SdkMcpTool] = [session_notify, session_ask_user, session_propose_action, session_react, session_send_sticker, session_plan_propose, session_plan_update, session_send_file] # Only include houseofagents tools when enabled — saves context tokens otherwise hoa_enabled = _config and _config.houseofagents.enabled diff --git a/nerve/db/migrations/v028_notification_approval_kind.py b/nerve/db/migrations/v028_notification_approval_kind.py new file mode 100644 index 0000000..e4e8692 --- /dev/null +++ b/nerve/db/migrations/v028_notification_approval_kind.py @@ -0,0 +1,44 @@ +"""V28: Add target_kind / target_id columns to notifications. + +Extends the notification table to support the ``approval`` notification +kind: notifications that route to a server-side dispatcher when the user +answers them (e.g. approve / decline / snooze a queued mechanical +action). The existing ``type`` column gains a third valid value +(``approval``); the column itself stays TEXT so no schema change is +needed there. + +The two new columns: + +- ``target_kind`` TEXT NULL: dispatcher key (e.g. ``mechanical-action``, + ``plan``). NULL for legacy ``notify`` / ``question`` rows, which means + "no dispatch; fall through to the existing answer-injection path." +- ``target_id`` TEXT NULL: dispatcher-specific identifier (e.g. the + mechanical-action proposal id). Read by the handler registry. + +Existing rows are left untouched (target_kind = NULL), so the answer +path stays identical for every notification created before v28. +""" + +from __future__ import annotations + +import logging + +import aiosqlite + +logger = logging.getLogger(__name__) + + +async def up(db: aiosqlite.Connection) -> None: + await db.execute( + "ALTER TABLE notifications ADD COLUMN target_kind TEXT" + ) + await db.execute( + "ALTER TABLE notifications ADD COLUMN target_id TEXT" + ) + await db.execute( + "CREATE INDEX IF NOT EXISTS idx_notifications_target " + "ON notifications(target_kind, target_id)" + ) + logger.info( + "v028: added target_kind/target_id to notifications + index" + ) diff --git a/nerve/db/notifications.py b/nerve/db/notifications.py index 6bf8e35..507be6c 100644 --- a/nerve/db/notifications.py +++ b/nerve/db/notifications.py @@ -17,18 +17,30 @@ async def create_notification( title: str, body: str = "", priority: str = "normal", - options: list[str] | None = None, + options: list | None = None, expires_at: str | None = None, metadata: dict | None = None, + target_kind: str | None = None, + target_id: str | None = None, ) -> dict: + """Insert a notification row. + + ``type`` is one of ``notify`` (fire-and-forget), ``question`` + (ask_user / answer-injection), or ``approval`` (action-dispatch + via the handler registry). ``target_kind`` and ``target_id`` are + only populated for ``approval`` rows; left NULL otherwise so the + legacy answer path stays untouched. + """ now = datetime.now(timezone.utc).isoformat() await self.db.execute( """INSERT INTO notifications - (id, session_id, type, title, body, priority, options, expires_at, metadata, created_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", + (id, session_id, type, title, body, priority, options, + expires_at, metadata, created_at, target_kind, target_id) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", (notification_id, session_id, type, title, body, priority, json.dumps(options) if options else None, - expires_at, json.dumps(metadata or {}), now), + expires_at, json.dumps(metadata or {}), now, + target_kind, target_id), ) await self.db.commit() return {"id": notification_id, "session_id": session_id, "type": type} @@ -128,6 +140,32 @@ async def expire_notifications(self) -> int: await self.db.commit() return cursor.rowcount + async def snooze_notification( + self, notification_id: str, new_expires_at: str, + ) -> bool: + """Push a pending notification's expiry forward. + + Used by the ``approval`` dispatcher when the user picks + ``snooze_24h``: the row stays at status=pending so a later + re-delivery tick (wired in PR 2) can surface it again, but the + expiry advances so it does not get caught by ``expire_stale`` + in the meantime. + + Returns True on success, False if the row is not pending. + """ + async with self._atomic(): + async with self.db.execute( + "SELECT id FROM notifications WHERE id = ? AND status = 'pending'", + (notification_id,), + ) as cursor: + if not await cursor.fetchone(): + return False + await self.db.execute( + "UPDATE notifications SET expires_at = ? WHERE id = ?", + (new_expires_at, notification_id), + ) + return True + async def count_pending_notifications(self, channel: str | None = None) -> int: sql = "SELECT COUNT(*) FROM notifications WHERE status = 'pending'" params: tuple = () diff --git a/nerve/notifications/handlers.py b/nerve/notifications/handlers.py new file mode 100644 index 0000000..485ebd8 --- /dev/null +++ b/nerve/notifications/handlers.py @@ -0,0 +1,300 @@ +"""Dispatch registry for ``approval``-kind notifications. + +When a user answers an ``approval`` notification, the notification +service looks up the row's ``target_kind`` and routes the decision to +the matching dispatcher in this module. Dispatchers know how to act on +their target type (a mechanical-action proposal, a plan, etc.) and +return a structured ``DispatchResult`` so the service can audit-log the +outcome uniformly. + +PR 1 ships one dispatcher: ``mechanical-action``. PR 2 will add the +``plan`` dispatcher and the per-target re-delivery wiring. + +Design notes: + +- The registry is keyed by ``target_kind`` (a string). Decisions + (``approve`` / ``decline`` / ``snooze_24h`` / future values) are + passed as a function arg, not part of the key, so adding a new + decision doesn't need a new registry entry. +- Dispatchers receive the raw notification dict, the target id, the + decision string, and the live ``NerveConfig`` (for the workspace + path; see ``mechanical-action`` below). The service-side caller is + responsible for committing the DB-level status flip, so dispatchers + can stay pure-side-effect-only against the target system. +- Dispatchers should be deterministic in their audit_event keys so the + audit log stays grep-able. +""" + +from __future__ import annotations + +import logging +import os +import shutil +import subprocess +from dataclasses import dataclass, field +from datetime import datetime, timedelta, timezone +from pathlib import Path +from typing import TYPE_CHECKING, Any, Callable + +if TYPE_CHECKING: + from nerve.config import NerveConfig + +logger = logging.getLogger(__name__) + + +# ---------------------------------------------------------------------- +# Result type +# ---------------------------------------------------------------------- + + +@dataclass +class DispatchResult: + """Outcome of a single dispatch call. + + Fields: + - ``ok``: True when the dispatcher's downstream action completed + successfully. False on a non-zero exit or an internal error. The + service still flips the notification's status to ``answered`` on + either outcome, so the user does not see a re-delivered approval + that the system already acted on; the audit_event records the + failure for replay. + - ``audit_event``: a structured dict the caller appends to the + mechanical-actions audit log. Already includes the dispatcher's + own event name (``approval-acted``); the service adds the + notification id and timestamp. + - ``snooze_until``: ISO-8601 UTC timestamp for the new expiry when + the decision is a snooze. None for approve / decline. + """ + + ok: bool + audit_event: dict[str, Any] = field(default_factory=dict) + snooze_until: str | None = None + + +# ---------------------------------------------------------------------- +# Registry primitives +# ---------------------------------------------------------------------- + + +Dispatcher = Callable[ + [dict[str, Any], str, str, "NerveConfig | None"], DispatchResult +] + + +_DISPATCHERS: dict[str, Dispatcher] = {} + + +def register(target_kind: str, fn: Dispatcher) -> None: + """Register a dispatcher for a ``target_kind``. + + Idempotent: re-registering overwrites the previous entry so test + code can swap in fakes per-test without leaking state across the + module-level dict. + """ + _DISPATCHERS[target_kind] = fn + + +def get(target_kind: str) -> Dispatcher | None: + """Look up a dispatcher by ``target_kind``. None if unregistered.""" + return _DISPATCHERS.get(target_kind) + + +def known_kinds() -> list[str]: + """Return the currently registered ``target_kind`` values.""" + return sorted(_DISPATCHERS.keys()) + + +# ---------------------------------------------------------------------- +# mechanical-action dispatcher +# ---------------------------------------------------------------------- + + +# Valid decisions for the mechanical-action dispatcher. Kept here so a +# typo in the caller surfaces as an explicit failure rather than a +# silent no-op shell call. +_MECHANICAL_DECISIONS = frozenset({"approve", "decline", "snooze_24h"}) + + +def _resolve_workspace( + config: "NerveConfig | None", +) -> Path | None: + """Pick the workspace directory. + + Priority: + 1. ``$NERVE_WORKSPACE_PATH`` env var (test / override hook). + 2. ``config.workspace`` from the live NerveConfig. + 3. None if neither is available. + """ + override = os.environ.get("NERVE_WORKSPACE_PATH") + if override: + return Path(override).expanduser() + if config is not None and config.workspace: + return Path(config.workspace).expanduser() + return None + + +def _dispatch_mechanical_action( + notification: dict[str, Any], + target_id: str, + decision: str, + config: "NerveConfig | None", +) -> DispatchResult: + """Approve / decline / snooze a queued mechanical-action proposal. + + Shells out to ``/scripts/mechanical-action.sh`` which is + the decide-side wrapper around the propose-mechanical-action + primitive. The wrapper handles audit-log writes for the + approve / decline paths; this dispatcher writes its own + ``approval-acted`` audit event so the chain is observable from the + notification side as well. + + Snooze: rather than touch the queue file directly, this dispatcher + calls ``mechanical-action.sh snooze --hours 24`` and lets the + decide-side script record the audit event and update the queue + entry's ``not_before`` field. PR 2 wires the re-delivery scheduler; + until then, the snooze just keeps the proposal in queue/ with a + future ``not_before`` timestamp. + """ + notif_id = notification.get("id", "") + base_event: dict[str, Any] = { + "event": "approval-acted", + "notification_id": notif_id, + "target_kind": "mechanical-action", + "target_id": target_id, + "decision": decision, + } + + if decision not in _MECHANICAL_DECISIONS: + logger.warning( + "mechanical-action dispatch: unsupported decision %r on %s", + decision, target_id, + ) + return DispatchResult( + ok=False, + audit_event={ + **base_event, + "ok": False, + "error": f"unsupported decision: {decision}", + }, + ) + + workspace = _resolve_workspace(config) + if workspace is None: + logger.error( + "mechanical-action dispatch: no workspace configured; " + "set NERVE_WORKSPACE_PATH or config.workspace", + ) + return DispatchResult( + ok=False, + audit_event={ + **base_event, + "ok": False, + "error": "workspace path unresolved", + }, + ) + + script_path = workspace / "scripts" / "mechanical-action.sh" + if not script_path.is_file(): + logger.error( + "mechanical-action dispatch: script missing at %s", + script_path, + ) + return DispatchResult( + ok=False, + audit_event={ + **base_event, + "ok": False, + "error": f"script missing: {script_path}", + }, + ) + + if shutil.which("bash") is None: + logger.error("mechanical-action dispatch: bash not on PATH") + return DispatchResult( + ok=False, + audit_event={ + **base_event, + "ok": False, + "error": "bash not on PATH", + }, + ) + + if decision == "approve": + cmd = ["bash", str(script_path), "approve", target_id] + snooze_until = None + elif decision == "decline": + reason = ( + f"user declined via notification {notif_id}" + if notif_id else "user declined via notification" + ) + cmd = [ + "bash", str(script_path), "decline", target_id, + "--reason", reason, + ] + snooze_until = None + else: # snooze_24h + cmd = [ + "bash", str(script_path), "snooze", target_id, + "--hours", "24", + ] + snooze_until = ( + datetime.now(timezone.utc) + timedelta(hours=24) + ).isoformat() + + try: + completed = subprocess.run( + cmd, capture_output=True, text=True, check=False, timeout=30, + ) + except (OSError, subprocess.SubprocessError) as exc: + logger.error( + "mechanical-action dispatch: subprocess failed: %s", exc, + ) + return DispatchResult( + ok=False, + audit_event={ + **base_event, + "ok": False, + "error": f"subprocess error: {exc}", + }, + snooze_until=snooze_until, + ) + + ok = completed.returncode == 0 + audit_event = { + **base_event, + "ok": ok, + "exit_code": completed.returncode, + } + # Capture a short tail of stderr on failure so the audit log shows + # what went wrong without becoming a wall of text. + if not ok and completed.stderr: + audit_event["stderr_tail"] = completed.stderr[-512:] + + return DispatchResult( + ok=ok, + audit_event=audit_event, + snooze_until=snooze_until, + ) + + +register("mechanical-action", _dispatch_mechanical_action) + + +# ---------------------------------------------------------------------- +# Convenience: default approval options +# ---------------------------------------------------------------------- + + +def default_approval_options() -> list[dict[str, str]]: + """Return the canonical Approve / Decline / Snooze 24h triplet. + + Used by the ``propose_action`` MCP tool when the caller does not + supply its own options list. Keeping this here (next to the + dispatcher) keeps the option set co-located with the decisions the + dispatcher actually understands. + """ + return [ + {"label": "Approve", "value": "approve"}, + {"label": "Decline", "value": "decline"}, + {"label": "Snooze 24h", "value": "snooze_24h"}, + ] diff --git a/nerve/notifications/service.py b/nerve/notifications/service.py index ec89b3c..9f22ce5 100644 --- a/nerve/notifications/service.py +++ b/nerve/notifications/service.py @@ -1,8 +1,10 @@ """Notification service — centralized fanout, answer routing, and persistence. Coordinates between MCP tools (agent-side), channels (delivery), and the -answer routing mechanism (user-side). Supports fire-and-forget notifications -and async questions with multi-channel delivery (web UI + Telegram). +answer routing mechanism (user-side). Supports fire-and-forget notifications, +async questions with multi-channel delivery (web UI + Telegram), and +``approval``-kind notifications that route to a server-side dispatcher when +the user picks an inline option (see ``nerve.notifications.handlers``). """ from __future__ import annotations @@ -10,9 +12,13 @@ import asyncio import json import logging +import os import uuid from datetime import datetime, timezone, timedelta -from typing import TYPE_CHECKING +from pathlib import Path +from typing import TYPE_CHECKING, Any + +from nerve.notifications import handlers as _handlers if TYPE_CHECKING: from nerve.agent.engine import AgentEngine @@ -22,6 +28,68 @@ logger = logging.getLogger(__name__) +# Emoji decoration applied to the canonical approval decisions when we +# render their buttons. Keys are the option ``value`` strings; missing +# values fall back to the raw label. +_APPROVAL_EMOJIS: dict[str, str] = { + "approve": "✅", # white heavy check mark + "decline": "❌", # cross mark + "snooze_24h": "\U0001F4A4", # zzz +} + + +def _resolve_workspace(config: NerveConfig | None) -> Path | None: + """Resolve the workspace directory. + + Mirrors ``handlers._resolve_workspace`` so the service can locate + ``scripts/_mechanical_action.py`` for audit-log writes. Priority: + ``$NERVE_WORKSPACE_PATH`` first (test override), then + ``config.workspace``. + """ + override = os.environ.get("NERVE_WORKSPACE_PATH") + if override: + return Path(override).expanduser() + if config is not None and getattr(config, "workspace", None): + return Path(config.workspace).expanduser() + return None + + +def _load_mechanical_action_helper(workspace: Path): + """Import the workspace's ``_mechanical_action`` helper by path. + + The helper is a workspace-side stdlib module, not part of the Nerve + package, so we load it via ``importlib.util`` the same way the + workspace scripts do. Cached on first load via a module-level dict + so repeated approval answers do not re-spec the file each time. + """ + cached = _HELPER_CACHE.get(str(workspace)) + if cached is not None: + return cached + + import importlib.util + + helper_path = workspace / "scripts" / "_mechanical_action.py" + if not helper_path.is_file(): + raise FileNotFoundError( + f"mechanical-action helper not found at {helper_path}" + ) + spec = importlib.util.spec_from_file_location( + f"_mechanical_action__{abs(hash(str(workspace))) & 0xFFFFFF:06x}", + helper_path, + ) + if spec is None or spec.loader is None: + raise ImportError( + f"cannot build module spec for {helper_path}" + ) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + _HELPER_CACHE[str(workspace)] = module + return module + + +_HELPER_CACHE: dict[str, Any] = {} + + class NotificationService: """Manages notification lifecycle: create, deliver, answer, route.""" @@ -117,6 +185,84 @@ async def ask_question( return {"notification_id": notification_id, "status": "sent"} + async def propose_action( + self, + session_id: str, + target_kind: str, + target_id: str, + title: str, + body: str = "", + options: list[dict[str, str]] | None = None, + priority: str = "high", + expires_at: str | None = None, + expiry_hours: int | None = None, + ) -> dict: + """File an actionable ``approval``-kind notification. + + ``target_kind`` + ``target_id`` route the user's answer through + ``nerve.notifications.handlers`` instead of the question + answer-injection path. ``options`` accepts a list of + ``{"label": ..., "value": ...}`` dicts; when omitted, the + dispatcher's canonical option set is used (Approve / Decline / + Snooze 24h for the mechanical-action dispatcher). + + Returns ``{"notification_id": , "status": "sent"}``. + """ + notification_id = f"approval-{uuid.uuid4().hex[:8]}" + + # Resolve options. Default to the registered dispatcher's + # canonical set when none was passed. Falling back to the + # mechanical-action default keeps PR 1's only wired path + # working without forcing the caller to recite the same triplet. + if options is None: + options = _handlers.default_approval_options() + elif not options: + raise ValueError("propose_action: options must not be empty") + + # Normalize: store as a list of value strings (matching the + # existing question-kind contract) plus a parallel label map in + # metadata so the Telegram + web layers can render the labels + # without re-parsing options on every send. + option_values = [str(opt["value"]) for opt in options] + option_labels = { + str(opt["value"]): str(opt.get("label", opt["value"])) + for opt in options + } + + if expires_at is None: + hours = ( + expiry_hours + or self.config.notifications.default_expiry_hours + ) + expires_at = ( + datetime.now(timezone.utc) + timedelta(hours=hours) + ).isoformat() + + await self.db.create_notification( + notification_id=notification_id, + session_id=session_id, + type="approval", + title=title, + body=body, + priority=priority, + options=option_values, + expires_at=expires_at, + metadata={ + "target_kind": target_kind, + "target_id": target_id, + "option_labels": option_labels, + }, + target_kind=target_kind, + target_id=target_id, + ) + + await self._fanout( + notification_id, session_id, "approval", title, body, + priority, options=option_values, option_labels=option_labels, + ) + + return {"notification_id": notification_id, "status": "sent"} + # ------------------------------------------------------------------ # # Answer routing (called by REST API / Telegram callback) # # ------------------------------------------------------------------ # @@ -127,16 +273,27 @@ async def handle_answer( answer: str, answered_by: str, ) -> bool: - """Process a user's answer to a question. - - 1. Persist the answer in DB. - 2. Inject answer as user message in the session. - 3. Broadcast answer event to web UI. + """Process a user's answer to a question or approval. + + - For ``type=approval`` rows: look up the dispatcher in the + handler registry, run it, audit-log the outcome, then flip + the row's status. Snooze answers advance ``expires_at`` and + keep the row pending so a later re-delivery tick can surface + it again. + - For ``type=question`` rows (legacy): persist the answer, + inject it back into the originating session, broadcast. + - Fire-and-forget ``type=notify`` rows do not flow through this + method; the dismiss endpoint handles those. """ notif = await self.db.get_notification(notification_id) if not notif or notif["status"] != "pending": return False + if notif.get("type") == "approval": + return await self._handle_approval_answer( + notif, answer, answered_by, + ) + success = await self.db.answer_notification( notification_id, answer, answered_by, ) @@ -194,6 +351,137 @@ async def handle_answer( return True + async def _handle_approval_answer( + self, + notif: dict[str, Any], + answer: str, + answered_by: str, + ) -> bool: + """Route an approval answer through the dispatcher registry.""" + notification_id = notif["id"] + session_id = notif["session_id"] + target_kind = notif.get("target_kind") or "" + target_id = notif.get("target_id") or "" + + dispatcher = _handlers.get(target_kind) if target_kind else None + if dispatcher is None: + logger.warning( + "approval %s has no dispatcher for target_kind=%r; " + "marking answered without action", + notification_id, target_kind, + ) + result = _handlers.DispatchResult( + ok=False, + audit_event={ + "event": "approval-acted", + "notification_id": notification_id, + "target_kind": target_kind, + "target_id": target_id, + "decision": answer, + "ok": False, + "error": ( + f"no dispatcher registered for {target_kind!r}" + ), + }, + ) + else: + try: + result = await asyncio.to_thread( + dispatcher, notif, target_id, answer, self.config, + ) + except Exception as exc: # defensive: never crash the route + logger.exception( + "approval dispatch raised for %s (target=%s:%s, " + "decision=%s): %s", + notification_id, target_kind, target_id, answer, exc, + ) + result = _handlers.DispatchResult( + ok=False, + audit_event={ + "event": "approval-acted", + "notification_id": notification_id, + "target_kind": target_kind, + "target_id": target_id, + "decision": answer, + "ok": False, + "error": f"dispatcher raised: {exc}", + }, + ) + + await self._append_approval_audit(result.audit_event) + + # Snooze keeps the row pending with a future expiry so a later + # re-delivery tick (wired in PR 2) can surface it again. + if result.snooze_until is not None and result.ok: + await self.db.snooze_notification( + notification_id, result.snooze_until, + ) + else: + await self.db.answer_notification( + notification_id, answer, answered_by, + ) + + from nerve.agent.streaming import broadcaster + broadcast_status = ( + "snoozed" if (result.snooze_until and result.ok) else "answered" + ) + await broadcaster.broadcast("__global__", { + "type": "notification_answered", + "notification_id": notification_id, + "session_id": session_id, + "answer": answer, + "answered_by": answered_by, + "approval_status": broadcast_status, + "dispatch_ok": result.ok, + }) + + return True + + async def _append_approval_audit(self, event: dict[str, Any]) -> None: + """Append an ``approval-acted`` record to the mechanical-actions log. + + Uses the same audit log that the propose-mechanical-action + primitive writes to (``~/.nerve/mechanical-actions/audit.jsonl``) + so the proposal lifecycle (``proposed`` -> ``approval-acted`` + -> ``approved``/``declined``/``executed``) is visible in one + place. The shared helper module + ``scripts/_mechanical_action.py`` lives under the workspace, so + we import it dynamically by path rather than as a real Python + package. + """ + workspace = _resolve_workspace(self.config) + if workspace is None: + logger.debug( + "approval audit: no workspace configured; event=%s", + event.get("event"), + ) + return + try: + helper = _load_mechanical_action_helper(workspace) + except Exception as exc: # defensive: never crash the route + logger.warning( + "approval audit: cannot load helper at %s: %s", + workspace, exc, + ) + return + + ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + # Re-use the helper's append path. Use a tag the helper hasn't + # whitelisted yet by patching VALID_EVENTS just for our event, + # so the helper validation stays strict for everything else. + record = {"ts": ts, **event} + valid = getattr(helper, "VALID_EVENTS", None) + if isinstance(valid, set) and "approval-acted" not in valid: + valid.add("approval-acted") + + try: + await asyncio.to_thread(helper.append_audit, record, None) + except Exception as exc: + logger.warning( + "approval audit append failed: %s (event=%s)", + exc, event.get("event"), + ) + def _on_answer_task_done(self, task: asyncio.Task) -> None: """Log errors from answer injection tasks. @@ -230,8 +518,16 @@ async def _fanout( options: list[str] | None = None, channels: list[str] | None = None, silent: bool = False, + option_labels: dict[str, str] | None = None, ) -> None: - """Deliver notification to all configured channels in parallel.""" + """Deliver notification to all configured channels in parallel. + + ``option_labels`` is used by ``approval``-kind notifications: it + maps the canonical option ``value`` (sent back on the callback + as the answer string) to the human-facing label rendered on the + button. ``None`` for the legacy ``question`` path, where the + label and the value are identical. + """ target_channels = channels or self.config.notifications.channels async def _deliver(channel_name: str) -> str | None: @@ -241,13 +537,14 @@ async def _deliver(channel_name: str) -> str | None: await self._deliver_web( notification_id, session_id, notif_type, title, body, priority, options, + option_labels=option_labels, ) return "web" elif channel_name == "telegram": msg_id = await self._deliver_telegram( notification_id, session_id, notif_type, title, body, priority, options, - silent=silent, + silent=silent, option_labels=option_labels, ) if msg_id: await self.db.update_notification( @@ -282,8 +579,15 @@ async def _deliver_web( body: str, priority: str, options: list[str] | None, + option_labels: dict[str, str] | None = None, ) -> None: - """Broadcast notification to web UI via the global broadcaster.""" + """Broadcast notification to web UI via the global broadcaster. + + For approval-kind rows we also include ``option_labels`` so the + web NotificationCard can render readable button text while the + button click still sends the canonical ``value`` back through + the answer endpoint. + """ from nerve.agent.streaming import broadcaster message = { "type": "notification", @@ -295,6 +599,8 @@ async def _deliver_web( "priority": priority, "options": options, } + if option_labels: + message["option_labels"] = option_labels await broadcaster.broadcast("__global__", message) def _resolve_telegram_chat_id(self) -> int | None: @@ -325,8 +631,9 @@ async def _deliver_telegram( priority: str, options: list[str] | None, silent: bool = False, + option_labels: dict[str, str] | None = None, ) -> str | None: - """Send notification to Telegram, with inline keyboard for questions.""" + """Send notification to Telegram, with inline keyboard for questions/approvals.""" bot = self._get_telegram_bot() if not bot: logger.warning("Telegram bot not available for notification %s", notification_id) @@ -347,9 +654,21 @@ async def _deliver_telegram( if self._should_show_session_label(session_id): text += f"\n\nSession: {session_id}" - if notif_type == "question" and options: + if notif_type in ("question", "approval") and options: + button_labels: list[tuple[str, str]] = [] + for value in options: + if notif_type == "approval": + label = ( + (option_labels or {}).get(value) + or value.replace("_", " ").title() + ) + emoji = _APPROVAL_EMOJIS.get(value, "") + rendered = f"{emoji} {label}".strip() if emoji else label + else: + rendered = value + button_labels.append((rendered, value)) return await self._send_telegram_inline( - chat_id, notification_id, text, options, silent=silent, + chat_id, notification_id, text, button_labels, silent=silent, ) else: msg = await self._send_telegram_html(bot, chat_id, text, silent=silent) @@ -388,10 +707,16 @@ async def _send_telegram_inline( chat_id: int, notification_id: str, text: str, - options: list[str], + options: list[str] | list[tuple[str, str]], silent: bool = False, ) -> str | None: - """Send Telegram message with inline keyboard buttons.""" + """Send Telegram message with inline keyboard buttons. + + ``options`` accepts either a flat list of strings (legacy + question kind: label == callback value) or a list of + ``(label, value)`` tuples (approval kind: emoji-prefixed label, + canonical value sent back on the callback). + """ bot = self._get_telegram_bot() if not bot: return None @@ -399,14 +724,18 @@ async def _send_telegram_inline( from telegram import InlineKeyboardButton, InlineKeyboardMarkup buttons = [] - for option in options: - callback_data = f"notif:{notification_id}:{option}" + for entry in options: + if isinstance(entry, tuple): + label, value = entry + else: + label = value = entry + callback_data = f"notif:{notification_id}:{value}" # Telegram callback_data max 64 bytes — truncate option if needed if len(callback_data.encode("utf-8")) > 64: max_opt_len = 64 - len(f"notif:{notification_id}:".encode("utf-8")) - truncated = option.encode("utf-8")[:max_opt_len].decode("utf-8", errors="ignore") + truncated = value.encode("utf-8")[:max_opt_len].decode("utf-8", errors="ignore") callback_data = f"notif:{notification_id}:{truncated}" - buttons.append([InlineKeyboardButton(option, callback_data=callback_data)]) + buttons.append([InlineKeyboardButton(label, callback_data=callback_data)]) keyboard = InlineKeyboardMarkup(buttons) diff --git a/tests/test_notifications_actionable.py b/tests/test_notifications_actionable.py new file mode 100644 index 0000000..3937db1 --- /dev/null +++ b/tests/test_notifications_actionable.py @@ -0,0 +1,576 @@ +"""Tests for the ``approval`` notification kind. + +PR 1 of the actionable-inbox series: +- v028 migration adds ``target_kind`` and ``target_id`` columns. +- ``NotificationService.propose_action`` files a row of type=approval. +- ``handle_answer`` dispatches the user's decision through the + ``nerve.notifications.handlers`` registry. +- The legacy ``type=question`` answer-injection path stays untouched. + +These tests run against a fresh in-memory SQLite per test and stub +out the streaming broadcaster + agent engine so we can assert +behavior in isolation. +""" + +from __future__ import annotations + +import asyncio +import json +import shutil +import sys +import textwrap +from datetime import datetime, timedelta, timezone +from pathlib import Path +from typing import Any +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from nerve.config import NerveConfig, NotificationsConfig +from nerve.db import Database +from nerve.notifications import handlers as _handlers +from nerve.notifications.service import NotificationService + + +# ---------------------------------------------------------------------- +# Fixtures +# ---------------------------------------------------------------------- + + +@pytest.fixture +def fake_config(tmp_path: Path) -> NerveConfig: + """Minimal NerveConfig with workspace + notifications config wired.""" + cfg = NerveConfig() + cfg.workspace = tmp_path + cfg.notifications = NotificationsConfig( + channels=["web"], # skip telegram in unit tests + telegram_chat_id=None, + default_expiry_hours=48, + priority_prefixes={"high": "", "urgent": ""}, + ) + return cfg + + +@pytest.fixture +def fake_engine() -> MagicMock: + """An engine stub with the minimum surface the service touches.""" + engine = MagicMock() + engine.sessions = MagicMock() + engine.sessions.is_running.return_value = False + engine.router = MagicMock() + engine.router.get_channel.return_value = None + engine.run = AsyncMock() + return engine + + +@pytest.fixture +def patch_broadcaster(monkeypatch: pytest.MonkeyPatch) -> list[tuple[str, dict]]: + """Capture broadcaster.broadcast() calls instead of hitting any WS.""" + captured: list[tuple[str, dict]] = [] + + class _FakeBroadcaster: + async def broadcast(self, channel: str, message: dict) -> None: + captured.append((channel, message)) + + from nerve.agent import streaming + monkeypatch.setattr(streaming, "broadcaster", _FakeBroadcaster()) + return captured + + +_MINIMAL_HELPER_SRC = textwrap.dedent( + '''\ + """Minimal mechanical-action helper used only by the test fixture. + + Mirrors the audit + queue surface the notification service touches + so the dispatcher can shell into a stub script and append an audit + record without dragging in any out-of-tree files. + + Honors ``$NERVE_MECHANICAL_STATE_DIR`` so each test can point the + helper at its own temp directory. + """ + + from __future__ import annotations + + import json + import os + from datetime import datetime, timezone + from pathlib import Path + + _OVERRIDE = os.environ.get("NERVE_MECHANICAL_STATE_DIR") + STATE_DIR = ( + Path(_OVERRIDE).expanduser() if _OVERRIDE + else Path("~/.nerve/mechanical-actions").expanduser() + ) + QUEUE_DIR = STATE_DIR / "queue" + DECISIONS_DIR = STATE_DIR / "decisions" + AUDIT_LOG = STATE_DIR / "audit.jsonl" + + VALID_EVENTS = { + "proposed", "approved", "declined", + "auto-execute", "executed", "failed", + "snoozed", "approval-acted", + } + + + def utc_now_iso() -> str: + return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + + + def ensure_dirs(state_dir: Path | None = None): + s = Path(state_dir) if state_dir else STATE_DIR + (s / "queue").mkdir(parents=True, exist_ok=True) + (s / "decisions").mkdir(parents=True, exist_ok=True) + return s / "queue", s / "decisions", s / "audit.jsonl" + + + def append_audit(event, audit_log=None): + log = Path(audit_log) if audit_log else AUDIT_LOG + log.parent.mkdir(parents=True, exist_ok=True) + line = json.dumps(event, separators=(",", ":")) + "\\n" + fd = os.open(log, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o644) + try: + os.write(fd, line.encode("utf-8")) + finally: + os.close(fd) + + + def read_audit(audit_log=None): + log = Path(audit_log) if audit_log else AUDIT_LOG + if not log.is_file(): + return [] + out = [] + for line in log.read_text().splitlines(): + line = line.strip() + if line: + out.append(json.loads(line)) + return out + ''' +) + + +@pytest.fixture +def workspace_with_scripts(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Path: + """Create a synthetic ``scripts/`` layout the dispatcher can shell into. + + Drops a stub ``mechanical-action.sh`` that records its args + exits + with ``$MECHACTION_EXIT`` and a minimal ``_mechanical_action.py`` + audit helper. We deliberately avoid copying any out-of-tree file so + the test stays self-contained. + """ + scripts_dir = tmp_path / "scripts" + scripts_dir.mkdir() + + helper_path = scripts_dir / "_mechanical_action.py" + helper_path.write_text(_MINIMAL_HELPER_SRC) + + # A predictable stub: writes its args to a sibling log, returns the + # exit code embedded in $MECHACTION_EXIT (default 0). + log_path = tmp_path / "mechanical-action.log" + stub = textwrap.dedent(f"""\ + #!/usr/bin/env bash + echo "$@" >> "{log_path}" + exit ${{MECHACTION_EXIT:-0}} + """) + sh_path = scripts_dir / "mechanical-action.sh" + sh_path.write_text(stub) + sh_path.chmod(0o755) + + monkeypatch.setenv("NERVE_WORKSPACE_PATH", str(tmp_path)) + monkeypatch.setenv( + "NERVE_MECHANICAL_STATE_DIR", + str(tmp_path / ".nerve" / "mechanical-actions"), + ) + return tmp_path + + +def read_audit_jsonl(state_dir: Path) -> list[dict[str, Any]]: + """Read every record from the mechanical-actions audit log.""" + audit_log = state_dir / "audit.jsonl" + if not audit_log.is_file(): + return [] + out: list[dict[str, Any]] = [] + for line in audit_log.read_text().splitlines(): + line = line.strip() + if not line: + continue + out.append(json.loads(line)) + return out + + +# ---------------------------------------------------------------------- +# Schema / store +# ---------------------------------------------------------------------- + + +@pytest.mark.asyncio +class TestSchemaAndStore: + async def test_v028_columns_exist(self, db: Database): + async with db.db.execute("PRAGMA table_info(notifications)") as cur: + cols = {row[1] async for row in cur} + assert "target_kind" in cols + assert "target_id" in cols + + async def test_create_notification_default_target_columns_null(self, db: Database): + await db.create_session("s1") + await db.create_notification( + notification_id="n1", session_id="s1", + type="notify", title="hello", + ) + notif = await db.get_notification("n1") + assert notif is not None + assert notif["target_kind"] is None + assert notif["target_id"] is None + + async def test_create_notification_with_target(self, db: Database): + await db.create_session("s1") + await db.create_notification( + notification_id="n1", session_id="s1", + type="approval", title="approve me", + target_kind="mechanical-action", + target_id="20260519T143906Z-d2e62e", + ) + notif = await db.get_notification("n1") + assert notif["target_kind"] == "mechanical-action" + assert notif["target_id"] == "20260519T143906Z-d2e62e" + assert notif["type"] == "approval" + + async def test_snooze_notification_advances_expiry(self, db: Database): + await db.create_session("s1") + future = ( + datetime.now(timezone.utc) + timedelta(hours=1) + ).isoformat() + await db.create_notification( + notification_id="n1", session_id="s1", + type="approval", title="t", + expires_at=future, + ) + new_expiry = ( + datetime.now(timezone.utc) + timedelta(hours=24) + ).isoformat() + ok = await db.snooze_notification("n1", new_expiry) + assert ok is True + notif = await db.get_notification("n1") + assert notif["expires_at"] == new_expiry + assert notif["status"] == "pending" + + async def test_snooze_notification_rejects_non_pending(self, db: Database): + await db.create_session("s1") + await db.create_notification( + notification_id="n1", session_id="s1", type="approval", title="t", + ) + await db.answer_notification("n1", "approve", "web") + new_expiry = ( + datetime.now(timezone.utc) + timedelta(hours=24) + ).isoformat() + assert await db.snooze_notification("n1", new_expiry) is False + + +# ---------------------------------------------------------------------- +# propose_action +# ---------------------------------------------------------------------- + + +@pytest.mark.asyncio +class TestProposeAction: + async def test_propose_action_creates_approval_row( + self, + db: Database, + fake_config: NerveConfig, + fake_engine: MagicMock, + patch_broadcaster: list, + ): + await db.create_session("s1") + svc = NotificationService(fake_config, db, fake_engine) + result = await svc.propose_action( + session_id="s1", + target_kind="mechanical-action", + target_id="test-123", + title="approve fix-pack", + ) + notif = await db.get_notification(result["notification_id"]) + assert notif is not None + assert notif["type"] == "approval" + assert notif["target_kind"] == "mechanical-action" + assert notif["target_id"] == "test-123" + assert notif["priority"] == "high" + # Options stored as the canonical value list. + stored_opts = json.loads(notif["options"]) + assert stored_opts == ["approve", "decline", "snooze_24h"] + # option_labels live in metadata so the web side can render + # without re-parsing options. + meta = json.loads(notif["metadata"]) + assert meta["option_labels"]["approve"] == "Approve" + assert meta["option_labels"]["snooze_24h"] == "Snooze 24h" + assert meta["target_kind"] == "mechanical-action" + + async def test_propose_action_rejects_empty_options( + self, + db: Database, + fake_config: NerveConfig, + fake_engine: MagicMock, + patch_broadcaster: list, + ): + await db.create_session("s1") + svc = NotificationService(fake_config, db, fake_engine) + with pytest.raises(ValueError): + await svc.propose_action( + session_id="s1", + target_kind="mechanical-action", + target_id="t", + title="t", + options=[], + ) + + async def test_propose_action_custom_options_round_trip( + self, + db: Database, + fake_config: NerveConfig, + fake_engine: MagicMock, + patch_broadcaster: list, + ): + await db.create_session("s1") + svc = NotificationService(fake_config, db, fake_engine) + result = await svc.propose_action( + session_id="s1", + target_kind="mechanical-action", + target_id="x", + title="custom", + options=[ + {"label": "Yes please", "value": "yes"}, + {"label": "No thanks", "value": "no"}, + ], + ) + notif = await db.get_notification(result["notification_id"]) + assert json.loads(notif["options"]) == ["yes", "no"] + meta = json.loads(notif["metadata"]) + assert meta["option_labels"] == { + "yes": "Yes please", "no": "No thanks", + } + + +# ---------------------------------------------------------------------- +# handle_answer dispatch path +# ---------------------------------------------------------------------- + + +@pytest.mark.asyncio +class TestHandleAnswerApproval: + async def test_approve_invokes_dispatcher_and_writes_audit( + self, + db: Database, + fake_config: NerveConfig, + fake_engine: MagicMock, + patch_broadcaster: list, + workspace_with_scripts: Path, + ): + await db.create_session("s1") + svc = NotificationService(fake_config, db, fake_engine) + result = await svc.propose_action( + session_id="s1", + target_kind="mechanical-action", + target_id="prop-1", + title="run lint", + ) + nid = result["notification_id"] + + ok = await svc.handle_answer(nid, "approve", "web") + assert ok is True + + notif = await db.get_notification(nid) + assert notif["status"] == "answered" + assert notif["answer"] == "approve" + + # Audit log: an ``approval-acted`` event arrived in the + # state-dir-scoped audit log. The minimal helper honors + # NERVE_MECHANICAL_STATE_DIR (set by the fixture) so each test + # writes to its own isolated audit.jsonl. + state_dir = ( + workspace_with_scripts / ".nerve" / "mechanical-actions" + ) + events = read_audit_jsonl(state_dir) + acted = [e for e in events if e.get("event") == "approval-acted"] + assert any( + e.get("notification_id") == nid + and e.get("decision") == "approve" + and e.get("ok") is True + for e in acted + ) + + # Broadcast fired with approval_status="answered". + approval_broadcasts = [ + m for _, m in patch_broadcaster + if m.get("type") == "notification_answered" + and m.get("notification_id") == nid + ] + assert approval_broadcasts + assert approval_broadcasts[0]["approval_status"] == "answered" + assert approval_broadcasts[0]["dispatch_ok"] is True + # Importantly, no ``answer_injected`` should fire. The answer + # routes through the dispatcher, not back into the session. + injected = [ + m for _, m in patch_broadcaster + if m.get("type") == "answer_injected" + ] + assert not injected + + async def test_snooze_keeps_pending_and_advances_expiry( + self, + db: Database, + fake_config: NerveConfig, + fake_engine: MagicMock, + patch_broadcaster: list, + workspace_with_scripts: Path, + ): + await db.create_session("s1") + svc = NotificationService(fake_config, db, fake_engine) + result = await svc.propose_action( + session_id="s1", + target_kind="mechanical-action", + target_id="prop-2", + title="snooze me", + expiry_hours=2, + ) + nid = result["notification_id"] + + before = await db.get_notification(nid) + prior_expiry = before["expires_at"] + + ok = await svc.handle_answer(nid, "snooze_24h", "web") + assert ok is True + + after = await db.get_notification(nid) + assert after["status"] == "pending" + assert after["expires_at"] is not None + # Expiry advanced forward; sanity check it is not the original. + assert after["expires_at"] != prior_expiry + # And no answer recorded (snooze is not a final answer). + assert after["answer"] is None + + approval_broadcasts = [ + m for _, m in patch_broadcaster + if m.get("type") == "notification_answered" + and m.get("notification_id") == nid + ] + assert approval_broadcasts[0]["approval_status"] == "snoozed" + + async def test_decline_marks_answered_with_decline( + self, + db: Database, + fake_config: NerveConfig, + fake_engine: MagicMock, + patch_broadcaster: list, + workspace_with_scripts: Path, + ): + await db.create_session("s1") + svc = NotificationService(fake_config, db, fake_engine) + result = await svc.propose_action( + session_id="s1", + target_kind="mechanical-action", + target_id="prop-3", + title="decline me", + ) + nid = result["notification_id"] + + ok = await svc.handle_answer(nid, "decline", "web") + assert ok is True + + notif = await db.get_notification(nid) + assert notif["status"] == "answered" + assert notif["answer"] == "decline" + + async def test_unknown_target_kind_marks_answered_without_dispatch( + self, + db: Database, + fake_config: NerveConfig, + fake_engine: MagicMock, + patch_broadcaster: list, + workspace_with_scripts: Path, + ): + """If a row has a target_kind no dispatcher knows about, we still + flip the status so the row doesn't get re-delivered, and the + audit log records the no-dispatcher state. + """ + await db.create_session("s1") + await db.create_notification( + notification_id="orphan-1", + session_id="s1", + type="approval", + title="orphan", + target_kind="never-registered", + target_id="x", + ) + svc = NotificationService(fake_config, db, fake_engine) + ok = await svc.handle_answer("orphan-1", "approve", "web") + assert ok is True + notif = await db.get_notification("orphan-1") + assert notif["status"] == "answered" + + async def test_legacy_question_path_still_injects_answer( + self, + db: Database, + fake_config: NerveConfig, + fake_engine: MagicMock, + patch_broadcaster: list, + ): + """Type=question (no target_kind) must keep flowing through the + session-injection path, untouched by the approval dispatch. + """ + await db.create_session("s1") + svc = NotificationService(fake_config, db, fake_engine) + result = await svc.ask_question( + session_id="s1", + title="legacy", + body="ask me anything", + options=["yes", "no"], + ) + nid = result["notification_id"] + + ok = await svc.handle_answer(nid, "yes", "web") + assert ok is True + + notif = await db.get_notification(nid) + assert notif["status"] == "answered" + assert notif["answer"] == "yes" + + # Confirm we broadcast the session-scoped answer_injected event, + # AND queued a run on the engine (since the session is not + # currently running per the fake_engine fixture). + injected = [ + m for _, m in patch_broadcaster + if m.get("type") == "answer_injected" + and m.get("notification_id") == nid + ] + assert injected + # Wait for any fire-and-forget answer task to settle. + await asyncio.sleep(0) + fake_engine.run.assert_called() + + +# ---------------------------------------------------------------------- +# Handler registry sanity +# ---------------------------------------------------------------------- + + +class TestHandlerRegistry: + def test_mechanical_action_dispatcher_registered(self): + assert "mechanical-action" in _handlers.known_kinds() + + def test_default_approval_options(self): + opts = _handlers.default_approval_options() + values = {o["value"] for o in opts} + assert values == {"approve", "decline", "snooze_24h"} + + def test_dispatcher_rejects_unsupported_decision(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + # Even with a valid workspace, an unknown decision should fail + # cleanly with an audit_event marking the rejection. + scripts_dir = tmp_path / "scripts" + scripts_dir.mkdir() + (scripts_dir / "mechanical-action.sh").write_text("#!/usr/bin/env bash\nexit 0\n") + (scripts_dir / "mechanical-action.sh").chmod(0o755) + monkeypatch.setenv("NERVE_WORKSPACE_PATH", str(tmp_path)) + + result = _handlers._dispatch_mechanical_action( + {"id": "n-1"}, "x", "rubberstamp", None, + ) + assert result.ok is False + assert "unsupported decision" in result.audit_event.get("error", "") diff --git a/web/src/api/websocket.ts b/web/src/api/websocket.ts index 62e59cf..fd58fb2 100644 --- a/web/src/api/websocket.ts +++ b/web/src/api/websocket.ts @@ -19,8 +19,8 @@ export type WSMessage = | { type: 'subagent_start'; session_id: string; tool_use_id: string; subagent_type: string; description: string; model?: string } | { type: 'subagent_complete'; session_id: string; tool_use_id: string; duration_ms: number; is_error?: boolean } | { type: 'file_changed'; session_id: string; path: string; operation: string; tool_use_id: string } - | { type: 'notification'; notification_id: string; notification_type: 'notify' | 'question'; session_id: string; title: string; body: string; priority: string; options: string[] | null } - | { type: 'notification_answered'; notification_id: string; session_id: string; answer: string; answered_by: string } + | { type: 'notification'; notification_id: string; notification_type: 'notify' | 'question' | 'approval'; session_id: string; title: string; body: string; priority: string; options: string[] | null; option_labels?: Record; target_kind?: string; target_id?: string } + | { type: 'notification_answered'; notification_id: string; session_id: string; answer: string; answered_by: string; approval_status?: 'answered' | 'snoozed'; dispatch_ok?: boolean } | { type: 'answer_injected'; session_id: string; notification_id: string; title: string; answer: string; answered_by: string; content: string } | { type: 'session_running'; session_id: string; is_running: boolean } | { type: 'background_tasks_update'; session_id: string; tasks: { task_id: string; label: string; tool: string; status: 'running' | 'done' | 'timeout' }[] } diff --git a/web/src/components/Notifications/NotificationToast.tsx b/web/src/components/Notifications/NotificationToast.tsx index 5bb05e6..1f6c0fe 100644 --- a/web/src/components/Notifications/NotificationToast.tsx +++ b/web/src/components/Notifications/NotificationToast.tsx @@ -1,10 +1,28 @@ import { useEffect } from 'react'; import { useNavigate } from 'react-router-dom'; -import { Bell, HelpCircle, X } from 'lucide-react'; -import { useNotificationStore } from '../../stores/notificationStore'; +import { Bell, HelpCircle, ShieldCheck, X } from 'lucide-react'; +import { useNotificationStore, type Notification } from '../../stores/notificationStore'; const TOAST_DURATION = 5000; +const APPROVAL_QUICK_BUTTON_STYLES: Record = { + approve: 'bg-emerald-400/15 text-hue-emerald border-emerald-400/30 hover:bg-emerald-400/25', + decline: 'bg-red-400/15 text-hue-red border-red-400/30 hover:bg-red-400/25', + snooze_24h: 'bg-border-subtle/40 text-text-muted border-border-subtle hover:bg-border-subtle/60', +}; + +const APPROVAL_QUICK_LABELS: Record = { + approve: '✅ Approve', + decline: '❌ Decline', + snooze_24h: '💤 Snooze', +}; + +function quickLabel(value: string, notif: Notification): string { + const labels = notif.option_labels; + if (labels && labels[value]) return labels[value]; + return APPROVAL_QUICK_LABELS[value] || value; +} + export function NotificationToast() { const { toastQueue, dismissToast, answerNotification } = useNotificationStore(); const navigate = useNavigate(); @@ -27,6 +45,7 @@ export function NotificationToast() {
{visible.map((notif) => { const isQuestion = notif.type === 'question'; + const isApproval = notif.type === 'approval'; const options = notif.options ? (typeof notif.options === 'string' ? JSON.parse(notif.options) : notif.options) : null; return ( @@ -35,7 +54,9 @@ export function NotificationToast() { className="bg-surface-raised border border-border-subtle rounded-lg shadow-xl p-3 animate-slide-in" >
- {isQuestion ? ( + {isApproval ? ( + + ) : isQuestion ? ( ) : ( @@ -78,6 +99,26 @@ export function NotificationToast() { ))}
)} + {/* Quick action buttons for approvals */} + {isApproval && options && notif.status === 'pending' && ( +
+ {options.slice(0, 3).map((value: string) => { + const style = APPROVAL_QUICK_BUTTON_STYLES[value] || 'bg-accent/15 text-accent border-accent/30 hover:bg-accent/25'; + return ( + + ); + })} +
+ )}
diff --git a/web/src/pages/NotificationsPage.tsx b/web/src/pages/NotificationsPage.tsx index c4bec7e..0c32e73 100644 --- a/web/src/pages/NotificationsPage.tsx +++ b/web/src/pages/NotificationsPage.tsx @@ -1,6 +1,6 @@ import { useEffect, useState } from 'react'; import { useNavigate } from 'react-router-dom'; -import { Bell, X, CheckCheck, EyeOff } from 'lucide-react'; +import { Bell, X, CheckCheck, EyeOff, Check, XCircle, Moon } from 'lucide-react'; import { useNotificationStore, type Notification } from '../stores/notificationStore'; const STATUS_STYLES: Record = { @@ -17,6 +17,12 @@ const PRIORITY_DOTS: Record = { low: '', }; +const TYPE_BADGE_STYLES: Record = { + question: 'bg-blue-400/10 text-hue-blue border-blue-400/20', + approval: 'bg-violet-400/10 text-hue-violet border-violet-400/20', + notify: 'bg-border-subtle/50 text-text-muted border-border-subtle', +}; + const STATUS_FILTERS = [ { label: 'All', value: '' }, { label: 'Pending', value: 'pending' }, @@ -28,8 +34,63 @@ const TYPE_FILTERS = [ { label: 'All', value: '' }, { label: 'Notifications', value: 'notify' }, { label: 'Questions', value: 'question' }, + { label: 'Approvals', value: 'approval' }, ]; +// Approval-kind button styling. Keyed by the option ``value`` the +// dispatcher receives, not the human label, so the styling stays +// stable even when labels are renamed. +const APPROVAL_BUTTON_STYLES: Record = { + approve: + 'bg-emerald-400/15 text-hue-emerald border-emerald-400/30 hover:bg-emerald-400/25', + decline: + 'bg-red-400/15 text-hue-red border-red-400/30 hover:bg-red-400/25', + snooze_24h: + 'bg-border-subtle/40 text-text-muted border-border-subtle hover:bg-border-subtle/60', +}; + +const APPROVAL_DEFAULT_BUTTON_STYLE = + 'bg-accent/15 text-accent border-accent/30 hover:bg-accent/25'; + +const APPROVAL_BUTTON_ICONS: Record = { + approve: Check, + decline: XCircle, + snooze_24h: Moon, +}; + +const APPROVAL_DEFAULT_LABELS: Record = { + approve: 'Approve', + decline: 'Decline', + snooze_24h: 'Snooze 24h', +}; + +function approvalLabel(value: string, labels: Record | null | undefined): string { + if (labels && labels[value]) return labels[value]; + if (APPROVAL_DEFAULT_LABELS[value]) return APPROVAL_DEFAULT_LABELS[value]; + // Fall back to value with underscores replaced and title cased. + return value + .split('_') + .map((part) => part.charAt(0).toUpperCase() + part.slice(1)) + .join(' '); +} + +function parseOptionLabels(notif: Notification): Record | null { + if (notif.option_labels) return notif.option_labels; + if (!notif.metadata) return null; + try { + const meta = typeof notif.metadata === 'string' ? JSON.parse(notif.metadata) : notif.metadata; + if (meta && typeof meta === 'object' && 'option_labels' in meta) { + const labels = (meta as Record).option_labels; + if (labels && typeof labels === 'object') { + return labels as Record; + } + } + } catch { + // Malformed JSON: just fall back to defaults. + } + return null; +} + function FreeTextInput({ onSubmit }: { onSubmit: (text: string) => void }) { const [text, setText] = useState(''); const [open, setOpen] = useState(false); @@ -90,6 +151,8 @@ function NotificationCard({ notif }: { notif: Notification }) { const { answerNotification, dismissNotification } = useNotificationStore(); const priorityDot = PRIORITY_DOTS[notif.priority]; const options = notif.options ? (typeof notif.options === 'string' ? JSON.parse(notif.options) : notif.options) : null; + const isApproval = notif.type === 'approval'; + const optionLabels = isApproval ? parseOptionLabels(notif) : null; return (
{notif.body}

)} + {isApproval && notif.target_kind && notif.target_id && ( +

+ {notif.target_kind}: {notif.target_id} +

+ )}
{notif.status} - + {notif.type}
@@ -151,10 +219,32 @@ function NotificationCard({ notif }: { notif: Notification }) { )} + {/* Action UI for pending approvals */} + {isApproval && notif.status === 'pending' && ( +
+ {options?.map((value: string) => { + const Icon = APPROVAL_BUTTON_ICONS[value]; + const buttonStyle = APPROVAL_BUTTON_STYLES[value] || APPROVAL_DEFAULT_BUTTON_STYLE; + const label = approvalLabel(value, optionLabels); + return ( + + ); + })} +
+ )} + {/* Show answer if answered */} {notif.status === 'answered' && (
- Answer: {notif.answer} (via {notif.answered_by}) + Answer: {isApproval ? approvalLabel(notif.answer || '', optionLabels) : notif.answer}{' '} + (via {notif.answered_by})
)} diff --git a/web/src/stores/notificationStore.ts b/web/src/stores/notificationStore.ts index b639a71..9a1a56b 100644 --- a/web/src/stores/notificationStore.ts +++ b/web/src/stores/notificationStore.ts @@ -5,7 +5,7 @@ export interface Notification { id: string; session_id: string; session_title: string | null; - type: 'notify' | 'question'; + type: 'notify' | 'question' | 'approval'; title: string; body: string; priority: string; @@ -16,6 +16,12 @@ export interface Notification { answered_at: string | null; created_at: string; expires_at: string | null; + target_kind?: string | null; + target_id?: string | null; + // Optional label map for approval-kind rows: value -> human label. + // Sent on the WS notification payload and stored on the row metadata. + option_labels?: Record | null; + metadata?: string | Record | null; } interface NotificationState { @@ -113,6 +119,9 @@ export const useNotificationStore = create((set, get) => ({ answered_at: null, created_at: new Date().toISOString(), expires_at: null, + target_kind: data.target_kind ?? null, + target_id: data.target_id ?? null, + option_labels: data.option_labels ?? null, }; set(s => ({