From e0ac659a2d7ac1e4b179634baad3d3d860dd2b32 Mon Sep 17 00:00:00 2001 From: Alex Fedotyev <61838744+alex-fedotyev@users.noreply.github.com> Date: Tue, 19 May 2026 15:43:18 +0000 Subject: [PATCH] feat(notifications): approval kind + handler registry Adds an actionable notification kind that routes the user's answer to a server-side dispatcher instead of injecting it back into the originating session. PR 1 of the actionable-inbox series. What it ships: - v028 migration: notifications gain target_kind + target_id columns and an index on (target_kind, target_id). Existing rows are left with NULL target_kind, which the answer routing treats as the legacy session-injection path. type=approval is the third valid type value alongside notify and question. - NotificationStore: create_notification accepts target_kind + target_id. New snooze_notification(id, expires_at) advances the row's expiry while keeping it pending so a later re-delivery tick can surface it again. - nerve/notifications/handlers.py: a dispatcher registry keyed by target_kind. PR 1 registers one entry, mechanical-action, which shells into {workspace}/scripts/mechanical-action.sh for the approve / decline / snooze_24h decisions. Workspace resolves from $NERVE_WORKSPACE_PATH (test override) or config.workspace. - NotificationService gains propose_action(target_kind, target_id, title, options, ...). handle_answer detects type=approval and routes through the registry, appending an approval-acted event to ~/.nerve/mechanical-actions/audit.jsonl so the proposal lifecycle (proposed -> approval-acted -> approved/declined/executed) is visible in one place. The legacy question path is unchanged. - propose_action MCP tool (module-level + session-scoped) mirrors ask_user with target_kind + target_id required and an options arg accepting {label, value} pairs. - Telegram delivery reuses the existing notif:{id}:{value} callback format and the existing CallbackQueryHandler. 
Approval buttons render with emoji prefixes (Approve / Decline / Snooze 24h); the canonical value still flows back on the callback so the dispatcher sees a stable key. - Web NotificationCard renders type=approval with kind-styled buttons (emerald approve, red decline, neutral snooze) and surfaces the target identifier under the body. NotificationToast picks up the same approval styling. The notification store now ships option labels alongside option values so labels can be renamed without changing callback payloads. - 16 backend tests cover schema, store, propose_action shape, the approve / decline / snooze dispatch paths, the no-dispatcher fallback, and confirm the question path still flows through the session-injection branch. Out of scope (PR 2): workspace executor cron rewrite to file propose_action for mechanical-action TAPs, the plan dispatcher, and the snooze re-delivery scheduler tick. PR 1 ships only the Nerve- side surface so the cron rewrite has a stable contract to call. --- nerve/agent/tools.py | 163 ++++- .../v028_notification_approval_kind.py | 44 ++ nerve/db/notifications.py | 46 +- nerve/notifications/handlers.py | 300 +++++++++ nerve/notifications/service.py | 369 ++++++++++- tests/test_notifications_actionable.py | 576 ++++++++++++++++++ web/src/api/websocket.ts | 4 +- .../Notifications/NotificationToast.tsx | 47 +- web/src/pages/NotificationsPage.tsx | 96 ++- web/src/stores/notificationStore.ts | 11 +- 10 files changed, 1621 insertions(+), 35 deletions(-) create mode 100644 nerve/db/migrations/v028_notification_approval_kind.py create mode 100644 nerve/notifications/handlers.py create mode 100644 tests/test_notifications_actionable.py diff --git a/nerve/agent/tools.py b/nerve/agent/tools.py index bca2dd0..9ffcd5e 100644 --- a/nerve/agent/tools.py +++ b/nerve/agent/tools.py @@ -1799,6 +1799,80 @@ async def _ask_user_impl(args: dict, session_id: str) -> dict: return {"content": [{"type": "text", "text": f"Failed to ask question: {e}"}]} +def 
_parse_action_options(raw) -> list[dict[str, str]] | None: + """Parse the ``options`` arg for propose_action. + + Accepts: + - a list of ``{"label": ..., "value": ...}`` dicts (passed through) + - a list of strings (interpreted as ``label == value``) + - a JSON-encoded string of either of the above + - falsy / empty -> None (caller falls back to dispatcher defaults) + """ + if raw is None or raw == "": + return None + if isinstance(raw, str): + try: + raw = json.loads(raw) + except (json.JSONDecodeError, ValueError): + return None + if not isinstance(raw, list) or not raw: + return None + out: list[dict[str, str]] = [] + for item in raw: + if isinstance(item, dict): + value = str(item.get("value", "")).strip() + label = str(item.get("label", value)).strip() + if not value: + continue + out.append({"label": label or value, "value": value}) + elif isinstance(item, str): + v = item.strip() + if v: + out.append({"label": v, "value": v}) + return out or None + + +async def _propose_action_impl(args: dict, session_id: str) -> dict: + """Core implementation for the propose_action tool.""" + if not _notification_service: + return {"content": [{"type": "text", "text": "Notification service not available."}]} + + target_kind = str(args.get("target_kind", "")).strip() + target_id = str(args.get("target_id", "")).strip() + title = str(args.get("title", "")).strip() + if not target_kind or not target_id or not title: + return {"content": [{"type": "text", "text": ( + "propose_action: target_kind, target_id, and title are required." 
+ )}]} + + body = args.get("body", "") + options = _parse_action_options(args.get("options")) + priority = args.get("priority", "high") + expires_at = args.get("expires_at") or None + + try: + result = await _notification_service.propose_action( + session_id=session_id, + target_kind=target_kind, + target_id=target_id, + title=title, + body=body, + options=options, + priority=priority, + expires_at=expires_at, + ) + + nid = result["notification_id"] + return {"content": [{"type": "text", "text": ( + f"Approval requested ({nid}). When the user picks a button, " + f"the {target_kind} dispatcher acts on {target_id}; the answer " + f"is NOT injected back into this session." + )}]} + except Exception as e: + logger.error("propose_action tool failed: %s", e) + return {"content": [{"type": "text", "text": f"Failed to propose action: {e}"}]} + + async def _react_impl(args: dict, session_id: str) -> dict: """Core implementation for the react tool.""" if not _engine: @@ -2008,6 +2082,62 @@ async def mcp_reload(args: dict) -> dict: "required": ["title"], } +_PROPOSE_ACTION_SCHEMA = { + "type": "object", + "properties": { + "target_kind": { + "type": "string", + "description": ( + "Dispatcher key the user's answer routes through. " + "Currently supported: 'mechanical-action'." + ), + }, + "target_id": { + "type": "string", + "description": ( + "Dispatcher-specific identifier the chosen decision acts " + "on (e.g. a queued mechanical-action proposal id like " + "'20260519T143906Z-d2e62e')." + ), + }, + "title": { + "type": "string", + "description": "Short headline shown on the notification card.", + }, + "body": { + "type": "string", + "description": ( + "Markdown body with the justification and any details " + "the user needs to decide." + ), + "default": "", + }, + "options": { + "type": "string", + "description": ( + "JSON array of {label, value} dicts overriding the " + "dispatcher's default options. 
Leave empty for the " + "canonical Approve / Decline / Snooze 24h triplet." + ), + "default": "", + }, + "priority": { + "type": "string", + "description": "'low', 'normal', 'high', 'urgent'. Default: 'high'.", + "default": "high", + }, + "expires_at": { + "type": "string", + "description": ( + "ISO-8601 UTC timestamp the row expires at. Omit to use " + "the configured default expiry window." + ), + "default": "", + }, + }, + "required": ["target_kind", "target_id", "title"], +} + _REACT_SCHEMA = { "type": "object", "properties": { @@ -2077,6 +2207,21 @@ async def ask_user_tool(args: dict) -> dict: return await _ask_user_impl(args, _current_session_id) +@tool( + "propose_action", + "Ask the user to approve, decline, or snooze a queued action. " + "Unlike ask_user, the answer routes through a server-side dispatcher " + "keyed by target_kind (e.g. 'mechanical-action') and acts on target_id " + "directly. The answer is NOT injected back into this session. " + "Use for queued mechanical actions, pending plans, or any binary " + "decision the user owns and the agent has already prepared.", + _PROPOSE_ACTION_SCHEMA, +) +async def propose_action_tool(args: dict) -> dict: + """Propose an action (fallback path; uses deprecated global).""" + return await _propose_action_impl(args, _current_session_id) + + # --------------------------------------------------------------------------- # houseofagents tools (module-level — don't need session_id) # --------------------------------------------------------------------------- @@ -2227,6 +2372,20 @@ async def session_ask_user(args: dict) -> dict: # session_id captured from enclosing scope — race-free return await _ask_user_impl(args, session_id) + @tool( + "propose_action", + "Ask the user to approve, decline, or snooze a queued action. " + "Unlike ask_user, the answer routes through a server-side dispatcher " + "keyed by target_kind (e.g. 'mechanical-action') and acts on target_id " + "directly. 
The answer is NOT injected back into this session. " + "Use for queued mechanical actions, pending plans, or any binary " + "decision the user owns and the agent has already prepared.", + _PROPOSE_ACTION_SCHEMA, + ) + async def session_propose_action(args: dict) -> dict: + # session_id captured from enclosing scope; race-free. + return await _propose_action_impl(args, session_id) + @tool( "react", "Set an emoji reaction on the user's last message. " @@ -2357,8 +2516,8 @@ async def session_send_file(args: dict) -> dict: return await _send_file_impl(args, session_id) # Shared tools (don't need session context) + session-scoped tools - shared_tools = [t for t in ALL_TOOLS if t.name not in ("notify", "ask_user", "react", "send_sticker", "plan_propose", "plan_update")] - session_tools: list[SdkMcpTool] = [session_notify, session_ask_user, session_react, session_send_sticker, session_plan_propose, session_plan_update, session_send_file] + shared_tools = [t for t in ALL_TOOLS if t.name not in ("notify", "ask_user", "propose_action", "react", "send_sticker", "plan_propose", "plan_update")] + session_tools: list[SdkMcpTool] = [session_notify, session_ask_user, session_propose_action, session_react, session_send_sticker, session_plan_propose, session_plan_update, session_send_file] # Only include houseofagents tools when enabled — saves context tokens otherwise hoa_enabled = _config and _config.houseofagents.enabled diff --git a/nerve/db/migrations/v028_notification_approval_kind.py b/nerve/db/migrations/v028_notification_approval_kind.py new file mode 100644 index 0000000..e4e8692 --- /dev/null +++ b/nerve/db/migrations/v028_notification_approval_kind.py @@ -0,0 +1,44 @@ +"""V28: Add target_kind / target_id columns to notifications. + +Extends the notification table to support the ``approval`` notification +kind: notifications that route to a server-side dispatcher when the user +answers them (e.g. approve / decline / snooze a queued mechanical +action). 
The existing ``type`` column gains a third valid value +(``approval``); the column itself stays TEXT so no schema change is +needed there. + +The two new columns: + +- ``target_kind`` TEXT NULL: dispatcher key (e.g. ``mechanical-action``, + ``plan``). NULL for legacy ``notify`` / ``question`` rows, which means + "no dispatch; fall through to the existing answer-injection path." +- ``target_id`` TEXT NULL: dispatcher-specific identifier (e.g. the + mechanical-action proposal id). Read by the handler registry. + +Existing rows are left untouched (target_kind = NULL), so the answer +path stays identical for every notification created before v28. +""" + +from __future__ import annotations + +import logging + +import aiosqlite + +logger = logging.getLogger(__name__) + + +async def up(db: aiosqlite.Connection) -> None: + await db.execute( + "ALTER TABLE notifications ADD COLUMN target_kind TEXT" + ) + await db.execute( + "ALTER TABLE notifications ADD COLUMN target_id TEXT" + ) + await db.execute( + "CREATE INDEX IF NOT EXISTS idx_notifications_target " + "ON notifications(target_kind, target_id)" + ) + logger.info( + "v028: added target_kind/target_id to notifications + index" + ) diff --git a/nerve/db/notifications.py b/nerve/db/notifications.py index 6bf8e35..507be6c 100644 --- a/nerve/db/notifications.py +++ b/nerve/db/notifications.py @@ -17,18 +17,30 @@ async def create_notification( title: str, body: str = "", priority: str = "normal", - options: list[str] | None = None, + options: list | None = None, expires_at: str | None = None, metadata: dict | None = None, + target_kind: str | None = None, + target_id: str | None = None, ) -> dict: + """Insert a notification row. + + ``type`` is one of ``notify`` (fire-and-forget), ``question`` + (ask_user / answer-injection), or ``approval`` (action-dispatch + via the handler registry). 
``target_kind`` and ``target_id`` are + only populated for ``approval`` rows; left NULL otherwise so the + legacy answer path stays untouched. + """ now = datetime.now(timezone.utc).isoformat() await self.db.execute( """INSERT INTO notifications - (id, session_id, type, title, body, priority, options, expires_at, metadata, created_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", + (id, session_id, type, title, body, priority, options, + expires_at, metadata, created_at, target_kind, target_id) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", (notification_id, session_id, type, title, body, priority, json.dumps(options) if options else None, - expires_at, json.dumps(metadata or {}), now), + expires_at, json.dumps(metadata or {}), now, + target_kind, target_id), ) await self.db.commit() return {"id": notification_id, "session_id": session_id, "type": type} @@ -128,6 +140,32 @@ async def expire_notifications(self) -> int: await self.db.commit() return cursor.rowcount + async def snooze_notification( + self, notification_id: str, new_expires_at: str, + ) -> bool: + """Push a pending notification's expiry forward. + + Used by the ``approval`` dispatcher when the user picks + ``snooze_24h``: the row stays at status=pending so a later + re-delivery tick (wired in PR 2) can surface it again, but the + expiry advances so it does not get caught by ``expire_stale`` + in the meantime. + + Returns True on success, False if the row is not pending. + """ + async with self._atomic(): + async with self.db.execute( + "SELECT id FROM notifications WHERE id = ? AND status = 'pending'", + (notification_id,), + ) as cursor: + if not await cursor.fetchone(): + return False + await self.db.execute( + "UPDATE notifications SET expires_at = ? 
WHERE id = ?", + (new_expires_at, notification_id), + ) + return True + async def count_pending_notifications(self, channel: str | None = None) -> int: sql = "SELECT COUNT(*) FROM notifications WHERE status = 'pending'" params: tuple = () diff --git a/nerve/notifications/handlers.py b/nerve/notifications/handlers.py new file mode 100644 index 0000000..485ebd8 --- /dev/null +++ b/nerve/notifications/handlers.py @@ -0,0 +1,300 @@ +"""Dispatch registry for ``approval``-kind notifications. + +When a user answers an ``approval`` notification, the notification +service looks up the row's ``target_kind`` and routes the decision to +the matching dispatcher in this module. Dispatchers know how to act on +their target type (a mechanical-action proposal, a plan, etc.) and +return a structured ``DispatchResult`` so the service can audit-log the +outcome uniformly. + +PR 1 ships one dispatcher: ``mechanical-action``. PR 2 will add the +``plan`` dispatcher and the per-target re-delivery wiring. + +Design notes: + +- The registry is keyed by ``target_kind`` (a string). Decisions + (``approve`` / ``decline`` / ``snooze_24h`` / future values) are + passed as a function arg, not part of the key, so adding a new + decision doesn't need a new registry entry. +- Dispatchers receive the raw notification dict, the target id, the + decision string, and the live ``NerveConfig`` (for the workspace + path; see ``mechanical-action`` below). The service-side caller is + responsible for committing the DB-level status flip, so dispatchers + can stay pure-side-effect-only against the target system. +- Dispatchers should be deterministic in their audit_event keys so the + audit log stays grep-able. 
+""" + +from __future__ import annotations + +import logging +import os +import shutil +import subprocess +from dataclasses import dataclass, field +from datetime import datetime, timedelta, timezone +from pathlib import Path +from typing import TYPE_CHECKING, Any, Callable + +if TYPE_CHECKING: + from nerve.config import NerveConfig + +logger = logging.getLogger(__name__) + + +# ---------------------------------------------------------------------- +# Result type +# ---------------------------------------------------------------------- + + +@dataclass +class DispatchResult: + """Outcome of a single dispatch call. + + Fields: + - ``ok``: True when the dispatcher's downstream action completed + successfully. False on a non-zero exit or an internal error. The + service still flips the notification's status to ``answered`` on + either outcome, so the user does not see a re-delivered approval + that the system already acted on; the audit_event records the + failure for replay. + - ``audit_event``: a structured dict the caller appends to the + mechanical-actions audit log. Already includes the dispatcher's + own event name (``approval-acted``); the service adds the + notification id and timestamp. + - ``snooze_until``: ISO-8601 UTC timestamp for the new expiry when + the decision is a snooze. None for approve / decline. + """ + + ok: bool + audit_event: dict[str, Any] = field(default_factory=dict) + snooze_until: str | None = None + + +# ---------------------------------------------------------------------- +# Registry primitives +# ---------------------------------------------------------------------- + + +Dispatcher = Callable[ + [dict[str, Any], str, str, "NerveConfig | None"], DispatchResult +] + + +_DISPATCHERS: dict[str, Dispatcher] = {} + + +def register(target_kind: str, fn: Dispatcher) -> None: + """Register a dispatcher for a ``target_kind``. 
+ + Idempotent: re-registering overwrites the previous entry so test + code can swap in fakes per-test without leaking state across the + module-level dict. + """ + _DISPATCHERS[target_kind] = fn + + +def get(target_kind: str) -> Dispatcher | None: + """Look up a dispatcher by ``target_kind``. None if unregistered.""" + return _DISPATCHERS.get(target_kind) + + +def known_kinds() -> list[str]: + """Return the currently registered ``target_kind`` values.""" + return sorted(_DISPATCHERS.keys()) + + +# ---------------------------------------------------------------------- +# mechanical-action dispatcher +# ---------------------------------------------------------------------- + + +# Valid decisions for the mechanical-action dispatcher. Kept here so a +# typo in the caller surfaces as an explicit failure rather than a +# silent no-op shell call. +_MECHANICAL_DECISIONS = frozenset({"approve", "decline", "snooze_24h"}) + + +def _resolve_workspace( + config: "NerveConfig | None", +) -> Path | None: + """Pick the workspace directory. + + Priority: + 1. ``$NERVE_WORKSPACE_PATH`` env var (test / override hook). + 2. ``config.workspace`` from the live NerveConfig. + 3. None if neither is available. + """ + override = os.environ.get("NERVE_WORKSPACE_PATH") + if override: + return Path(override).expanduser() + if config is not None and config.workspace: + return Path(config.workspace).expanduser() + return None + + +def _dispatch_mechanical_action( + notification: dict[str, Any], + target_id: str, + decision: str, + config: "NerveConfig | None", +) -> DispatchResult: + """Approve / decline / snooze a queued mechanical-action proposal. + + Shells out to ``/scripts/mechanical-action.sh`` which is + the decide-side wrapper around the propose-mechanical-action + primitive. The wrapper handles audit-log writes for the + approve / decline paths; this dispatcher writes its own + ``approval-acted`` audit event so the chain is observable from the + notification side as well. 
+ + Snooze: rather than touch the queue file directly, this dispatcher + calls ``mechanical-action.sh snooze --hours 24`` and lets the + decide-side script record the audit event and update the queue + entry's ``not_before`` field. PR 2 wires the re-delivery scheduler; + until then, the snooze just keeps the proposal in queue/ with a + future ``not_before`` timestamp. + """ + notif_id = notification.get("id", "") + base_event: dict[str, Any] = { + "event": "approval-acted", + "notification_id": notif_id, + "target_kind": "mechanical-action", + "target_id": target_id, + "decision": decision, + } + + if decision not in _MECHANICAL_DECISIONS: + logger.warning( + "mechanical-action dispatch: unsupported decision %r on %s", + decision, target_id, + ) + return DispatchResult( + ok=False, + audit_event={ + **base_event, + "ok": False, + "error": f"unsupported decision: {decision}", + }, + ) + + workspace = _resolve_workspace(config) + if workspace is None: + logger.error( + "mechanical-action dispatch: no workspace configured; " + "set NERVE_WORKSPACE_PATH or config.workspace", + ) + return DispatchResult( + ok=False, + audit_event={ + **base_event, + "ok": False, + "error": "workspace path unresolved", + }, + ) + + script_path = workspace / "scripts" / "mechanical-action.sh" + if not script_path.is_file(): + logger.error( + "mechanical-action dispatch: script missing at %s", + script_path, + ) + return DispatchResult( + ok=False, + audit_event={ + **base_event, + "ok": False, + "error": f"script missing: {script_path}", + }, + ) + + if shutil.which("bash") is None: + logger.error("mechanical-action dispatch: bash not on PATH") + return DispatchResult( + ok=False, + audit_event={ + **base_event, + "ok": False, + "error": "bash not on PATH", + }, + ) + + if decision == "approve": + cmd = ["bash", str(script_path), "approve", target_id] + snooze_until = None + elif decision == "decline": + reason = ( + f"user declined via notification {notif_id}" + if notif_id else "user 
declined via notification" + ) + cmd = [ + "bash", str(script_path), "decline", target_id, + "--reason", reason, + ] + snooze_until = None + else: # snooze_24h + cmd = [ + "bash", str(script_path), "snooze", target_id, + "--hours", "24", + ] + snooze_until = ( + datetime.now(timezone.utc) + timedelta(hours=24) + ).isoformat() + + try: + completed = subprocess.run( + cmd, capture_output=True, text=True, check=False, timeout=30, + ) + except (OSError, subprocess.SubprocessError) as exc: + logger.error( + "mechanical-action dispatch: subprocess failed: %s", exc, + ) + return DispatchResult( + ok=False, + audit_event={ + **base_event, + "ok": False, + "error": f"subprocess error: {exc}", + }, + snooze_until=snooze_until, + ) + + ok = completed.returncode == 0 + audit_event = { + **base_event, + "ok": ok, + "exit_code": completed.returncode, + } + # Capture a short tail of stderr on failure so the audit log shows + # what went wrong without becoming a wall of text. + if not ok and completed.stderr: + audit_event["stderr_tail"] = completed.stderr[-512:] + + return DispatchResult( + ok=ok, + audit_event=audit_event, + snooze_until=snooze_until, + ) + + +register("mechanical-action", _dispatch_mechanical_action) + + +# ---------------------------------------------------------------------- +# Convenience: default approval options +# ---------------------------------------------------------------------- + + +def default_approval_options() -> list[dict[str, str]]: + """Return the canonical Approve / Decline / Snooze 24h triplet. + + Used by the ``propose_action`` MCP tool when the caller does not + supply its own options list. Keeping this here (next to the + dispatcher) keeps the option set co-located with the decisions the + dispatcher actually understands. 
+ """ + return [ + {"label": "Approve", "value": "approve"}, + {"label": "Decline", "value": "decline"}, + {"label": "Snooze 24h", "value": "snooze_24h"}, + ] diff --git a/nerve/notifications/service.py b/nerve/notifications/service.py index ec89b3c..9f22ce5 100644 --- a/nerve/notifications/service.py +++ b/nerve/notifications/service.py @@ -1,8 +1,10 @@ """Notification service — centralized fanout, answer routing, and persistence. Coordinates between MCP tools (agent-side), channels (delivery), and the -answer routing mechanism (user-side). Supports fire-and-forget notifications -and async questions with multi-channel delivery (web UI + Telegram). +answer routing mechanism (user-side). Supports fire-and-forget notifications, +async questions with multi-channel delivery (web UI + Telegram), and +``approval``-kind notifications that route to a server-side dispatcher when +the user picks an inline option (see ``nerve.notifications.handlers``). """ from __future__ import annotations @@ -10,9 +12,13 @@ import asyncio import json import logging +import os import uuid from datetime import datetime, timezone, timedelta -from typing import TYPE_CHECKING +from pathlib import Path +from typing import TYPE_CHECKING, Any + +from nerve.notifications import handlers as _handlers if TYPE_CHECKING: from nerve.agent.engine import AgentEngine @@ -22,6 +28,68 @@ logger = logging.getLogger(__name__) +# Emoji decoration applied to the canonical approval decisions when we +# render their buttons. Keys are the option ``value`` strings; missing +# values fall back to the raw label. +_APPROVAL_EMOJIS: dict[str, str] = { + "approve": "✅", # white heavy check mark + "decline": "❌", # cross mark + "snooze_24h": "\U0001F4A4", # zzz +} + + +def _resolve_workspace(config: NerveConfig | None) -> Path | None: + """Resolve the workspace directory. + + Mirrors ``handlers._resolve_workspace`` so the service can locate + ``scripts/_mechanical_action.py`` for audit-log writes. 
Priority: + ``$NERVE_WORKSPACE_PATH`` first (test override), then + ``config.workspace``. + """ + override = os.environ.get("NERVE_WORKSPACE_PATH") + if override: + return Path(override).expanduser() + if config is not None and getattr(config, "workspace", None): + return Path(config.workspace).expanduser() + return None + + +def _load_mechanical_action_helper(workspace: Path): + """Import the workspace's ``_mechanical_action`` helper by path. + + The helper is a workspace-side stdlib module, not part of the Nerve + package, so we load it via ``importlib.util`` the same way the + workspace scripts do. Cached on first load via a module-level dict + so repeated approval answers do not re-spec the file each time. + """ + cached = _HELPER_CACHE.get(str(workspace)) + if cached is not None: + return cached + + import importlib.util + + helper_path = workspace / "scripts" / "_mechanical_action.py" + if not helper_path.is_file(): + raise FileNotFoundError( + f"mechanical-action helper not found at {helper_path}" + ) + spec = importlib.util.spec_from_file_location( + f"_mechanical_action__{abs(hash(str(workspace))) & 0xFFFFFF:06x}", + helper_path, + ) + if spec is None or spec.loader is None: + raise ImportError( + f"cannot build module spec for {helper_path}" + ) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + _HELPER_CACHE[str(workspace)] = module + return module + + +_HELPER_CACHE: dict[str, Any] = {} + + class NotificationService: """Manages notification lifecycle: create, deliver, answer, route.""" @@ -117,6 +185,84 @@ async def ask_question( return {"notification_id": notification_id, "status": "sent"} + async def propose_action( + self, + session_id: str, + target_kind: str, + target_id: str, + title: str, + body: str = "", + options: list[dict[str, str]] | None = None, + priority: str = "high", + expires_at: str | None = None, + expiry_hours: int | None = None, + ) -> dict: + """File an actionable ``approval``-kind notification. 
+ + ``target_kind`` + ``target_id`` route the user's answer through + ``nerve.notifications.handlers`` instead of the question + answer-injection path. ``options`` accepts a list of + ``{"label": ..., "value": ...}`` dicts; when omitted, the + dispatcher's canonical option set is used (Approve / Decline / + Snooze 24h for the mechanical-action dispatcher). + + Returns ``{"notification_id": , "status": "sent"}``. + """ + notification_id = f"approval-{uuid.uuid4().hex[:8]}" + + # Resolve options. Default to the registered dispatcher's + # canonical set when none was passed. Falling back to the + # mechanical-action default keeps PR 1's only wired path + # working without forcing the caller to recite the same triplet. + if options is None: + options = _handlers.default_approval_options() + elif not options: + raise ValueError("propose_action: options must not be empty") + + # Normalize: store as a list of value strings (matching the + # existing question-kind contract) plus a parallel label map in + # metadata so the Telegram + web layers can render the labels + # without re-parsing options on every send. 
+ option_values = [str(opt["value"]) for opt in options] + option_labels = { + str(opt["value"]): str(opt.get("label", opt["value"])) + for opt in options + } + + if expires_at is None: + hours = ( + expiry_hours + or self.config.notifications.default_expiry_hours + ) + expires_at = ( + datetime.now(timezone.utc) + timedelta(hours=hours) + ).isoformat() + + await self.db.create_notification( + notification_id=notification_id, + session_id=session_id, + type="approval", + title=title, + body=body, + priority=priority, + options=option_values, + expires_at=expires_at, + metadata={ + "target_kind": target_kind, + "target_id": target_id, + "option_labels": option_labels, + }, + target_kind=target_kind, + target_id=target_id, + ) + + await self._fanout( + notification_id, session_id, "approval", title, body, + priority, options=option_values, option_labels=option_labels, + ) + + return {"notification_id": notification_id, "status": "sent"} + # ------------------------------------------------------------------ # # Answer routing (called by REST API / Telegram callback) # # ------------------------------------------------------------------ # @@ -127,16 +273,27 @@ async def handle_answer( answer: str, answered_by: str, ) -> bool: - """Process a user's answer to a question. - - 1. Persist the answer in DB. - 2. Inject answer as user message in the session. - 3. Broadcast answer event to web UI. + """Process a user's answer to a question or approval. + + - For ``type=approval`` rows: look up the dispatcher in the + handler registry, run it, audit-log the outcome, then flip + the row's status. Snooze answers advance ``expires_at`` and + keep the row pending so a later re-delivery tick can surface + it again. + - For ``type=question`` rows (legacy): persist the answer, + inject it back into the originating session, broadcast. + - Fire-and-forget ``type=notify`` rows do not flow through this + method; the dismiss endpoint handles those. 
""" notif = await self.db.get_notification(notification_id) if not notif or notif["status"] != "pending": return False + if notif.get("type") == "approval": + return await self._handle_approval_answer( + notif, answer, answered_by, + ) + success = await self.db.answer_notification( notification_id, answer, answered_by, ) @@ -194,6 +351,137 @@ async def handle_answer( return True + async def _handle_approval_answer( + self, + notif: dict[str, Any], + answer: str, + answered_by: str, + ) -> bool: + """Route an approval answer through the dispatcher registry.""" + notification_id = notif["id"] + session_id = notif["session_id"] + target_kind = notif.get("target_kind") or "" + target_id = notif.get("target_id") or "" + + dispatcher = _handlers.get(target_kind) if target_kind else None + if dispatcher is None: + logger.warning( + "approval %s has no dispatcher for target_kind=%r; " + "marking answered without action", + notification_id, target_kind, + ) + result = _handlers.DispatchResult( + ok=False, + audit_event={ + "event": "approval-acted", + "notification_id": notification_id, + "target_kind": target_kind, + "target_id": target_id, + "decision": answer, + "ok": False, + "error": ( + f"no dispatcher registered for {target_kind!r}" + ), + }, + ) + else: + try: + result = await asyncio.to_thread( + dispatcher, notif, target_id, answer, self.config, + ) + except Exception as exc: # defensive: never crash the route + logger.exception( + "approval dispatch raised for %s (target=%s:%s, " + "decision=%s): %s", + notification_id, target_kind, target_id, answer, exc, + ) + result = _handlers.DispatchResult( + ok=False, + audit_event={ + "event": "approval-acted", + "notification_id": notification_id, + "target_kind": target_kind, + "target_id": target_id, + "decision": answer, + "ok": False, + "error": f"dispatcher raised: {exc}", + }, + ) + + await self._append_approval_audit(result.audit_event) + + # Snooze keeps the row pending with a future expiry so a later + # 
re-delivery tick (wired in PR 2) can surface it again. + if result.snooze_until is not None and result.ok: + await self.db.snooze_notification( + notification_id, result.snooze_until, + ) + else: + await self.db.answer_notification( + notification_id, answer, answered_by, + ) + + from nerve.agent.streaming import broadcaster + broadcast_status = ( + "snoozed" if (result.snooze_until and result.ok) else "answered" + ) + await broadcaster.broadcast("__global__", { + "type": "notification_answered", + "notification_id": notification_id, + "session_id": session_id, + "answer": answer, + "answered_by": answered_by, + "approval_status": broadcast_status, + "dispatch_ok": result.ok, + }) + + return True + + async def _append_approval_audit(self, event: dict[str, Any]) -> None: + """Append an ``approval-acted`` record to the mechanical-actions log. + + Uses the same audit log that the propose-mechanical-action + primitive writes to (``~/.nerve/mechanical-actions/audit.jsonl``) + so the proposal lifecycle (``proposed`` -> ``approval-acted`` + -> ``approved``/``declined``/``executed``) is visible in one + place. The shared helper module + ``scripts/_mechanical_action.py`` lives under the workspace, so + we import it dynamically by path rather than as a real Python + package. + """ + workspace = _resolve_workspace(self.config) + if workspace is None: + logger.debug( + "approval audit: no workspace configured; event=%s", + event.get("event"), + ) + return + try: + helper = _load_mechanical_action_helper(workspace) + except Exception as exc: # defensive: never crash the route + logger.warning( + "approval audit: cannot load helper at %s: %s", + workspace, exc, + ) + return + + ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + # Re-use the helper's append path. Use a tag the helper hasn't + # whitelisted yet by patching VALID_EVENTS just for our event, + # so the helper validation stays strict for everything else. 
+ record = {"ts": ts, **event} + valid = getattr(helper, "VALID_EVENTS", None) + if isinstance(valid, set) and "approval-acted" not in valid: + valid.add("approval-acted") + + try: + await asyncio.to_thread(helper.append_audit, record, None) + except Exception as exc: + logger.warning( + "approval audit append failed: %s (event=%s)", + exc, event.get("event"), + ) + def _on_answer_task_done(self, task: asyncio.Task) -> None: """Log errors from answer injection tasks. @@ -230,8 +518,16 @@ async def _fanout( options: list[str] | None = None, channels: list[str] | None = None, silent: bool = False, + option_labels: dict[str, str] | None = None, ) -> None: - """Deliver notification to all configured channels in parallel.""" + """Deliver notification to all configured channels in parallel. + + ``option_labels`` is used by ``approval``-kind notifications: it + maps the canonical option ``value`` (sent back on the callback + as the answer string) to the human-facing label rendered on the + button. ``None`` for the legacy ``question`` path, where the + label and the value are identical. + """ target_channels = channels or self.config.notifications.channels async def _deliver(channel_name: str) -> str | None: @@ -241,13 +537,14 @@ async def _deliver(channel_name: str) -> str | None: await self._deliver_web( notification_id, session_id, notif_type, title, body, priority, options, + option_labels=option_labels, ) return "web" elif channel_name == "telegram": msg_id = await self._deliver_telegram( notification_id, session_id, notif_type, title, body, priority, options, - silent=silent, + silent=silent, option_labels=option_labels, ) if msg_id: await self.db.update_notification( @@ -282,8 +579,15 @@ async def _deliver_web( body: str, priority: str, options: list[str] | None, + option_labels: dict[str, str] | None = None, ) -> None: - """Broadcast notification to web UI via the global broadcaster.""" + """Broadcast notification to web UI via the global broadcaster. 
+ + For approval-kind rows we also include ``option_labels`` so the + web NotificationCard can render readable button text while the + button click still sends the canonical ``value`` back through + the answer endpoint. + """ from nerve.agent.streaming import broadcaster message = { "type": "notification", @@ -295,6 +599,8 @@ async def _deliver_web( "priority": priority, "options": options, } + if option_labels: + message["option_labels"] = option_labels await broadcaster.broadcast("__global__", message) def _resolve_telegram_chat_id(self) -> int | None: @@ -325,8 +631,9 @@ async def _deliver_telegram( priority: str, options: list[str] | None, silent: bool = False, + option_labels: dict[str, str] | None = None, ) -> str | None: - """Send notification to Telegram, with inline keyboard for questions.""" + """Send notification to Telegram, with inline keyboard for questions/approvals.""" bot = self._get_telegram_bot() if not bot: logger.warning("Telegram bot not available for notification %s", notification_id) @@ -347,9 +654,21 @@ async def _deliver_telegram( if self._should_show_session_label(session_id): text += f"\n\nSession: {session_id}" - if notif_type == "question" and options: + if notif_type in ("question", "approval") and options: + button_labels: list[tuple[str, str]] = [] + for value in options: + if notif_type == "approval": + label = ( + (option_labels or {}).get(value) + or value.replace("_", " ").title() + ) + emoji = _APPROVAL_EMOJIS.get(value, "") + rendered = f"{emoji} {label}".strip() if emoji else label + else: + rendered = value + button_labels.append((rendered, value)) return await self._send_telegram_inline( - chat_id, notification_id, text, options, silent=silent, + chat_id, notification_id, text, button_labels, silent=silent, ) else: msg = await self._send_telegram_html(bot, chat_id, text, silent=silent) @@ -388,10 +707,16 @@ async def _send_telegram_inline( chat_id: int, notification_id: str, text: str, - options: list[str], + options: 
list[str] | list[tuple[str, str]], silent: bool = False, ) -> str | None: - """Send Telegram message with inline keyboard buttons.""" + """Send Telegram message with inline keyboard buttons. + + ``options`` accepts either a flat list of strings (legacy + question kind: label == callback value) or a list of + ``(label, value)`` tuples (approval kind: emoji-prefixed label, + canonical value sent back on the callback). + """ bot = self._get_telegram_bot() if not bot: return None @@ -399,14 +724,18 @@ async def _send_telegram_inline( from telegram import InlineKeyboardButton, InlineKeyboardMarkup buttons = [] - for option in options: - callback_data = f"notif:{notification_id}:{option}" + for entry in options: + if isinstance(entry, tuple): + label, value = entry + else: + label = value = entry + callback_data = f"notif:{notification_id}:{value}" # Telegram callback_data max 64 bytes — truncate option if needed if len(callback_data.encode("utf-8")) > 64: max_opt_len = 64 - len(f"notif:{notification_id}:".encode("utf-8")) - truncated = option.encode("utf-8")[:max_opt_len].decode("utf-8", errors="ignore") + truncated = value.encode("utf-8")[:max_opt_len].decode("utf-8", errors="ignore") callback_data = f"notif:{notification_id}:{truncated}" - buttons.append([InlineKeyboardButton(option, callback_data=callback_data)]) + buttons.append([InlineKeyboardButton(label, callback_data=callback_data)]) keyboard = InlineKeyboardMarkup(buttons) diff --git a/tests/test_notifications_actionable.py b/tests/test_notifications_actionable.py new file mode 100644 index 0000000..3937db1 --- /dev/null +++ b/tests/test_notifications_actionable.py @@ -0,0 +1,576 @@ +"""Tests for the ``approval`` notification kind. + +PR 1 of the actionable-inbox series: +- v028 migration adds ``target_kind`` and ``target_id`` columns. +- ``NotificationService.propose_action`` files a row of type=approval. 
+- ``handle_answer`` dispatches the user's decision through the + ``nerve.notifications.handlers`` registry. +- The legacy ``type=question`` answer-injection path stays untouched. + +These tests run against a fresh in-memory SQLite per test and stub +out the streaming broadcaster + agent engine so we can assert +behavior in isolation. +""" + +from __future__ import annotations + +import asyncio +import json +import shutil +import sys +import textwrap +from datetime import datetime, timedelta, timezone +from pathlib import Path +from typing import Any +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from nerve.config import NerveConfig, NotificationsConfig +from nerve.db import Database +from nerve.notifications import handlers as _handlers +from nerve.notifications.service import NotificationService + + +# ---------------------------------------------------------------------- +# Fixtures +# ---------------------------------------------------------------------- + + +@pytest.fixture +def fake_config(tmp_path: Path) -> NerveConfig: + """Minimal NerveConfig with workspace + notifications config wired.""" + cfg = NerveConfig() + cfg.workspace = tmp_path + cfg.notifications = NotificationsConfig( + channels=["web"], # skip telegram in unit tests + telegram_chat_id=None, + default_expiry_hours=48, + priority_prefixes={"high": "", "urgent": ""}, + ) + return cfg + + +@pytest.fixture +def fake_engine() -> MagicMock: + """An engine stub with the minimum surface the service touches.""" + engine = MagicMock() + engine.sessions = MagicMock() + engine.sessions.is_running.return_value = False + engine.router = MagicMock() + engine.router.get_channel.return_value = None + engine.run = AsyncMock() + return engine + + +@pytest.fixture +def patch_broadcaster(monkeypatch: pytest.MonkeyPatch) -> list[tuple[str, dict]]: + """Capture broadcaster.broadcast() calls instead of hitting any WS.""" + captured: list[tuple[str, dict]] = [] + + class _FakeBroadcaster: + async 
def broadcast(self, channel: str, message: dict) -> None: + captured.append((channel, message)) + + from nerve.agent import streaming + monkeypatch.setattr(streaming, "broadcaster", _FakeBroadcaster()) + return captured + + +_MINIMAL_HELPER_SRC = textwrap.dedent( + '''\ + """Minimal mechanical-action helper used only by the test fixture. + + Mirrors the audit + queue surface the notification service touches + so the dispatcher can shell into a stub script and append an audit + record without dragging in any out-of-tree files. + + Honors ``$NERVE_MECHANICAL_STATE_DIR`` so each test can point the + helper at its own temp directory. + """ + + from __future__ import annotations + + import json + import os + from datetime import datetime, timezone + from pathlib import Path + + _OVERRIDE = os.environ.get("NERVE_MECHANICAL_STATE_DIR") + STATE_DIR = ( + Path(_OVERRIDE).expanduser() if _OVERRIDE + else Path("~/.nerve/mechanical-actions").expanduser() + ) + QUEUE_DIR = STATE_DIR / "queue" + DECISIONS_DIR = STATE_DIR / "decisions" + AUDIT_LOG = STATE_DIR / "audit.jsonl" + + VALID_EVENTS = { + "proposed", "approved", "declined", + "auto-execute", "executed", "failed", + "snoozed", "approval-acted", + } + + + def utc_now_iso() -> str: + return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + + + def ensure_dirs(state_dir: Path | None = None): + s = Path(state_dir) if state_dir else STATE_DIR + (s / "queue").mkdir(parents=True, exist_ok=True) + (s / "decisions").mkdir(parents=True, exist_ok=True) + return s / "queue", s / "decisions", s / "audit.jsonl" + + + def append_audit(event, audit_log=None): + log = Path(audit_log) if audit_log else AUDIT_LOG + log.parent.mkdir(parents=True, exist_ok=True) + line = json.dumps(event, separators=(",", ":")) + "\\n" + fd = os.open(log, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o644) + try: + os.write(fd, line.encode("utf-8")) + finally: + os.close(fd) + + + def read_audit(audit_log=None): + log = Path(audit_log) if audit_log else 
AUDIT_LOG + if not log.is_file(): + return [] + out = [] + for line in log.read_text().splitlines(): + line = line.strip() + if line: + out.append(json.loads(line)) + return out + ''' +) + + +@pytest.fixture +def workspace_with_scripts(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Path: + """Create a synthetic ``scripts/`` layout the dispatcher can shell into. + + Drops a stub ``mechanical-action.sh`` that records its args + exits + with ``$MECHACTION_EXIT`` and a minimal ``_mechanical_action.py`` + audit helper. We deliberately avoid copying any out-of-tree file so + the test stays self-contained. + """ + scripts_dir = tmp_path / "scripts" + scripts_dir.mkdir() + + helper_path = scripts_dir / "_mechanical_action.py" + helper_path.write_text(_MINIMAL_HELPER_SRC) + + # A predictable stub: writes its args to a sibling log, returns the + # exit code embedded in $MECHACTION_EXIT (default 0). + log_path = tmp_path / "mechanical-action.log" + stub = textwrap.dedent(f"""\ + #!/usr/bin/env bash + echo "$@" >> "{log_path}" + exit ${{MECHACTION_EXIT:-0}} + """) + sh_path = scripts_dir / "mechanical-action.sh" + sh_path.write_text(stub) + sh_path.chmod(0o755) + + monkeypatch.setenv("NERVE_WORKSPACE_PATH", str(tmp_path)) + monkeypatch.setenv( + "NERVE_MECHANICAL_STATE_DIR", + str(tmp_path / ".nerve" / "mechanical-actions"), + ) + return tmp_path + + +def read_audit_jsonl(state_dir: Path) -> list[dict[str, Any]]: + """Read every record from the mechanical-actions audit log.""" + audit_log = state_dir / "audit.jsonl" + if not audit_log.is_file(): + return [] + out: list[dict[str, Any]] = [] + for line in audit_log.read_text().splitlines(): + line = line.strip() + if not line: + continue + out.append(json.loads(line)) + return out + + +# ---------------------------------------------------------------------- +# Schema / store +# ---------------------------------------------------------------------- + + +@pytest.mark.asyncio +class TestSchemaAndStore: + async def 
test_v028_columns_exist(self, db: Database): + async with db.db.execute("PRAGMA table_info(notifications)") as cur: + cols = {row[1] async for row in cur} + assert "target_kind" in cols + assert "target_id" in cols + + async def test_create_notification_default_target_columns_null(self, db: Database): + await db.create_session("s1") + await db.create_notification( + notification_id="n1", session_id="s1", + type="notify", title="hello", + ) + notif = await db.get_notification("n1") + assert notif is not None + assert notif["target_kind"] is None + assert notif["target_id"] is None + + async def test_create_notification_with_target(self, db: Database): + await db.create_session("s1") + await db.create_notification( + notification_id="n1", session_id="s1", + type="approval", title="approve me", + target_kind="mechanical-action", + target_id="20260519T143906Z-d2e62e", + ) + notif = await db.get_notification("n1") + assert notif["target_kind"] == "mechanical-action" + assert notif["target_id"] == "20260519T143906Z-d2e62e" + assert notif["type"] == "approval" + + async def test_snooze_notification_advances_expiry(self, db: Database): + await db.create_session("s1") + future = ( + datetime.now(timezone.utc) + timedelta(hours=1) + ).isoformat() + await db.create_notification( + notification_id="n1", session_id="s1", + type="approval", title="t", + expires_at=future, + ) + new_expiry = ( + datetime.now(timezone.utc) + timedelta(hours=24) + ).isoformat() + ok = await db.snooze_notification("n1", new_expiry) + assert ok is True + notif = await db.get_notification("n1") + assert notif["expires_at"] == new_expiry + assert notif["status"] == "pending" + + async def test_snooze_notification_rejects_non_pending(self, db: Database): + await db.create_session("s1") + await db.create_notification( + notification_id="n1", session_id="s1", type="approval", title="t", + ) + await db.answer_notification("n1", "approve", "web") + new_expiry = ( + datetime.now(timezone.utc) + 
timedelta(hours=24) + ).isoformat() + assert await db.snooze_notification("n1", new_expiry) is False + + +# ---------------------------------------------------------------------- +# propose_action +# ---------------------------------------------------------------------- + + +@pytest.mark.asyncio +class TestProposeAction: + async def test_propose_action_creates_approval_row( + self, + db: Database, + fake_config: NerveConfig, + fake_engine: MagicMock, + patch_broadcaster: list, + ): + await db.create_session("s1") + svc = NotificationService(fake_config, db, fake_engine) + result = await svc.propose_action( + session_id="s1", + target_kind="mechanical-action", + target_id="test-123", + title="approve fix-pack", + ) + notif = await db.get_notification(result["notification_id"]) + assert notif is not None + assert notif["type"] == "approval" + assert notif["target_kind"] == "mechanical-action" + assert notif["target_id"] == "test-123" + assert notif["priority"] == "high" + # Options stored as the canonical value list. + stored_opts = json.loads(notif["options"]) + assert stored_opts == ["approve", "decline", "snooze_24h"] + # option_labels live in metadata so the web side can render + # without re-parsing options. 
+ meta = json.loads(notif["metadata"]) + assert meta["option_labels"]["approve"] == "Approve" + assert meta["option_labels"]["snooze_24h"] == "Snooze 24h" + assert meta["target_kind"] == "mechanical-action" + + async def test_propose_action_rejects_empty_options( + self, + db: Database, + fake_config: NerveConfig, + fake_engine: MagicMock, + patch_broadcaster: list, + ): + await db.create_session("s1") + svc = NotificationService(fake_config, db, fake_engine) + with pytest.raises(ValueError): + await svc.propose_action( + session_id="s1", + target_kind="mechanical-action", + target_id="t", + title="t", + options=[], + ) + + async def test_propose_action_custom_options_round_trip( + self, + db: Database, + fake_config: NerveConfig, + fake_engine: MagicMock, + patch_broadcaster: list, + ): + await db.create_session("s1") + svc = NotificationService(fake_config, db, fake_engine) + result = await svc.propose_action( + session_id="s1", + target_kind="mechanical-action", + target_id="x", + title="custom", + options=[ + {"label": "Yes please", "value": "yes"}, + {"label": "No thanks", "value": "no"}, + ], + ) + notif = await db.get_notification(result["notification_id"]) + assert json.loads(notif["options"]) == ["yes", "no"] + meta = json.loads(notif["metadata"]) + assert meta["option_labels"] == { + "yes": "Yes please", "no": "No thanks", + } + + +# ---------------------------------------------------------------------- +# handle_answer dispatch path +# ---------------------------------------------------------------------- + + +@pytest.mark.asyncio +class TestHandleAnswerApproval: + async def test_approve_invokes_dispatcher_and_writes_audit( + self, + db: Database, + fake_config: NerveConfig, + fake_engine: MagicMock, + patch_broadcaster: list, + workspace_with_scripts: Path, + ): + await db.create_session("s1") + svc = NotificationService(fake_config, db, fake_engine) + result = await svc.propose_action( + session_id="s1", + target_kind="mechanical-action", + 
target_id="prop-1", + title="run lint", + ) + nid = result["notification_id"] + + ok = await svc.handle_answer(nid, "approve", "web") + assert ok is True + + notif = await db.get_notification(nid) + assert notif["status"] == "answered" + assert notif["answer"] == "approve" + + # Audit log: an ``approval-acted`` event arrived in the + # state-dir-scoped audit log. The minimal helper honors + # NERVE_MECHANICAL_STATE_DIR (set by the fixture) so each test + # writes to its own isolated audit.jsonl. + state_dir = ( + workspace_with_scripts / ".nerve" / "mechanical-actions" + ) + events = read_audit_jsonl(state_dir) + acted = [e for e in events if e.get("event") == "approval-acted"] + assert any( + e.get("notification_id") == nid + and e.get("decision") == "approve" + and e.get("ok") is True + for e in acted + ) + + # Broadcast fired with approval_status="answered". + approval_broadcasts = [ + m for _, m in patch_broadcaster + if m.get("type") == "notification_answered" + and m.get("notification_id") == nid + ] + assert approval_broadcasts + assert approval_broadcasts[0]["approval_status"] == "answered" + assert approval_broadcasts[0]["dispatch_ok"] is True + # Importantly, no ``answer_injected`` should fire. The answer + # routes through the dispatcher, not back into the session. 
+ injected = [ + m for _, m in patch_broadcaster + if m.get("type") == "answer_injected" + ] + assert not injected + + async def test_snooze_keeps_pending_and_advances_expiry( + self, + db: Database, + fake_config: NerveConfig, + fake_engine: MagicMock, + patch_broadcaster: list, + workspace_with_scripts: Path, + ): + await db.create_session("s1") + svc = NotificationService(fake_config, db, fake_engine) + result = await svc.propose_action( + session_id="s1", + target_kind="mechanical-action", + target_id="prop-2", + title="snooze me", + expiry_hours=2, + ) + nid = result["notification_id"] + + before = await db.get_notification(nid) + prior_expiry = before["expires_at"] + + ok = await svc.handle_answer(nid, "snooze_24h", "web") + assert ok is True + + after = await db.get_notification(nid) + assert after["status"] == "pending" + assert after["expires_at"] is not None + # Expiry advanced forward; sanity check it is not the original. + assert after["expires_at"] != prior_expiry + # And no answer recorded (snooze is not a final answer). 
+ assert after["answer"] is None + + approval_broadcasts = [ + m for _, m in patch_broadcaster + if m.get("type") == "notification_answered" + and m.get("notification_id") == nid + ] + assert approval_broadcasts[0]["approval_status"] == "snoozed" + + async def test_decline_marks_answered_with_decline( + self, + db: Database, + fake_config: NerveConfig, + fake_engine: MagicMock, + patch_broadcaster: list, + workspace_with_scripts: Path, + ): + await db.create_session("s1") + svc = NotificationService(fake_config, db, fake_engine) + result = await svc.propose_action( + session_id="s1", + target_kind="mechanical-action", + target_id="prop-3", + title="decline me", + ) + nid = result["notification_id"] + + ok = await svc.handle_answer(nid, "decline", "web") + assert ok is True + + notif = await db.get_notification(nid) + assert notif["status"] == "answered" + assert notif["answer"] == "decline" + + async def test_unknown_target_kind_marks_answered_without_dispatch( + self, + db: Database, + fake_config: NerveConfig, + fake_engine: MagicMock, + patch_broadcaster: list, + workspace_with_scripts: Path, + ): + """If a row has a target_kind no dispatcher knows about, we still + flip the status so the row doesn't get re-delivered, and the + audit log records the no-dispatcher state. 
+ """ + await db.create_session("s1") + await db.create_notification( + notification_id="orphan-1", + session_id="s1", + type="approval", + title="orphan", + target_kind="never-registered", + target_id="x", + ) + svc = NotificationService(fake_config, db, fake_engine) + ok = await svc.handle_answer("orphan-1", "approve", "web") + assert ok is True + notif = await db.get_notification("orphan-1") + assert notif["status"] == "answered" + + async def test_legacy_question_path_still_injects_answer( + self, + db: Database, + fake_config: NerveConfig, + fake_engine: MagicMock, + patch_broadcaster: list, + ): + """Type=question (no target_kind) must keep flowing through the + session-injection path, untouched by the approval dispatch. + """ + await db.create_session("s1") + svc = NotificationService(fake_config, db, fake_engine) + result = await svc.ask_question( + session_id="s1", + title="legacy", + body="ask me anything", + options=["yes", "no"], + ) + nid = result["notification_id"] + + ok = await svc.handle_answer(nid, "yes", "web") + assert ok is True + + notif = await db.get_notification(nid) + assert notif["status"] == "answered" + assert notif["answer"] == "yes" + + # Confirm we broadcast the session-scoped answer_injected event, + # AND queued a run on the engine (since the session is not + # currently running per the fake_engine fixture). + injected = [ + m for _, m in patch_broadcaster + if m.get("type") == "answer_injected" + and m.get("notification_id") == nid + ] + assert injected + # Wait for any fire-and-forget answer task to settle. 
+ await asyncio.sleep(0) + fake_engine.run.assert_called() + + +# ---------------------------------------------------------------------- +# Handler registry sanity +# ---------------------------------------------------------------------- + + +class TestHandlerRegistry: + def test_mechanical_action_dispatcher_registered(self): + assert "mechanical-action" in _handlers.known_kinds() + + def test_default_approval_options(self): + opts = _handlers.default_approval_options() + values = {o["value"] for o in opts} + assert values == {"approve", "decline", "snooze_24h"} + + def test_dispatcher_rejects_unsupported_decision(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + # Even with a valid workspace, an unknown decision should fail + # cleanly with an audit_event marking the rejection. + scripts_dir = tmp_path / "scripts" + scripts_dir.mkdir() + (scripts_dir / "mechanical-action.sh").write_text("#!/usr/bin/env bash\nexit 0\n") + (scripts_dir / "mechanical-action.sh").chmod(0o755) + monkeypatch.setenv("NERVE_WORKSPACE_PATH", str(tmp_path)) + + result = _handlers._dispatch_mechanical_action( + {"id": "n-1"}, "x", "rubberstamp", None, + ) + assert result.ok is False + assert "unsupported decision" in result.audit_event.get("error", "") diff --git a/web/src/api/websocket.ts b/web/src/api/websocket.ts index 62e59cf..fd58fb2 100644 --- a/web/src/api/websocket.ts +++ b/web/src/api/websocket.ts @@ -19,8 +19,8 @@ export type WSMessage = | { type: 'subagent_start'; session_id: string; tool_use_id: string; subagent_type: string; description: string; model?: string } | { type: 'subagent_complete'; session_id: string; tool_use_id: string; duration_ms: number; is_error?: boolean } | { type: 'file_changed'; session_id: string; path: string; operation: string; tool_use_id: string } - | { type: 'notification'; notification_id: string; notification_type: 'notify' | 'question'; session_id: string; title: string; body: string; priority: string; options: string[] | null } - | { type: 
'notification_answered'; notification_id: string; session_id: string; answer: string; answered_by: string } + | { type: 'notification'; notification_id: string; notification_type: 'notify' | 'question' | 'approval'; session_id: string; title: string; body: string; priority: string; options: string[] | null; option_labels?: Record; target_kind?: string; target_id?: string } + | { type: 'notification_answered'; notification_id: string; session_id: string; answer: string; answered_by: string; approval_status?: 'answered' | 'snoozed'; dispatch_ok?: boolean } | { type: 'answer_injected'; session_id: string; notification_id: string; title: string; answer: string; answered_by: string; content: string } | { type: 'session_running'; session_id: string; is_running: boolean } | { type: 'background_tasks_update'; session_id: string; tasks: { task_id: string; label: string; tool: string; status: 'running' | 'done' | 'timeout' }[] } diff --git a/web/src/components/Notifications/NotificationToast.tsx b/web/src/components/Notifications/NotificationToast.tsx index 5bb05e6..1f6c0fe 100644 --- a/web/src/components/Notifications/NotificationToast.tsx +++ b/web/src/components/Notifications/NotificationToast.tsx @@ -1,10 +1,28 @@ import { useEffect } from 'react'; import { useNavigate } from 'react-router-dom'; -import { Bell, HelpCircle, X } from 'lucide-react'; -import { useNotificationStore } from '../../stores/notificationStore'; +import { Bell, HelpCircle, ShieldCheck, X } from 'lucide-react'; +import { useNotificationStore, type Notification } from '../../stores/notificationStore'; const TOAST_DURATION = 5000; +const APPROVAL_QUICK_BUTTON_STYLES: Record = { + approve: 'bg-emerald-400/15 text-hue-emerald border-emerald-400/30 hover:bg-emerald-400/25', + decline: 'bg-red-400/15 text-hue-red border-red-400/30 hover:bg-red-400/25', + snooze_24h: 'bg-border-subtle/40 text-text-muted border-border-subtle hover:bg-border-subtle/60', +}; + +const APPROVAL_QUICK_LABELS: Record = { + 
approve: '✅ Approve', + decline: '❌ Decline', + snooze_24h: '💤 Snooze', +}; + +function quickLabel(value: string, notif: Notification): string { + const labels = notif.option_labels; + if (labels && labels[value]) return labels[value]; + return APPROVAL_QUICK_LABELS[value] || value; +} + export function NotificationToast() { const { toastQueue, dismissToast, answerNotification } = useNotificationStore(); const navigate = useNavigate(); @@ -27,6 +45,7 @@ export function NotificationToast() {
{visible.map((notif) => { const isQuestion = notif.type === 'question'; + const isApproval = notif.type === 'approval'; const options = notif.options ? (typeof notif.options === 'string' ? JSON.parse(notif.options) : notif.options) : null; return ( @@ -35,7 +54,9 @@ export function NotificationToast() { className="bg-surface-raised border border-border-subtle rounded-lg shadow-xl p-3 animate-slide-in" >
- {isQuestion ? ( + {isApproval ? ( + + ) : isQuestion ? ( ) : ( @@ -78,6 +99,26 @@ export function NotificationToast() { ))}
)} + {/* Quick action buttons for approvals */} + {isApproval && options && notif.status === 'pending' && ( +
+ {options.slice(0, 3).map((value: string) => { + const style = APPROVAL_QUICK_BUTTON_STYLES[value] || 'bg-accent/15 text-accent border-accent/30 hover:bg-accent/25'; + return ( + + ); + })} +
+ )}
diff --git a/web/src/pages/NotificationsPage.tsx b/web/src/pages/NotificationsPage.tsx index c4bec7e..0c32e73 100644 --- a/web/src/pages/NotificationsPage.tsx +++ b/web/src/pages/NotificationsPage.tsx @@ -1,6 +1,6 @@ import { useEffect, useState } from 'react'; import { useNavigate } from 'react-router-dom'; -import { Bell, X, CheckCheck, EyeOff } from 'lucide-react'; +import { Bell, X, CheckCheck, EyeOff, Check, XCircle, Moon } from 'lucide-react'; import { useNotificationStore, type Notification } from '../stores/notificationStore'; const STATUS_STYLES: Record = { @@ -17,6 +17,12 @@ const PRIORITY_DOTS: Record = { low: '', }; +const TYPE_BADGE_STYLES: Record = { + question: 'bg-blue-400/10 text-hue-blue border-blue-400/20', + approval: 'bg-violet-400/10 text-hue-violet border-violet-400/20', + notify: 'bg-border-subtle/50 text-text-muted border-border-subtle', +}; + const STATUS_FILTERS = [ { label: 'All', value: '' }, { label: 'Pending', value: 'pending' }, @@ -28,8 +34,63 @@ const TYPE_FILTERS = [ { label: 'All', value: '' }, { label: 'Notifications', value: 'notify' }, { label: 'Questions', value: 'question' }, + { label: 'Approvals', value: 'approval' }, ]; +// Approval-kind button styling. Keyed by the option ``value`` the +// dispatcher receives, not the human label, so the styling stays +// stable even when labels are renamed. 
+const APPROVAL_BUTTON_STYLES: Record = { + approve: + 'bg-emerald-400/15 text-hue-emerald border-emerald-400/30 hover:bg-emerald-400/25', + decline: + 'bg-red-400/15 text-hue-red border-red-400/30 hover:bg-red-400/25', + snooze_24h: + 'bg-border-subtle/40 text-text-muted border-border-subtle hover:bg-border-subtle/60', +}; + +const APPROVAL_DEFAULT_BUTTON_STYLE = + 'bg-accent/15 text-accent border-accent/30 hover:bg-accent/25'; + +const APPROVAL_BUTTON_ICONS: Record = { + approve: Check, + decline: XCircle, + snooze_24h: Moon, +}; + +const APPROVAL_DEFAULT_LABELS: Record = { + approve: 'Approve', + decline: 'Decline', + snooze_24h: 'Snooze 24h', +}; + +function approvalLabel(value: string, labels: Record | null | undefined): string { + if (labels && labels[value]) return labels[value]; + if (APPROVAL_DEFAULT_LABELS[value]) return APPROVAL_DEFAULT_LABELS[value]; + // Fall back to value with underscores replaced and title cased. + return value + .split('_') + .map((part) => part.charAt(0).toUpperCase() + part.slice(1)) + .join(' '); +} + +function parseOptionLabels(notif: Notification): Record | null { + if (notif.option_labels) return notif.option_labels; + if (!notif.metadata) return null; + try { + const meta = typeof notif.metadata === 'string' ? JSON.parse(notif.metadata) : notif.metadata; + if (meta && typeof meta === 'object' && 'option_labels' in meta) { + const labels = (meta as Record).option_labels; + if (labels && typeof labels === 'object') { + return labels as Record; + } + } + } catch { + // Malformed JSON: just fall back to defaults. + } + return null; +} + function FreeTextInput({ onSubmit }: { onSubmit: (text: string) => void }) { const [text, setText] = useState(''); const [open, setOpen] = useState(false); @@ -90,6 +151,8 @@ function NotificationCard({ notif }: { notif: Notification }) { const { answerNotification, dismissNotification } = useNotificationStore(); const priorityDot = PRIORITY_DOTS[notif.priority]; const options = notif.options ? 
(typeof notif.options === 'string' ? JSON.parse(notif.options) : notif.options) : null; + const isApproval = notif.type === 'approval'; + const optionLabels = isApproval ? parseOptionLabels(notif) : null; return (
{notif.body}

)} + {isApproval && notif.target_kind && notif.target_id && ( +

+ {notif.target_kind}: {notif.target_id} +

+ )}
{notif.status} - + {notif.type}
@@ -151,10 +219,32 @@ function NotificationCard({ notif }: { notif: Notification }) { )} + {/* Action UI for pending approvals */} + {isApproval && notif.status === 'pending' && ( +
+ {options?.map((value: string) => { + const Icon = APPROVAL_BUTTON_ICONS[value]; + const buttonStyle = APPROVAL_BUTTON_STYLES[value] || APPROVAL_DEFAULT_BUTTON_STYLE; + const label = approvalLabel(value, optionLabels); + return ( + + ); + })} +
+ )} + {/* Show answer if answered */} {notif.status === 'answered' && (
- Answer: {notif.answer} (via {notif.answered_by}) + Answer: {isApproval ? approvalLabel(notif.answer || '', optionLabels) : notif.answer}{' '} + (via {notif.answered_by})
)} diff --git a/web/src/stores/notificationStore.ts b/web/src/stores/notificationStore.ts index b639a71..9a1a56b 100644 --- a/web/src/stores/notificationStore.ts +++ b/web/src/stores/notificationStore.ts @@ -5,7 +5,7 @@ export interface Notification { id: string; session_id: string; session_title: string | null; - type: 'notify' | 'question'; + type: 'notify' | 'question' | 'approval'; title: string; body: string; priority: string; @@ -16,6 +16,12 @@ export interface Notification { answered_at: string | null; created_at: string; expires_at: string | null; + target_kind?: string | null; + target_id?: string | null; + // Optional label map for approval-kind rows: value -> human label. + // Sent on the WS notification payload and stored on the row metadata. + option_labels?: Record | null; + metadata?: string | Record | null; } interface NotificationState { @@ -113,6 +119,9 @@ export const useNotificationStore = create((set, get) => ({ answered_at: null, created_at: new Date().toISOString(), expires_at: null, + target_kind: data.target_kind ?? null, + target_id: data.target_id ?? null, + option_labels: data.option_labels ?? null, }; set(s => ({