Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 83 additions & 25 deletions cycode/cli/apps/ai_guardrails/hooks_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def _is_cycode_command(command: str) -> bool:


def is_cycode_hook_entry(entry: dict) -> bool:
"""Detect Cycode hook entries in both Cursor (flat) and Claude Code (nested) shapes."""
"""True if any hook inside ``entry`` is owned by Cycode."""
command = entry.get('command', '')
if _is_cycode_command(command):
return True
Expand All @@ -40,6 +40,31 @@ def is_cycode_hook_entry(entry: dict) -> bool:
return False


def _strip_cycode_from_entry(entry: dict) -> Optional[dict]:
"""Remove Cycode hooks from ``entry`` and return the remainder.

Returns ``None`` when nothing useful remains (Cursor-flat Cycode entry, or
every nested hook was Cycode). Non-Cycode hooks co-located in the same
entry are preserved.
"""
# Cursor format: the entry itself IS a single hook command.
if 'command' in entry and 'hooks' not in entry:
return None if _is_cycode_command(entry.get('command', '')) else entry

# Claude Code / Codex format: nested `hooks` list inside the entry.
nested = entry.get('hooks')
if isinstance(nested, list):
kept = [h for h in nested if not (isinstance(h, dict) and _is_cycode_command(h.get('command', '')))]
if not kept:
return None
if len(kept) == len(nested):
return entry # nothing Cycode-shaped inside; preserve identity
return {**entry, 'hooks': kept}

# Entry has neither shape we recognize — leave it alone defensively.
return entry


def _load_hooks_file(hooks_path: Path) -> Optional[dict]:
if not hooks_path.exists():
return None
Expand Down Expand Up @@ -108,50 +133,83 @@ def install_hooks(

for event, entries in rendered['hooks'].items():
existing['hooks'].setdefault(event, [])

# Remove any existing Cycode entries for this event
existing['hooks'][event] = [e for e in existing['hooks'][event] if not is_cycode_hook_entry(e)]

# Add new Cycode entries
existing['hooks'][event] = [
stripped for e in existing['hooks'][event] if (stripped := _strip_cycode_from_entry(e)) is not None
]
for entry in entries:
existing['hooks'][event].append(entry)

if _save_hooks_file(hooks_path, existing):
return True, f'AI guardrails hooks installed: {hooks_path}'
return False, f'Failed to install hooks to {hooks_path}'
if not _save_hooks_file(hooks_path, existing):
return False, f'Failed to install hooks to {hooks_path}'

message = f'AI guardrails hooks installed: {hooks_path}'

def uninstall_hooks(ide: IDE, scope: str = 'user', repo_path: Optional[Path] = None) -> tuple[bool, str]:
"""Remove Cycode AI guardrails hooks for ``ide``."""
hooks_path = ide.settings_path(scope, repo_path)
# IDE-specific extras (e.g. Codex enables a TOML feature flag).
extra_ok, extra_message = ide.post_install(scope, repo_path)
if not extra_ok:
return False, extra_message
if extra_message:
message = f'{message}\n {extra_message}'

existing = _load_hooks_file(hooks_path)
if existing is None:
return True, f'No hooks file found at {hooks_path}'
return True, message


def _strip_cycode_entries(existing: dict) -> bool:
"""Mutate ``existing`` to drop Cycode hooks (surgically). Return True if anything changed."""
modified = False
for event in list(existing.get('hooks', {}).keys()):
original_count = len(existing['hooks'][event])
existing['hooks'][event] = [e for e in existing['hooks'][event] if not is_cycode_hook_entry(e)]
if len(existing['hooks'][event]) != original_count:
modified = True
if not existing['hooks'][event]:
before = existing['hooks'][event]
after: list = []
for e in before:
stripped = _strip_cycode_from_entry(e)
if stripped is None:
modified = True
continue
if stripped is not e:
modified = True
after.append(stripped)
if not after:
del existing['hooks'][event]
else:
existing['hooks'][event] = after
return modified


def _persist_uninstall(hooks_path: Path, existing: dict, modified: bool) -> tuple[bool, str]:
"""Apply the uninstall result to disk and return ``(success, message)``."""
if not modified:
return True, 'No Cycode hooks found to remove'

if not existing.get('hooks'):
try:
hooks_path.unlink()
return True, f'Removed hooks file: {hooks_path}'
except Exception as e:
logger.debug('Failed to delete hooks file', exc_info=e)
return False, f'Failed to remove hooks file: {hooks_path}'
return True, f'Removed hooks file: {hooks_path}'
if not _save_hooks_file(hooks_path, existing):
return False, f'Failed to update hooks file: {hooks_path}'
return True, f'Cycode hooks removed from: {hooks_path}'


def uninstall_hooks(ide: IDE, scope: str = 'user', repo_path: Optional[Path] = None) -> tuple[bool, str]:
"""Remove Cycode AI guardrails hooks for ``ide``."""
hooks_path = ide.settings_path(scope, repo_path)

existing = _load_hooks_file(hooks_path)
if existing is None:
return True, f'No hooks file found at {hooks_path}'

if _save_hooks_file(hooks_path, existing):
return True, f'Cycode hooks removed from: {hooks_path}'
return False, f'Failed to update hooks file: {hooks_path}'
modified = _strip_cycode_entries(existing)
file_ok, message = _persist_uninstall(hooks_path, existing, modified)
if not file_ok:
return False, message

extra_ok, extra_message = ide.post_uninstall(scope, repo_path)
if not extra_ok:
return False, extra_message
if extra_message:
message = f'{message}\n {extra_message}'
return True, message


def get_hooks_status(ide: IDE, scope: str = 'user', repo_path: Optional[Path] = None) -> dict:
Expand Down
3 changes: 2 additions & 1 deletion cycode/cli/apps/ai_guardrails/ides/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@

from cycode.cli.apps.ai_guardrails.ides.base import IDE
from cycode.cli.apps.ai_guardrails.ides.claude_code import ClaudeCode
from cycode.cli.apps.ai_guardrails.ides.codex import Codex
from cycode.cli.apps.ai_guardrails.ides.cursor import Cursor

# Single source of truth: name → singleton instance.
# `--ide` choices and install/uninstall/status iteration both derive from this.
IDES: dict[str, IDE] = {ide.name: ide for ide in (Cursor(), ClaudeCode())}
IDES: dict[str, IDE] = {ide.name: ide for ide in (Cursor(), ClaudeCode(), Codex())}

# Default IDE used when `--ide` is omitted. Kept here so the value is colocated
# with the registry; no module outside `ides/` needs to know which IDE wins.
Expand Down
73 changes: 73 additions & 0 deletions cycode/cli/apps/ai_guardrails/ides/_plugin_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""Shared plugin-resolution helpers for IDE integrations.

Both Claude Code and Codex use the same ``<plugin>@<marketplace>`` key convention
and emit the same telemetry shape — only the marketplace layout and manifest
location differ. ``walk_enabled_plugins`` is the IDE-agnostic loop; each IDE
supplies the two callables that vary (``locate_dir`` + ``read_plugin``).
"""

import json
from pathlib import Path
from typing import Any, Callable, Optional

from cycode.logger import get_logger

logger = get_logger('AI Guardrails Plugins')


def load_plugin_json(path: Path) -> Optional[dict]:
"""Load a JSON file inside a plugin directory; None if missing or invalid."""
if not path.exists():
return None
try:
return json.loads(path.read_text(encoding='utf-8'))
except Exception as e:
logger.debug('Failed to load plugin file, %s', {'path': str(path)}, exc_info=e)
return None


def walk_enabled_plugins(
plugin_entries: dict[str, Any],
is_enabled: Callable[[Any], bool],
locate_dir: Callable[[str, str], Optional[Path]],
read_plugin: Callable[[Path], tuple[dict, dict]],
) -> tuple[dict, dict]:
"""Iterate enabled plugins; merge their MCP servers and metadata.

Args:
plugin_entries: ``{<plugin>@<marketplace>: settings}`` map from the IDE config.
is_enabled: returns True if ``settings`` indicates the plugin is on
(e.g. ``bool(settings)`` for Claude, ``settings.get('enabled')`` for Codex).
Comment thread
RoniCycode marked this conversation as resolved.
locate_dir: given ``(plugin_name, marketplace)``, returns the plugin's
filesystem path or None if it can't be resolved.
read_plugin: given the plugin path, returns ``(entry_fields, servers)``:
``entry_fields`` are extra metadata to attach to the inventory entry
(name/version/description/...), ``servers`` are MCP servers contributed.

Returns ``(merged_mcp_servers, enriched_plugins)``. Plugin keys without
``@`` (or that fail to resolve to a directory) still appear in the
inventory with just ``{'enabled': True}`` so we don't silently drop them.
"""
merged_mcp: dict = {}
enriched: dict = {}

for plugin_key, settings in plugin_entries.items():
if not is_enabled(settings):
continue

entry: dict = {'enabled': True}
enriched[plugin_key] = entry

if '@' not in plugin_key:
continue
plugin_name, marketplace = plugin_key.split('@', 1)

plugin_dir = locate_dir(plugin_name, marketplace)
if plugin_dir is None:
Comment thread
Ilanlido marked this conversation as resolved.
continue

plugin_fields, servers = read_plugin(plugin_dir)
entry.update(plugin_fields)
merged_mcp.update(servers)

return merged_mcp, enriched
20 changes: 20 additions & 0 deletions cycode/cli/apps/ai_guardrails/ides/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,26 @@ def render_hooks_config(self, async_mode: bool = False) -> dict:
``hooks_manager`` can treat them uniformly.
"""

def post_install(self, scope: str, repo_path: Optional[Path] = None) -> tuple[bool, str]:
"""Run IDE-specific actions after the hooks file is written.

Default: no-op success. Override to perform extra setup that doesn't
belong in the hooks file itself — e.g. Codex enables a
``[features] codex_hooks = true`` flag in its TOML config.

Returns ``(success, message)``. If ``success`` is False, the overall
install is considered failed.
"""
return True, ''

def post_uninstall(self, scope: str, repo_path: Optional[Path] = None) -> tuple[bool, str]:
"""Run IDE-specific cleanup after the hooks file is removed.

Default: no-op success. Override to undo whatever ``post_install``
wrote outside the hooks file.
"""
return True, ''

# --- runtime scan ---

@abstractmethod
Expand Down
84 changes: 32 additions & 52 deletions cycode/cli/apps/ai_guardrails/ides/claude_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from typing import ClassVar, Optional

Comment thread
Ilanlido marked this conversation as resolved.
from cycode.cli.apps.ai_guardrails.consts import CYCODE_SCAN_PROMPT_COMMAND, CYCODE_SESSION_START_COMMAND
from cycode.cli.apps.ai_guardrails.ides._plugin_utils import load_plugin_json, walk_enabled_plugins
from cycode.cli.apps.ai_guardrails.ides.base import IDE, DecisionAction, HookDecision
from cycode.cli.apps.ai_guardrails.scan.payload import AIHookPayload
from cycode.cli.apps.ai_guardrails.scan.types import AiHookEventType
Expand Down Expand Up @@ -127,7 +128,7 @@ def load_claude_config(config_path: Optional[Path] = None) -> Optional[dict]:
"""Load and parse `~/.claude.json`. Returns None if missing/invalid."""
path = config_path or _CLAUDE_CONFIG_PATH
if not path.exists():
logger.debug('Claude config file not found', extra={'path': str(path)})
logger.debug('Claude config file not found, %s', {'path': str(path)})
return None
try:
return json.loads(path.read_text(encoding='utf-8'))
Expand All @@ -150,7 +151,7 @@ def load_claude_settings(settings_path: Optional[Path] = None) -> Optional[dict]
"""Load and parse `~/.claude/settings.json`. Returns None if missing/invalid."""
path = settings_path or _CLAUDE_SETTINGS_PATH
if not path.exists():
logger.debug('Claude settings file not found', extra={'path': str(path)})
logger.debug('Claude settings file not found, %s', {'path': str(path)})
return None
try:
return json.loads(path.read_text(encoding='utf-8'))
Expand All @@ -171,69 +172,47 @@ def _resolve_marketplace_path(marketplace: dict) -> Optional[Path]:
return path if path.is_dir() else None


def _load_plugin_json_file(plugin_path: Path, relative_path: str) -> Optional[dict]:
"""Load and parse a JSON file inside a plugin directory.
def _read_claude_plugin(plugin_dir: Path) -> tuple[dict, dict]:
"""Read one Claude Code plugin's manifest + MCP servers.

Returns None if the file is missing, unreadable, or has invalid JSON.
Claude hardcodes the MCP file at ``<plugin_dir>/.mcp.json`` and always
wraps it as ``{"mcpServers": {...}}``.
"""
target = plugin_path / relative_path
if not target.exists():
return None
try:
return json.loads(target.read_text(encoding='utf-8'))
except Exception as e:
logger.debug('Failed to load plugin file', extra={'path': str(target)}, exc_info=e)
return None
manifest = load_plugin_json(plugin_dir / '.claude-plugin' / 'plugin.json') or {}
entry: dict = {}
for field in ('name', 'version', 'description'):
if field in manifest:
entry[field] = manifest[field]

mcp_config = load_plugin_json(plugin_dir / '.mcp.json') or {}
servers: dict = mcp_config.get('mcpServers') or {}
if servers:
entry['mcp_server_names'] = list(servers.keys())
return entry, servers

def resolve_plugins(settings: dict) -> tuple[dict, dict]:
"""Resolve enabled plugins to their MCP servers and metadata.

Walks ``enabledPlugins`` from claude settings, resolves each plugin's
marketplace directory via ``extraKnownMarketplaces``, and reads:
- ``<path>/.mcp.json`` for MCP servers (merged into a flat dict)
- ``<path>/.claude-plugin/plugin.json`` for metadata (name, version, description)
def resolve_plugins(settings: dict) -> tuple[dict, dict]:
"""Walk Claude Code's ``enabledPlugins`` via the shared plugin walker.

Returns ``(merged_mcp_servers, enriched_plugins)``.
Each enabled plugin's marketplace is resolved through
``extraKnownMarketplaces`` to a directory; the rest of the work
(manifest + ``.mcp.json``) is the shared ``_read_claude_plugin``.
"""
enabled = settings.get('enabledPlugins') or {}
marketplaces = settings.get('extraKnownMarketplaces') or {}
merged_mcp: dict = {}
enriched: dict = {}

for plugin_key, is_enabled in enabled.items():
if not is_enabled:
continue

entry: dict = {'enabled': True}
enriched[plugin_key] = entry

if '@' not in plugin_key:
continue

_plugin_name, marketplace_name = plugin_key.split('@', 1)
def _locate(_plugin_name: str, marketplace_name: str) -> Optional[Path]:
marketplace = marketplaces.get(marketplace_name)
if not marketplace:
continue

plugin_path = _resolve_marketplace_path(marketplace)
if plugin_path is None:
continue

metadata = _load_plugin_json_file(plugin_path, '.claude-plugin/plugin.json') or {}
for field in ('name', 'version', 'description'):
if field in metadata:
entry[field] = metadata[field]

mcp_config = _load_plugin_json_file(plugin_path, '.mcp.json') or {}
plugin_server_names = []
for server_name, server_cfg in (mcp_config.get('mcpServers') or {}).items():
merged_mcp[server_name] = server_cfg
plugin_server_names.append(server_name)
if plugin_server_names:
entry['mcp_server_names'] = plugin_server_names
return None
return _resolve_marketplace_path(marketplace)

return merged_mcp, enriched
return walk_enabled_plugins(
plugin_entries=enabled,
is_enabled=bool,
locate_dir=_locate,
read_plugin=_read_claude_plugin,
)


# --- IDE integration ----------------------------------------------------------
Expand All @@ -260,6 +239,7 @@ def render_hooks_config(self, async_mode: bool = False) -> dict:
'hooks': {
'SessionStart': [
{
'matcher': 'startup|clear',
'hooks': [{'type': 'command', 'command': _SESSION_START_COMMAND}],
}
],
Expand Down
Loading
Loading