diff --git a/cycode/cli/apps/ai_guardrails/hooks_manager.py b/cycode/cli/apps/ai_guardrails/hooks_manager.py index 1fe23bb2..192bb9f3 100644 --- a/cycode/cli/apps/ai_guardrails/hooks_manager.py +++ b/cycode/cli/apps/ai_guardrails/hooks_manager.py @@ -28,7 +28,7 @@ def _is_cycode_command(command: str) -> bool: def is_cycode_hook_entry(entry: dict) -> bool: - """Detect Cycode hook entries in both Cursor (flat) and Claude Code (nested) shapes.""" + """True if any hook inside ``entry`` is owned by Cycode.""" command = entry.get('command', '') if _is_cycode_command(command): return True @@ -40,6 +40,31 @@ def is_cycode_hook_entry(entry: dict) -> bool: return False +def _strip_cycode_from_entry(entry: dict) -> Optional[dict]: + """Remove Cycode hooks from ``entry`` and return the remainder. + + Returns ``None`` when nothing useful remains (Cursor-flat Cycode entry, or + every nested hook was Cycode). Non-Cycode hooks co-located in the same + entry are preserved. + """ + # Cursor format: the entry itself IS a single hook command. + if 'command' in entry and 'hooks' not in entry: + return None if _is_cycode_command(entry.get('command', '')) else entry + + # Claude Code / Codex format: nested `hooks` list inside the entry. + nested = entry.get('hooks') + if isinstance(nested, list): + kept = [h for h in nested if not (isinstance(h, dict) and _is_cycode_command(h.get('command', '')))] + if not kept: + return None + if len(kept) == len(nested): + return entry # nothing Cycode-shaped inside; preserve identity + return {**entry, 'hooks': kept} + + # Entry has neither shape we recognize — leave it alone defensively. + return entry + + def _load_hooks_file(hooks_path: Path) -> Optional[dict]: if not hooks_path.exists(): return None @@ -108,50 +133,83 @@ def install_hooks( for event, entries in rendered['hooks'].items(): existing['hooks'].setdefault(event, []) - - # Remove any existing Cycode entries for this event - existing['hooks'][event] = [e for e in existing['hooks'][event] if not is_cycode_hook_entry(e)] - - # Add new Cycode entries + existing['hooks'][event] = [ + stripped for e in existing['hooks'][event] if (stripped := _strip_cycode_from_entry(e)) is not None + ] for entry in entries: existing['hooks'][event].append(entry) - if _save_hooks_file(hooks_path, existing): - return True, f'AI guardrails hooks installed: {hooks_path}' - return False, f'Failed to install hooks to {hooks_path}' + if not _save_hooks_file(hooks_path, existing): + return False, f'Failed to install hooks to {hooks_path}' + message = f'AI guardrails hooks installed: {hooks_path}' -def uninstall_hooks(ide: IDE, scope: str = 'user', repo_path: Optional[Path] = None) -> tuple[bool, str]: - """Remove Cycode AI guardrails hooks for ``ide``.""" - hooks_path = ide.settings_path(scope, repo_path) + # IDE-specific extras (e.g. Codex enables a TOML feature flag). + extra_ok, extra_message = ide.post_install(scope, repo_path) + if not extra_ok: + return False, extra_message + if extra_message: + message = f'{message}\n {extra_message}' - existing = _load_hooks_file(hooks_path) - if existing is None: - return True, f'No hooks file found at {hooks_path}' + return True, message + +def _strip_cycode_entries(existing: dict) -> bool: + """Mutate ``existing`` to drop Cycode hooks (surgically). Return True if anything changed.""" modified = False for event in list(existing.get('hooks', {}).keys()): - original_count = len(existing['hooks'][event]) - existing['hooks'][event] = [e for e in existing['hooks'][event] if not is_cycode_hook_entry(e)] - if len(existing['hooks'][event]) != original_count: - modified = True - if not existing['hooks'][event]: + before = existing['hooks'][event] + after: list = [] + for e in before: + stripped = _strip_cycode_from_entry(e) + if stripped is None: + modified = True + continue + if stripped is not e: + modified = True + after.append(stripped) + if not after: del existing['hooks'][event] + else: + existing['hooks'][event] = after + return modified + +def _persist_uninstall(hooks_path: Path, existing: dict, modified: bool) -> tuple[bool, str]: + """Apply the uninstall result to disk and return ``(success, message)``.""" if not modified: return True, 'No Cycode hooks found to remove' - if not existing.get('hooks'): try: hooks_path.unlink() - return True, f'Removed hooks file: {hooks_path}' except Exception as e: logger.debug('Failed to delete hooks file', exc_info=e) return False, f'Failed to remove hooks file: {hooks_path}' + return True, f'Removed hooks file: {hooks_path}' + if not _save_hooks_file(hooks_path, existing): + return False, f'Failed to update hooks file: {hooks_path}' + return True, f'Cycode hooks removed from: {hooks_path}' + + +def uninstall_hooks(ide: IDE, scope: str = 'user', repo_path: Optional[Path] = None) -> tuple[bool, str]: + """Remove Cycode AI guardrails hooks for ``ide``.""" + hooks_path = ide.settings_path(scope, repo_path) + + existing = _load_hooks_file(hooks_path) + if existing is None: + return True, f'No hooks file found at {hooks_path}' - if _save_hooks_file(hooks_path, existing): - return True, f'Cycode hooks removed from: {hooks_path}' - return False, f'Failed to update hooks file: {hooks_path}' + modified = _strip_cycode_entries(existing) + file_ok, message = _persist_uninstall(hooks_path, existing, modified) + if not file_ok: + return False, message + + extra_ok, extra_message = ide.post_uninstall(scope, repo_path) + if not extra_ok: + return False, extra_message + if extra_message: + message = f'{message}\n {extra_message}' + return True, message def get_hooks_status(ide: IDE, scope: str = 'user', repo_path: Optional[Path] = None) -> dict: diff --git a/cycode/cli/apps/ai_guardrails/ides/__init__.py b/cycode/cli/apps/ai_guardrails/ides/__init__.py index 92859701..e598c5a5 100644 --- a/cycode/cli/apps/ai_guardrails/ides/__init__.py +++ b/cycode/cli/apps/ai_guardrails/ides/__init__.py @@ -9,11 +9,12 @@ from cycode.cli.apps.ai_guardrails.ides.base import IDE from cycode.cli.apps.ai_guardrails.ides.claude_code import ClaudeCode +from cycode.cli.apps.ai_guardrails.ides.codex import Codex from cycode.cli.apps.ai_guardrails.ides.cursor import Cursor # Single source of truth: name → singleton instance. # `--ide` choices and install/uninstall/status iteration both derive from this. -IDES: dict[str, IDE] = {ide.name: ide for ide in (Cursor(), ClaudeCode())} +IDES: dict[str, IDE] = {ide.name: ide for ide in (Cursor(), ClaudeCode(), Codex())} # Default IDE used when `--ide` is omitted. Kept here so the value is colocated # with the registry; no module outside `ides/` needs to know which IDE wins. diff --git a/cycode/cli/apps/ai_guardrails/ides/_plugin_utils.py b/cycode/cli/apps/ai_guardrails/ides/_plugin_utils.py new file mode 100644 index 00000000..186dd37f --- /dev/null +++ b/cycode/cli/apps/ai_guardrails/ides/_plugin_utils.py @@ -0,0 +1,73 @@ +"""Shared plugin-resolution helpers for IDE integrations. + +Both Claude Code and Codex use the same ``@`` key convention +and emit the same telemetry shape — only the marketplace layout and manifest +location differ. ``walk_enabled_plugins`` is the IDE-agnostic loop; each IDE +supplies the two callables that vary (``locate_dir`` + ``read_plugin``). +""" + +import json +from pathlib import Path +from typing import Any, Callable, Optional + +from cycode.logger import get_logger + +logger = get_logger('AI Guardrails Plugins') + + +def load_plugin_json(path: Path) -> Optional[dict]: + """Load a JSON file inside a plugin directory; None if missing or invalid.""" + if not path.exists(): + return None + try: + return json.loads(path.read_text(encoding='utf-8')) + except Exception as e: + logger.debug('Failed to load plugin file, %s', {'path': str(path)}, exc_info=e) + return None + + +def walk_enabled_plugins( + plugin_entries: dict[str, Any], + is_enabled: Callable[[Any], bool], + locate_dir: Callable[[str, str], Optional[Path]], + read_plugin: Callable[[Path], tuple[dict, dict]], +) -> tuple[dict, dict]: + """Iterate enabled plugins; merge their MCP servers and metadata. + + Args: + plugin_entries: ``{@: settings}`` map from the IDE config. + is_enabled: returns True if ``settings`` indicates the plugin is on + (e.g. ``bool(settings)`` for Claude, ``settings.get('enabled')`` for Codex). + locate_dir: given ``(plugin_name, marketplace)``, returns the plugin's + filesystem path or None if it can't be resolved. + read_plugin: given the plugin path, returns ``(entry_fields, servers)``: + ``entry_fields`` are extra metadata to attach to the inventory entry + (name/version/description/...), ``servers`` are MCP servers contributed. + + Returns ``(merged_mcp_servers, enriched_plugins)``. Plugin keys without + ``@`` (or that fail to resolve to a directory) still appear in the + inventory with just ``{'enabled': True}`` so we don't silently drop them. + """ + merged_mcp: dict = {} + enriched: dict = {} + + for plugin_key, settings in plugin_entries.items(): + if not is_enabled(settings): + continue + + entry: dict = {'enabled': True} + enriched[plugin_key] = entry + + if '@' not in plugin_key: + continue + plugin_name, marketplace = plugin_key.split('@', 1) + + plugin_dir = locate_dir(plugin_name, marketplace) + if plugin_dir is None: + continue + + plugin_fields, servers = read_plugin(plugin_dir) + entry.update(plugin_fields) + merged_mcp.update(servers) + + return merged_mcp, enriched diff --git a/cycode/cli/apps/ai_guardrails/ides/base.py b/cycode/cli/apps/ai_guardrails/ides/base.py index 55d5fb05..92065590 100644 --- a/cycode/cli/apps/ai_guardrails/ides/base.py +++ b/cycode/cli/apps/ai_guardrails/ides/base.py @@ -108,6 +108,26 @@ def render_hooks_config(self, async_mode: bool = False) -> dict: ``hooks_manager`` can treat them uniformly. """ + def post_install(self, scope: str, repo_path: Optional[Path] = None) -> tuple[bool, str]: + """Run IDE-specific actions after the hooks file is written. + + Default: no-op success. Override to perform extra setup that doesn't + belong in the hooks file itself — e.g. Codex enables a + ``[features] codex_hooks = true`` flag in its TOML config. + + Returns ``(success, message)``. If ``success`` is False, the overall + install is considered failed. + """ + return True, '' + + def post_uninstall(self, scope: str, repo_path: Optional[Path] = None) -> tuple[bool, str]: + """Run IDE-specific cleanup after the hooks file is removed. + + Default: no-op success. Override to undo whatever ``post_install`` + wrote outside the hooks file. + """ + return True, '' + # --- runtime scan --- @abstractmethod diff --git a/cycode/cli/apps/ai_guardrails/ides/claude_code.py b/cycode/cli/apps/ai_guardrails/ides/claude_code.py index 519914b9..4178ec56 100644 --- a/cycode/cli/apps/ai_guardrails/ides/claude_code.py +++ b/cycode/cli/apps/ai_guardrails/ides/claude_code.py @@ -7,6 +7,7 @@ from typing import ClassVar, Optional from cycode.cli.apps.ai_guardrails.consts import CYCODE_SCAN_PROMPT_COMMAND, CYCODE_SESSION_START_COMMAND +from cycode.cli.apps.ai_guardrails.ides._plugin_utils import load_plugin_json, walk_enabled_plugins from cycode.cli.apps.ai_guardrails.ides.base import IDE, DecisionAction, HookDecision from cycode.cli.apps.ai_guardrails.scan.payload import AIHookPayload from cycode.cli.apps.ai_guardrails.scan.types import AiHookEventType @@ -127,7 +128,7 @@ def load_claude_config(config_path: Optional[Path] = None) -> Optional[dict]: """Load and parse `~/.claude.json`. Returns None if missing/invalid.""" path = config_path or _CLAUDE_CONFIG_PATH if not path.exists(): - logger.debug('Claude config file not found', extra={'path': str(path)}) + logger.debug('Claude config file not found, %s', {'path': str(path)}) return None try: return json.loads(path.read_text(encoding='utf-8')) @@ -150,7 +151,7 @@ def load_claude_settings(settings_path: Optional[Path] = None) -> Optional[dict] """Load and parse `~/.claude/settings.json`. Returns None if missing/invalid.""" path = settings_path or _CLAUDE_SETTINGS_PATH if not path.exists(): - logger.debug('Claude settings file not found', extra={'path': str(path)}) + logger.debug('Claude settings file not found, %s', {'path': str(path)}) return None try: return json.loads(path.read_text(encoding='utf-8')) @@ -171,69 +172,47 @@ def _resolve_marketplace_path(marketplace: dict) -> Optional[Path]: return path if path.is_dir() else None -def _load_plugin_json_file(plugin_path: Path, relative_path: str) -> Optional[dict]: - """Load and parse a JSON file inside a plugin directory. +def _read_claude_plugin(plugin_dir: Path) -> tuple[dict, dict]: + """Read one Claude Code plugin's manifest + MCP servers. - Returns None if the file is missing, unreadable, or has invalid JSON. + Claude hardcodes the MCP file at ``/.mcp.json`` and always + wraps it as ``{"mcpServers": {...}}``. """ - target = plugin_path / relative_path - if not target.exists(): - return None - try: - return json.loads(target.read_text(encoding='utf-8')) - except Exception as e: - logger.debug('Failed to load plugin file', extra={'path': str(target)}, exc_info=e) - return None + manifest = load_plugin_json(plugin_dir / '.claude-plugin' / 'plugin.json') or {} + entry: dict = {} + for field in ('name', 'version', 'description'): + if field in manifest: + entry[field] = manifest[field] + mcp_config = load_plugin_json(plugin_dir / '.mcp.json') or {} + servers: dict = mcp_config.get('mcpServers') or {} + if servers: + entry['mcp_server_names'] = list(servers.keys()) + return entry, servers -def resolve_plugins(settings: dict) -> tuple[dict, dict]: - """Resolve enabled plugins to their MCP servers and metadata. - Walks ``enabledPlugins`` from claude settings, resolves each plugin's - marketplace directory via ``extraKnownMarketplaces``, and reads: - - ``/.mcp.json`` for MCP servers (merged into a flat dict) - - ``/.claude-plugin/plugin.json`` for metadata (name, version, description) +def resolve_plugins(settings: dict) -> tuple[dict, dict]: + """Walk Claude Code's ``enabledPlugins`` via the shared plugin walker. - Returns ``(merged_mcp_servers, enriched_plugins)``. + Each enabled plugin's marketplace is resolved through + ``extraKnownMarketplaces`` to a directory; the rest of the work + (manifest + ``.mcp.json``) is the shared ``_read_claude_plugin``. """ enabled = settings.get('enabledPlugins') or {} marketplaces = settings.get('extraKnownMarketplaces') or {} - merged_mcp: dict = {} - enriched: dict = {} - for plugin_key, is_enabled in enabled.items(): - if not is_enabled: - continue - - entry: dict = {'enabled': True} - enriched[plugin_key] = entry - - if '@' not in plugin_key: - continue - - _plugin_name, marketplace_name = plugin_key.split('@', 1) + def _locate(_plugin_name: str, marketplace_name: str) -> Optional[Path]: marketplace = marketplaces.get(marketplace_name) if not marketplace: - continue - - plugin_path = _resolve_marketplace_path(marketplace) - if plugin_path is None: - continue - - metadata = _load_plugin_json_file(plugin_path, '.claude-plugin/plugin.json') or {} - for field in ('name', 'version', 'description'): - if field in metadata: - entry[field] = metadata[field] - - mcp_config = _load_plugin_json_file(plugin_path, '.mcp.json') or {} - plugin_server_names = [] - for server_name, server_cfg in (mcp_config.get('mcpServers') or {}).items(): - merged_mcp[server_name] = server_cfg - plugin_server_names.append(server_name) - if plugin_server_names: - entry['mcp_server_names'] = plugin_server_names + return None + return _resolve_marketplace_path(marketplace) - return merged_mcp, enriched + return walk_enabled_plugins( + plugin_entries=enabled, + is_enabled=bool, + locate_dir=_locate, + read_plugin=_read_claude_plugin, + ) # --- IDE integration ---------------------------------------------------------- @@ -260,6 +239,7 @@ def render_hooks_config(self, async_mode: bool = False) -> dict: 'hooks': { 'SessionStart': [ { + 'matcher': 'startup|clear', 'hooks': [{'type': 'command', 'command': _SESSION_START_COMMAND}], } ], diff --git a/cycode/cli/apps/ai_guardrails/ides/codex.py b/cycode/cli/apps/ai_guardrails/ides/codex.py new file mode 100644 index 00000000..f8c9b04d --- /dev/null +++ b/cycode/cli/apps/ai_guardrails/ides/codex.py @@ -0,0 +1,310 @@ +"""Codex CLI IDE integration for AI guardrails.""" + +import json +import os +import sys +from pathlib import Path +from typing import ClassVar, Optional + +import tomli_w + +if sys.version_info >= (3, 11): + import tomllib +else: # pragma: no cover - py<3.11 fallback + import tomli as tomllib + +from cycode.cli.apps.ai_guardrails.consts import CYCODE_SCAN_PROMPT_COMMAND, CYCODE_SESSION_START_COMMAND +from cycode.cli.apps.ai_guardrails.ides._plugin_utils import load_plugin_json, walk_enabled_plugins +from cycode.cli.apps.ai_guardrails.ides.base import IDE, DecisionAction, HookDecision +from cycode.cli.apps.ai_guardrails.scan.payload import AIHookPayload +from cycode.cli.apps.ai_guardrails.scan.types import AiHookEventType +from cycode.cli.utils.jwt_utils import decode_jwt_unverified +from cycode.logger import get_logger + +logger = get_logger('AI Guardrails Codex') + +_CONFIG_DIR_NAME = '.codex' +_HOOKS_FILE_NAME = 'hooks.json' +_CONFIG_TOML_NAME = 'config.toml' +_AUTH_JSON_NAME = 'auth.json' +_CODEX_HOME_ENV_VAR = 'CODEX_HOME' + +_HOOK_EVENTS = ('UserPromptSubmit', 'PreToolUse:mcp') +_CODEX_EVENT_NAMES = frozenset(e.split(':', 1)[0] for e in _HOOK_EVENTS) + +_SCAN_COMMAND = f'{CYCODE_SCAN_PROMPT_COMMAND} --ide codex' +_SESSION_START_COMMAND = f'{CYCODE_SESSION_START_COMMAND} --ide codex' + + +def _codex_home() -> Path: + """Resolve Codex's user-scope home directory. + + Honors ``$CODEX_HOME`` per Codex's documented override; falls back to + ``~/.codex``. + """ + override = os.environ.get(_CODEX_HOME_ENV_VAR) + if override: + return Path(override) + return Path.home() / _CONFIG_DIR_NAME + + +def _codex_config_toml_path(scope: str, repo_path: Optional[Path] = None) -> Path: + """Return the Codex ``config.toml`` path for the given scope.""" + if scope == 'repo' and repo_path: + return repo_path / _CONFIG_DIR_NAME / _CONFIG_TOML_NAME + return _codex_home() / _CONFIG_TOML_NAME + + +def _load_codex_config(config_path: Optional[Path] = None) -> Optional[dict]: + """Load and parse Codex's ``config.toml``. Returns None on missing/invalid.""" + path = config_path or (_codex_home() / _CONFIG_TOML_NAME) + if not path.exists(): + logger.debug('Codex config file not found, %s', {'path': str(path)}) + return None + try: + with path.open('rb') as f: + return tomllib.load(f) + except Exception as e: + logger.debug('Failed to load Codex config file, %s', {'path': str(path)}, exc_info=e) + return None + + +def _email_from_auth(auth_path: Optional[Path] = None) -> Optional[str]: + """Best-effort extraction of the signed-in Codex user's email. + + Reads ``~/.codex/auth.json`` and decodes the JWT in ``tokens.id_token`` + to pull the ``email`` claim. Returns None if auth.json is missing + (``OPENAI_API_KEY``-only setups, OS keychain credentials) or unreadable. + """ + path = auth_path or (_codex_home() / _AUTH_JSON_NAME) + if not path.exists(): + logger.debug('Codex auth file not found, %s', {'path': str(path)}) + return None + try: + auth = json.loads(path.read_text(encoding='utf-8')) + except (OSError, json.JSONDecodeError) as e: + logger.debug('Failed to load Codex auth file, %s', {'path': str(path)}, exc_info=e) + return None + + token = (auth.get('tokens') or {}).get('id_token') + if not token: + return None + claims = decode_jwt_unverified(token) + if not claims: + return None + return claims.get('email') + + +def _resolve_codex_plugin_dir(plugin_name: str, marketplace: str) -> Optional[Path]: + """Find ``~/.codex/plugins/cache////``. + + The trailing segment is a content hash. If multiple are cached, pick the + most recently modified. + """ + base = _codex_home() / 'plugins' / 'cache' / marketplace / plugin_name + if not base.is_dir(): + return None + candidates = [d for d in base.iterdir() if d.is_dir()] + if not candidates: + return None + return max(candidates, key=lambda d: d.stat().st_mtime) + + +def _read_codex_plugin(plugin_dir: Path) -> tuple[dict, dict]: + """Read one Codex plugin's manifest + MCP servers. + + Codex's manifest references the MCP file via a path string in the + ``mcpServers`` field (default ``./.mcp.json``); the target file is either + a bare ``{name: cfg}`` map or wrapped in ``{"mcpServers": {...}}``. + """ + manifest = load_plugin_json(plugin_dir / '.codex-plugin' / 'plugin.json') + entry: dict = {} + if not manifest: + return entry, {} + + for field in ('name', 'version', 'description'): + if field in manifest: + entry[field] = manifest[field] + + mcp_ref = manifest.get('mcpServers') + if not mcp_ref: + return entry, {} + mcp_doc = load_plugin_json(plugin_dir / mcp_ref) or {} + servers = mcp_doc.get('mcpServers', mcp_doc) + if not isinstance(servers, dict): + servers = {} + if servers: + entry['mcp_server_names'] = list(servers.keys()) + return entry, servers + + +def _resolve_codex_plugins(config: dict) -> tuple[dict, dict]: + """Walk enabled ``[plugins."@"]`` entries.""" + return walk_enabled_plugins( + plugin_entries=config.get('plugins') or {}, + is_enabled=lambda s: isinstance(s, dict) and bool(s.get('enabled')), + locate_dir=_resolve_codex_plugin_dir, + read_plugin=_read_codex_plugin, + ) + + +def _enable_codex_hooks_feature(scope: str, repo_path: Optional[Path] = None) -> tuple[bool, str]: + """Set ``[features] hooks = true`` in Codex's ``config.toml``. + + Codex's hook scripts are gated behind this feature flag. We preserve any + existing keys and create the file (+ parent dir) when missing. + """ + config_path = _codex_config_toml_path(scope, repo_path) + + config: dict = {} + if config_path.exists(): + try: + with config_path.open('rb') as f: + config = tomllib.load(f) + except Exception as e: + logger.error('Failed to parse Codex config.toml, %s', {'path': str(config_path)}, exc_info=e) + return False, f'Failed to parse existing Codex config at {config_path}' + + features = config.get('features') + if not isinstance(features, dict): + features = {} + features['hooks'] = True + config['features'] = features + + try: + config_path.parent.mkdir(parents=True, exist_ok=True) + with config_path.open('wb') as f: + tomli_w.dump(config, f) + return True, f'Enabled hooks feature in {config_path}' + except Exception as e: + logger.error('Failed to write Codex config.toml, %s', {'path': str(config_path)}, exc_info=e) + return False, f'Failed to write Codex config at {config_path}' + + +class Codex(IDE): + name: ClassVar[str] = 'codex' + display_name: ClassVar[str] = 'Codex' + hook_events: ClassVar[list[str]] = list(_HOOK_EVENTS) + + def settings_path(self, scope: str, repo_path: Optional[Path] = None) -> Path: + if scope == 'repo' and repo_path: + return repo_path / _CONFIG_DIR_NAME / _HOOKS_FILE_NAME + return _codex_home() / _HOOKS_FILE_NAME + + def render_hooks_config(self, async_mode: bool = False) -> dict: + # Codex's TOML `async: true` flag is unimplemented; shell-background via + # `&` is the working mechanism. SessionStart stays sync so the + # conversation context is registered before any scan hook fires. + bg = ' &' if async_mode else '' + scan_cmd = f'{_SCAN_COMMAND}{bg}' + return { + 'hooks': { + 'SessionStart': [ + { + 'matcher': 'startup|clear', + 'hooks': [{'type': 'command', 'command': _SESSION_START_COMMAND}], + } + ], + 'UserPromptSubmit': [ + { + 'hooks': [{'type': 'command', 'command': scan_cmd}], + } + ], + 'PreToolUse': [ + { + 'matcher': 'mcp__.*', + 'hooks': [{'type': 'command', 'command': scan_cmd}], + }, + ], + }, + } + + def post_install(self, scope: str, repo_path: Optional[Path] = None) -> tuple[bool, str]: + return _enable_codex_hooks_feature(scope, repo_path) + + def matches_payload(self, raw_payload: dict) -> bool: + return raw_payload.get('hook_event_name', '') in _CODEX_EVENT_NAMES + + def parse_hook_payload(self, raw_payload: dict) -> AIHookPayload: + hook_event_name = raw_payload.get('hook_event_name', '') + tool_name = raw_payload.get('tool_name', '') + tool_input = raw_payload.get('tool_input') + + if hook_event_name == 'UserPromptSubmit': + canonical_event: AiHookEventType | str = AiHookEventType.PROMPT + elif hook_event_name == 'PreToolUse' and tool_name.startswith('mcp__'): + canonical_event = AiHookEventType.MCP_EXECUTION + else: + canonical_event = hook_event_name + + mcp_server_name = None + mcp_tool_name = None + mcp_arguments = None + if tool_name.startswith('mcp__'): + parts = tool_name.split('__') + if len(parts) >= 2: + mcp_server_name = parts[1] + if len(parts) >= 3: + mcp_tool_name = parts[2] + mcp_arguments = tool_input + + return AIHookPayload( + event_name=canonical_event, + conversation_id=raw_payload.get('session_id'), + generation_id=raw_payload.get('turn_id'), + ide_user_email=_email_from_auth(), + model=raw_payload.get('model'), + ide_provider=self.name, + prompt=raw_payload.get('prompt', ''), + mcp_server_name=mcp_server_name, + mcp_tool_name=mcp_tool_name, + mcp_arguments=mcp_arguments, + ) + + def build_hook_response(self, decision: HookDecision) -> dict: + # Codex accepts the same hook response shapes as Claude Code: + # - PROMPT: empty for allow, {"decision": "block", "reason": ...} for deny + # - PreToolUse: hookSpecificOutput.permissionDecision + if decision.event_type == AiHookEventType.PROMPT: + if decision.action == DecisionAction.ALLOW: + return {} + return {'decision': 'block', 'reason': decision.user_message or ''} + + if decision.action == DecisionAction.ALLOW: + return { + 'hookSpecificOutput': { + 'hookEventName': 'PreToolUse', + 'permissionDecision': 'allow', + } + } + return { + 'hookSpecificOutput': { + 'hookEventName': 'PreToolUse', + 'permissionDecision': decision.action.value, # 'deny' or 'ask' + 'permissionDecisionReason': decision.user_message or '', + } + } + + def build_session_payload(self, raw_payload: dict) -> AIHookPayload: + return AIHookPayload( + conversation_id=raw_payload.get('session_id'), + ide_user_email=_email_from_auth(), + model=raw_payload.get('model'), + ide_provider=self.name, + ide_version=raw_payload.get('codex_version'), + source=raw_payload.get('source'), + ) + + def get_user_email(self) -> Optional[str]: + return _email_from_auth() + + def get_session_context(self) -> tuple[dict, dict]: + config = _load_codex_config() + if not config: + return {}, {} + # Codex stores MCP servers under `[mcp_servers.]`. Plugin-contributed + # servers (via `[plugins."@"]`) merge on top. + mcp_servers: dict = dict(config.get('mcp_servers') or {}) + plugin_mcp, enriched_plugins = _resolve_codex_plugins(config) + mcp_servers.update(plugin_mcp) + return mcp_servers, enriched_plugins diff --git a/cycode/cli/apps/ai_guardrails/ides/cursor.py b/cycode/cli/apps/ai_guardrails/ides/cursor.py index 4f6be1eb..aa218542 100644 --- a/cycode/cli/apps/ai_guardrails/ides/cursor.py +++ b/cycode/cli/apps/ai_guardrails/ides/cursor.py @@ -43,7 +43,7 @@ def _load_cursor_mcp_config(config_path: Optional[Path] = None) -> Optional[dict """Load and parse `~/.cursor/mcp.json`. Returns None if missing/invalid.""" path = config_path or (Path.home() / '.cursor' / _MCP_CONFIG_FILENAME) if not path.exists(): - logger.debug('Cursor MCP config file not found', extra={'path': str(path)}) + logger.debug('Cursor MCP config file not found, %s', {'path': str(path)}) return None try: return json.loads(path.read_text(encoding='utf-8')) diff --git a/cycode/cli/apps/ai_guardrails/scan/handlers.py b/cycode/cli/apps/ai_guardrails/scan/handlers.py index 4a56a179..8b8a2d71 100644 --- a/cycode/cli/apps/ai_guardrails/scan/handlers.py +++ b/cycode/cli/apps/ai_guardrails/scan/handlers.py @@ -10,6 +10,7 @@ import json import os +from dataclasses import dataclass from multiprocessing.pool import ThreadPool from multiprocessing.pool import TimeoutError as PoolTimeoutError from typing import Callable, Optional @@ -178,23 +179,44 @@ def handle_before_read_file(ctx: typer.Context, payload: AIHookPayload, policy: ) -def handle_before_mcp_execution(ctx: typer.Context, payload: AIHookPayload, policy: dict) -> HookDecision: - """Scan MCP tool arguments for secrets before execution.""" +@dataclass(frozen=True) +class _ArgScanFeature: + """Configuration for a "scan some text and decide" event. + + MCP execution and command exec share identical scan-and-decide logic; + only the policy key, event type, and user-facing messages differ. + """ + + policy_key: str # 'mcp' or 'command_exec' + scan_key: str # 'scan_arguments' or 'scan_command' + event_type: AiHookEventType + block_reason: BlockReason + deny_message: Callable[[str], str] + deny_agent_message: str + ask_message: Callable[[str], str] + ask_agent_message: str + + +def _handle_arg_scan( + ctx: typer.Context, + payload: AIHookPayload, + policy: dict, + feature: _ArgScanFeature, + scan_text: str, +) -> HookDecision: + """Shared scan + decision flow for MCP_EXECUTION and COMMAND_EXEC events.""" ai_client = ctx.obj['ai_security_client'] - mcp_config = get_policy_value(policy, 'mcp', default={}) - if not get_policy_value(mcp_config, 'enabled', default=True): - ai_client.create_event(payload, AiHookEventType.MCP_EXECUTION, AIHookOutcome.ALLOWED) - return HookDecision.allow(AiHookEventType.MCP_EXECUTION) + feature_config = get_policy_value(policy, feature.policy_key, default={}) + if not get_policy_value(feature_config, 'enabled', default=True): + ai_client.create_event(payload, feature.event_type, AIHookOutcome.ALLOWED) + return HookDecision.allow(feature.event_type) mode = get_policy_value(policy, 'mode', default=PolicyMode.BLOCK) - tool = payload.mcp_tool_name or 'unknown' - args = payload.mcp_arguments or {} - args_text = args if isinstance(args, str) else json.dumps(args) max_bytes = get_policy_value(policy, 'secrets', 'max_bytes', default=200000) timeout_ms = get_policy_value(policy, 'secrets', 'timeout_ms', default=30000) - clipped = truncate_utf8(args_text, max_bytes) - action = get_policy_value(mcp_config, 'action', default=PolicyMode.BLOCK) + clipped = truncate_utf8(scan_text, max_bytes) + action = get_policy_value(feature_config, 'action', default=PolicyMode.BLOCK) scan_id = None block_reason = None @@ -202,26 +224,25 @@ def handle_before_mcp_execution(ctx: typer.Context, payload: AIHookPayload, poli error_message = None try: - if get_policy_value(mcp_config, 'scan_arguments', default=True): + if get_policy_value(feature_config, feature.scan_key, default=True): violation_summary, scan_id = _scan_text_for_secrets(ctx, clipped, timeout_ms) if violation_summary: - block_reason = BlockReason.SECRETS_IN_MCP_ARGS + block_reason = feature.block_reason if mode == PolicyMode.BLOCK and action == PolicyMode.BLOCK: outcome = AIHookOutcome.BLOCKED - user_message = f'Cycode blocked MCP tool call "{tool}". {violation_summary}' return HookDecision.deny( - AiHookEventType.MCP_EXECUTION, - user_message, - 'Do not pass secrets to tools. Use secret references (name/id) instead.', + feature.event_type, + feature.deny_message(violation_summary), + feature.deny_agent_message, ) outcome = AIHookOutcome.WARNED return HookDecision.ask( - AiHookEventType.MCP_EXECUTION, - f'{violation_summary} in MCP tool call "{tool}". Allow execution?', - 'Possible secrets detected in tool arguments; proceed with caution.', + feature.event_type, + feature.ask_message(violation_summary), + feature.ask_agent_message, ) - return HookDecision.allow(AiHookEventType.MCP_EXECUTION) + return HookDecision.allow(feature.event_type) except Exception as e: outcome = ( AIHookOutcome.ALLOWED if get_policy_value(policy, 'fail_open', default=True) else AIHookOutcome.BLOCKED @@ -232,7 +253,7 @@ def handle_before_mcp_execution(ctx: typer.Context, payload: AIHookPayload, poli finally: ai_client.create_event( payload, - AiHookEventType.MCP_EXECUTION, + feature.event_type, outcome, scan_id=scan_id, block_reason=block_reason, @@ -240,6 +261,29 @@ def handle_before_mcp_execution(ctx: typer.Context, payload: AIHookPayload, poli ) +def handle_before_mcp_execution(ctx: typer.Context, payload: AIHookPayload, policy: dict) -> HookDecision: + """Scan MCP tool arguments for secrets before execution.""" + tool = payload.mcp_tool_name or 'unknown' + args = payload.mcp_arguments or {} + args_text = args if isinstance(args, str) else json.dumps(args) + return _handle_arg_scan( + ctx, + payload, + policy, + _ArgScanFeature( + policy_key='mcp', + scan_key='scan_arguments', + event_type=AiHookEventType.MCP_EXECUTION, + block_reason=BlockReason.SECRETS_IN_MCP_ARGS, + deny_message=lambda v: f'Cycode blocked MCP tool call "{tool}". {v}', + deny_agent_message='Do not pass secrets to tools. Use secret references (name/id) instead.', + ask_message=lambda v: f'{v} in MCP tool call "{tool}". Allow execution?', + ask_agent_message='Possible secrets detected in tool arguments; proceed with caution.', + ), + scan_text=args_text, + ) + + def get_handler_for_event(event_type: str) -> Optional[HandlerFn]: """Look up the handler for a canonical event type.""" handlers: dict[str, HandlerFn] = { diff --git a/cycode/cli/utils/jwt_utils.py b/cycode/cli/utils/jwt_utils.py index c87b7c48..21f767d0 100644 --- a/cycode/cli/utils/jwt_utils.py +++ b/cycode/cli/utils/jwt_utils.py @@ -5,6 +5,14 @@ _JWT_PAYLOAD_POSSIBLE_USER_ID_FIELD_NAMES = ('userId', 'internalId', 'token-user-id') +def decode_jwt_unverified(token: str) -> Optional[dict]: + """Return JWT claims without signature verification, or None if the token is unreadable.""" + try: + return jwt.decode(token, options={'verify_signature': False}) + except jwt.PyJWTError: + return None + + def get_user_and_tenant_ids_from_access_token(access_token: str) -> tuple[Optional[str], Optional[str]]: payload = jwt.decode(access_token, options={'verify_signature': False}) diff --git a/poetry.lock b/poetry.lock index 1a6ce6ee..eab3b4da 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.2.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.3.4 and should not be changed by hand. [[package]] name = "altgraph" @@ -339,6 +339,7 @@ files = [ {file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"}, {file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"}, ] +markers = {test = "sys_platform == \"win32\""} [[package]] name = "coverage" @@ -742,7 +743,7 @@ files = [ [package.dependencies] attrs = ">=22.2.0" -jsonschema-specifications = ">=2023.03.6" +jsonschema-specifications = ">=2023.3.6" referencing = ">=0.28.4" rpds-py = ">=0.7.1" @@ -1875,7 +1876,7 @@ version = "2.3.0" description = "A lil' TOML parser" optional = false python-versions = ">=3.8" -groups = ["test"] +groups = ["main", "test"] markers = "python_version < \"3.11\"" files = [ {file = "tomli-2.3.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:88bd15eb972f3664f5ed4b57c1634a97153b4bac4479dcb6a495f41921eb7f45"}, @@ -1922,6 +1923,18 @@ files = [ {file = "tomli-2.3.0.tar.gz", hash = "sha256:64be704a875d2a59753d80ee8a533c3fe183e3f06807ff7dc2232938ccb01549"}, ] +[[package]] +name = "tomli-w" +version = "1.2.0" +description = "A lil' TOML writer" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "tomli_w-1.2.0-py3-none-any.whl", hash = "sha256:188306098d013b691fcadc011abd66727d3c414c571bb01b1a174ba8c983cf90"}, + {file = "tomli_w-1.2.0.tar.gz", hash = "sha256:2dd14fac5a47c27be9cd4c976af5a12d87fb1f0b4512f81d69cce3b35ae25021"}, +] + [[package]] name = "typer" version = "0.15.4" @@ -2043,4 +2056,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.1" python-versions = ">=3.9" -content-hash = "b67d2f0ceadcf2fbd351b056596da8a04656a3774c2cc26e13cf678b6f31561f" +content-hash = "ac37763cb9b582d1997853c1347edfdb05566fa225adf304685ecce66989fc67" diff --git a/pyproject.toml b/pyproject.toml index c3d10f0a..1b43757b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -49,6 +49,8 @@ tenacity = ">=9.0.0,<9.1.0" mcp = { version = ">=1.9.3,<2.0.0", markers = "python_version >= '3.10'" } pydantic = ">=2.11.5,<3.0.0" pathvalidate = ">=3.3.1,<4.0.0" +tomli-w = ">=1.0.0,<2.0.0" +tomli = {version = ">=2.0.0,<3.0.0", python = "<3.11"} [tool.poetry.group.test.dependencies] mock = ">=4.0.3,<4.1.0" diff --git a/tests/cli/commands/ai_guardrails/ides/test_codex.py b/tests/cli/commands/ai_guardrails/ides/test_codex.py new file mode 100644 index 00000000..f71e19a2 --- /dev/null +++ b/tests/cli/commands/ai_guardrails/ides/test_codex.py @@ -0,0 +1,335 @@ +"""Codex CLI IDE integration tests.""" + +import base64 +import json +import sys +from pathlib import Path +from unittest.mock import patch + +import pytest +from pyfakefs.fake_filesystem import FakeFilesystem + +from cycode.cli.apps.ai_guardrails.ides.base import HookDecision +from cycode.cli.apps.ai_guardrails.ides.codex import ( + Codex, + _codex_home, + _email_from_auth, + _enable_codex_hooks_feature, + _load_codex_config, +) +from cycode.cli.apps.ai_guardrails.scan.types import AiHookEventType + +if sys.version_info >= (3, 11): + import tomllib +else: # pragma: no cover - py<3.11 fallback + import tomli as tomllib + + +# --- payload parsing --------------------------------------------------------- + + +def test_matches_payload_only_codex_events() -> None: + codex = Codex() + assert codex.matches_payload({'hook_event_name': 'UserPromptSubmit'}) is True + assert codex.matches_payload({'hook_event_name': 'PreToolUse'}) is True + assert codex.matches_payload({'hook_event_name': 'beforeSubmitPrompt'}) is False + assert codex.matches_payload({'hook_event_name': 'SessionStart'}) is False + + +def test_parse_prompt_payload() -> None: + unified = Codex().parse_hook_payload( + { + 'hook_event_name': 'UserPromptSubmit', + 'session_id': 'session-123', + 'turn_id': 'turn-456', + 'model': 'gpt-5-codex', + 'prompt': 'Test prompt', + } + ) + assert unified.event_name == AiHookEventType.PROMPT + assert unified.conversation_id == 'session-123' + assert unified.generation_id == 'turn-456' + assert unified.model == 'gpt-5-codex' + assert unified.ide_provider == 'codex' + assert unified.prompt == 'Test prompt' + + +def test_parse_mcp_execution_payload() -> None: + args = {'resource_type': 'merge_request', 'resource_id': '4'} + unified = Codex().parse_hook_payload( + { + 'hook_event_name': 'PreToolUse', + 'tool_name': 'mcp__gitlab__discussion_list', + 'tool_input': args, + } + ) + assert unified.event_name == AiHookEventType.MCP_EXECUTION + assert unified.mcp_server_name == 'gitlab' + assert unified.mcp_tool_name == 'discussion_list' + assert unified.mcp_arguments == args + + +def test_parse_unknown_event_falls_through() -> None: + unified = Codex().parse_hook_payload({'hook_event_name': 'Stop'}) + assert unified.event_name == 'Stop' + + +def test_parse_empty_payload_defaults() -> None: + unified = Codex().parse_hook_payload({'hook_event_name': 'UserPromptSubmit'}) + assert unified.event_name == AiHookEventType.PROMPT + assert unified.prompt == '' + assert unified.ide_provider == 'codex' + + +# --- response building ------------------------------------------------------- + + +def test_build_prompt_responses() -> None: + codex = Codex() + assert codex.build_hook_response(HookDecision.allow(AiHookEventType.PROMPT)) == {} + assert codex.build_hook_response(HookDecision.deny(AiHookEventType.PROMPT, 'no!')) == { + 'decision': 'block', + 'reason': 'no!', + } + + +def test_build_mcp_execution_allow_and_deny() -> None: + codex = Codex() + allow = codex.build_hook_response(HookDecision.allow(AiHookEventType.MCP_EXECUTION)) + assert allow == {'hookSpecificOutput': {'hookEventName': 'PreToolUse', 'permissionDecision': 'allow'}} + + deny = codex.build_hook_response(HookDecision.deny(AiHookEventType.MCP_EXECUTION, 'secret in args!')) + assert deny == { + 'hookSpecificOutput': { + 'hookEventName': 'PreToolUse', + 'permissionDecision': 'deny', + 'permissionDecisionReason': 'secret in args!', + } + } + + +def test_build_mcp_execution_ask() -> None: + ask = Codex().build_hook_response(HookDecision.ask(AiHookEventType.MCP_EXECUTION, 'maybe?')) + assert ask == { + 'hookSpecificOutput': { + 'hookEventName': 'PreToolUse', + 'permissionDecision': 'ask', + 'permissionDecisionReason': 'maybe?', + } + } + + +# --- settings paths ---------------------------------------------------------- + + +def test_settings_path_user_scope() -> None: + path = Codex().settings_path('user') + assert path.name == 'hooks.json' + assert path.parent.name == '.codex' + + +def test_settings_path_repo_scope(fs: FakeFilesystem) -> None: + repo = Path('/my-repo') + fs.create_dir(repo) + path = Codex().settings_path('repo', repo) + assert path == repo / '.codex' / 'hooks.json' + + +def test_settings_path_honors_codex_home_env(fs: FakeFilesystem, monkeypatch: pytest.MonkeyPatch) -> None: + override = '/custom/codex/home' + fs.create_dir(override) + monkeypatch.setenv('CODEX_HOME', override) + assert _codex_home() == Path(override) + assert Codex().settings_path('user') == Path(override) / 'hooks.json' + + +# --- hooks config rendering -------------------------------------------------- + + +def test_render_hooks_session_start_matcher_includes_clear() -> None: + """SessionStart must fire on /clear too (conversation_id rotates).""" + rendered = Codex().render_hooks_config() + assert rendered['hooks']['SessionStart'][0]['matcher'] == 'startup|clear' + assert '--ide codex' in rendered['hooks']['SessionStart'][0]['hooks'][0]['command'] + + +def test_render_hooks_never_emits_async_toml_flags() -> None: + """Codex's TOML `async: true` / `timeout` flags are unimplemented; we must not emit them.""" + for mode in (False, True): + rendered = Codex().render_hooks_config(async_mode=mode) + for entry in rendered['hooks']['PreToolUse']: + for hook in entry['hooks']: + assert 'async' not in hook + assert 'timeout' not in hook + + +def test_render_hooks_async_backgrounds_scan_hooks() -> None: + """In async mode, UserPromptSubmit + PreToolUse scan hooks shell-background.""" + rendered = Codex().render_hooks_config(async_mode=True) + prompt_cmd = rendered['hooks']['UserPromptSubmit'][0]['hooks'][0]['command'] + pretool_cmd = rendered['hooks']['PreToolUse'][0]['hooks'][0]['command'] + assert prompt_cmd.endswith(' &') + assert pretool_cmd.endswith(' &') + + +def test_render_hooks_session_start_always_synchronous() -> None: + """SessionStart registers the conversation context — never backgrounded.""" + for mode in (False, True): + rendered = Codex().render_hooks_config(async_mode=mode) + session_cmd = rendered['hooks']['SessionStart'][0]['hooks'][0]['command'] + assert '&' not in session_cmd + + +def test_render_hooks_pretooluse_matchers_are_mcp_only() -> None: + matchers = [e['matcher'] for e in Codex().render_hooks_config()['hooks']['PreToolUse']] + assert matchers == ['mcp__.*'] + + +# --- post_install: TOML feature flag ---------------------------------------- + + +def test_post_install_creates_config_toml(fs: FakeFilesystem, monkeypatch: pytest.MonkeyPatch) -> None: + home = '/codex-home' + fs.create_dir(home) + monkeypatch.setenv('CODEX_HOME', home) + + success, message = Codex().post_install('user') + assert success is True + config_path = Path(home) / 'config.toml' + assert config_path.exists() + assert 'config.toml' in message + + with config_path.open('rb') as f: + config = tomllib.load(f) + assert config['features']['hooks'] is True + + +def test_post_install_preserves_existing_keys(fs: FakeFilesystem, monkeypatch: pytest.MonkeyPatch) -> None: + home = '/codex-home' + fs.create_dir(home) + monkeypatch.setenv('CODEX_HOME', home) + config_path = Path(home) / 'config.toml' + # Pre-existing settings the user cares about + config_path.write_text('model = "gpt-5-codex"\n\n[features]\nother = true\n') + + success, _ = Codex().post_install('user') + assert success is True + + with config_path.open('rb') as f: + config = tomllib.load(f) + assert config['model'] == 'gpt-5-codex' + assert config['features']['other'] is True + assert config['features']['hooks'] is True + + +def test_post_install_repo_scope_writes_to_repo_dir(fs: FakeFilesystem) -> None: + repo = Path('/my-repo') + fs.create_dir(repo) + success, _ = Codex().post_install('repo', repo) + assert success is True + assert (repo / '.codex' / 'config.toml').exists() + + +def test_enable_codex_hooks_feature_fails_on_corrupt_toml(fs: FakeFilesystem, monkeypatch: pytest.MonkeyPatch) -> None: + home = '/codex-home' + fs.create_dir(home) + monkeypatch.setenv('CODEX_HOME', home) + (Path(home) / 'config.toml').write_text('this is = not [ valid] toml = ') + + success, message = _enable_codex_hooks_feature('user') + assert success is False + assert 'Failed to parse' in message + + +# --- TOML config loading ----------------------------------------------------- + + +def test_load_codex_config_valid(fs: FakeFilesystem, monkeypatch: pytest.MonkeyPatch) -> None: + home = '/codex-home' + fs.create_dir(home) + monkeypatch.setenv('CODEX_HOME', home) + (Path(home) / 'config.toml').write_text('model = "gpt-5-codex"\n[mcp_servers.linear]\ncommand = "linear-mcp"\n') + + config = _load_codex_config() + assert config is not None + assert config['model'] == 'gpt-5-codex' + assert config['mcp_servers']['linear']['command'] == 'linear-mcp' + + +def test_load_codex_config_missing_file(fs: FakeFilesystem, monkeypatch: pytest.MonkeyPatch) -> None: + home = '/codex-home' + fs.create_dir(home) + monkeypatch.setenv('CODEX_HOME', home) + assert _load_codex_config() is None + + +def test_load_codex_config_invalid_toml(fs: FakeFilesystem, monkeypatch: pytest.MonkeyPatch) -> None: + home = '/codex-home' + fs.create_dir(home) + monkeypatch.setenv('CODEX_HOME', home) + (Path(home) / 'config.toml').write_text('this is = not [ valid] toml = ') + assert _load_codex_config() is None + + +# --- JWT email extraction ---------------------------------------------------- + + +def _make_jwt(claims: dict) -> str: + """Build a JWT-shaped token with the given claims (signature ignored).""" + header = base64.urlsafe_b64encode(b'{"alg":"RS256"}').rstrip(b'=').decode() + payload = base64.urlsafe_b64encode(json.dumps(claims).encode()).rstrip(b'=').decode() + return f'{header}.{payload}.signature-not-verified' + + +def test_email_from_auth_returns_email(fs: FakeFilesystem, monkeypatch: pytest.MonkeyPatch) -> None: + home = '/codex-home' + fs.create_dir(home) + monkeypatch.setenv('CODEX_HOME', home) + token = _make_jwt({'email': 'codex-user@example.com'}) + (Path(home) / 'auth.json').write_text(json.dumps({'tokens': {'id_token': token}})) + + assert _email_from_auth() == 'codex-user@example.com' + + +def test_email_from_auth_missing_file(fs: FakeFilesystem, monkeypatch: pytest.MonkeyPatch) -> None: + home = '/codex-home' + fs.create_dir(home) + monkeypatch.setenv('CODEX_HOME', home) + assert _email_from_auth() is None + + +def test_email_from_auth_no_id_token(fs: FakeFilesystem, monkeypatch: pytest.MonkeyPatch) -> None: + home = '/codex-home' + fs.create_dir(home) + monkeypatch.setenv('CODEX_HOME', home) + (Path(home) / 'auth.json').write_text(json.dumps({'tokens': {}})) + assert _email_from_auth() is None + + +def test_email_from_auth_malformed_token(fs: FakeFilesystem, monkeypatch: pytest.MonkeyPatch) -> None: + home = '/codex-home' + fs.create_dir(home) + monkeypatch.setenv('CODEX_HOME', home) + (Path(home) / 'auth.json').write_text(json.dumps({'tokens': {'id_token': 'not.a.jwt-with-bad-payload!!'}})) + assert _email_from_auth() is None + + +# --- session context -------------------------------------------------------- + + +def test_session_context_reads_mcp_servers() -> None: + mcp = {'linear': {'command': 'linear-mcp'}, 'github': {'command': 'gh-mcp'}} + with patch( + 'cycode.cli.apps.ai_guardrails.ides.codex._load_codex_config', + return_value={'mcp_servers': mcp}, + ): + servers, plugins = Codex().get_session_context() + assert servers == mcp + assert plugins == {} + + +def test_session_context_no_config() -> None: + with patch('cycode.cli.apps.ai_guardrails.ides.codex._load_codex_config', return_value=None): + servers, plugins = Codex().get_session_context() + assert servers == {} + assert plugins == {} diff --git a/tests/cli/commands/ai_guardrails/ides/test_contract.py b/tests/cli/commands/ai_guardrails/ides/test_contract.py index 9714dbfa..7d7a5427 100644 --- a/tests/cli/commands/ai_guardrails/ides/test_contract.py +++ b/tests/cli/commands/ai_guardrails/ides/test_contract.py @@ -61,7 +61,7 @@ def test_render_hooks_config_has_hooks_key(ide: IDE) -> None: def test_render_hooks_config_async_changes_output(ide: IDE) -> None: - """async_mode must influence the rendered output (e.g. & suffix, async flag).""" + """async_mode must influence the rendered output.""" assert ide.render_hooks_config(async_mode=False) != ide.render_hooks_config(async_mode=True) diff --git a/tests/cli/commands/ai_guardrails/test_hooks_manager.py b/tests/cli/commands/ai_guardrails/test_hooks_manager.py index 4ff8d575..f0a0248c 100644 --- a/tests/cli/commands/ai_guardrails/test_hooks_manager.py +++ b/tests/cli/commands/ai_guardrails/test_hooks_manager.py @@ -1,17 +1,27 @@ """Tests for AI guardrails hooks manager and per-IDE hooks rendering.""" from pathlib import Path +from typing import TYPE_CHECKING import yaml from pyfakefs.fake_filesystem import FakeFilesystem +if TYPE_CHECKING: + import pytest + from cycode.cli.apps.ai_guardrails.consts import ( CYCODE_SCAN_PROMPT_COMMAND, CYCODE_SESSION_START_COMMAND, PolicyMode, ) -from cycode.cli.apps.ai_guardrails.hooks_manager import create_policy_file, is_cycode_hook_entry +from cycode.cli.apps.ai_guardrails.hooks_manager import ( + create_policy_file, + install_hooks, + is_cycode_hook_entry, + uninstall_hooks, +) from cycode.cli.apps.ai_guardrails.ides.claude_code import ClaudeCode +from cycode.cli.apps.ai_guardrails.ides.codex import Codex from cycode.cli.apps.ai_guardrails.ides.cursor import Cursor @@ -101,11 +111,11 @@ def test_claude_code_render_hooks_async() -> None: def test_claude_code_render_hooks_session_start() -> None: - """Claude Code SessionStart carries the --ide flag explicitly.""" + """Claude Code SessionStart fires on startup and /clear.""" config = ClaudeCode().render_hooks_config() - assert 'SessionStart' in config['hooks'] entries = config['hooks']['SessionStart'] assert len(entries) == 1 + assert entries[0]['matcher'] == 'startup|clear' assert CYCODE_SESSION_START_COMMAND in entries[0]['hooks'][0]['command'] assert '--ide claude-code' in entries[0]['hooks'][0]['command'] @@ -153,6 +163,101 @@ def test_create_policy_file_updates_existing(fs: FakeFilesystem) -> None: assert policy['custom_field'] == 'keep_me' +def test_install_preserves_user_hook_colocated_with_cycode( + fs: FakeFilesystem, monkeypatch: 'pytest.MonkeyPatch' +) -> None: + """install must not clobber a user-authored hook that shares + an entry with a Cycode hook. The filter is hook-level, not entry-level. + """ + import json + + repo = Path('/repo') + fs.create_dir(repo) + hooks_path = repo / '.codex' / 'hooks.json' + fs.create_file( + hooks_path, + contents=json.dumps( + { + 'version': 1, + 'hooks': { + 'SessionStart': [ + { + 'matcher': 'startup|clear', + 'hooks': [ + {'type': 'command', 'command': '/usr/local/bin/user-debug.sh SessionStart'}, + {'type': 'command', 'command': 'cycode ai-guardrails session-start --ide codex'}, + ], + } + ], + # Unrelated event with no Cycode hooks at all — must be untouched. + 'PostToolUse': [{'hooks': [{'type': 'command', 'command': '/usr/local/bin/user-postlog.sh'}]}], + }, + } + ), + ) + + # Codex's post_install touches ~/.codex/config.toml (user scope) — keep that off + # the filesystem under test by pinning CODEX_HOME inside the fake FS. + monkeypatch.setenv('CODEX_HOME', '/codex-home') + fs.create_dir('/codex-home') + + success, _ = install_hooks(Codex(), scope='repo', repo_path=repo) + assert success is True + + saved = json.loads(hooks_path.read_text()) + session_start = saved['hooks']['SessionStart'] + # The pre-existing entry should still exist with the user hook preserved, + # and a separate fresh Cycode entry should have been appended. + user_hook_cmd = '/usr/local/bin/user-debug.sh SessionStart' + remaining_user_hooks = [ + h for entry in session_start for h in entry.get('hooks', []) if h.get('command') == user_hook_cmd + ] + assert remaining_user_hooks, 'user hook was clobbered by install' + + # Unrelated event untouched. + assert saved['hooks']['PostToolUse'][0]['hooks'][0]['command'] == '/usr/local/bin/user-postlog.sh' + + +def test_uninstall_preserves_user_hook_colocated_with_cycode( + fs: FakeFilesystem, monkeypatch: 'pytest.MonkeyPatch' +) -> None: + """uninstall must strip only the Cycode hook from a mixed entry.""" + import json + + repo = Path('/repo') + fs.create_dir(repo) + hooks_path = repo / '.codex' / 'hooks.json' + fs.create_file( + hooks_path, + contents=json.dumps( + { + 'version': 1, + 'hooks': { + 'UserPromptSubmit': [ + { + 'hooks': [ + {'type': 'command', 'command': '/usr/local/bin/user-debug.sh UserPromptSubmit'}, + {'type': 'command', 'command': 'cycode ai-guardrails scan --ide codex'}, + ] + } + ] + }, + } + ), + ) + monkeypatch.setenv('CODEX_HOME', '/codex-home') + fs.create_dir('/codex-home') + + success, _ = uninstall_hooks(Codex(), scope='repo', repo_path=repo) + assert success is True + + saved = json.loads(hooks_path.read_text()) + hooks = saved['hooks']['UserPromptSubmit'][0]['hooks'] + commands = [h['command'] for h in hooks] + assert '/usr/local/bin/user-debug.sh UserPromptSubmit' in commands + assert not any('cycode ai-guardrails' in c for c in commands) + + def test_create_policy_file_repo_scope(fs: FakeFilesystem) -> None: """Create a policy file in repo scope.""" repo_path = Path('/my-repo')