diff --git a/cyberai/__main__.py b/cyberai/__main__.py
index b38c676..dc70de8 100644
--- a/cyberai/__main__.py
+++ b/cyberai/__main__.py
@@ -83,6 +83,53 @@ def replay(session_id: str) -> None:
     raise SystemExit(run_replay(session_id, config))
 
 
+@cli.group()
+def scope() -> None:
+    """Import and inspect bug-bounty program scopes."""
+
+
+@scope.command("import")
+@click.argument("platform", type=click.Choice(["h1", "hackerone", "bugcrowd", "bc"]))
+@click.argument("scope_file", type=click.Path(exists=True))
+def scope_import(platform: str, scope_file: str) -> None:
+    """Import authorized scope from a PLATFORM SCOPE_FILE (JSON export).
+
+    Examples:
+        cyberai scope import h1 acme_scope.json
+        cyberai scope import bugcrowd acme_bc.json
+    """
+    from cyberai.cli.scope import import_bugcrowd_scope, import_h1_scope
+
+    if platform in ("bugcrowd", "bc"):
+        result = import_bugcrowd_scope(scope_file)
+    else:
+        result = import_h1_scope(scope_file)
+    console.print(
+        Panel(
+            "\n".join(result.in_scope) or "[dim]none[/dim]",
+            title=f"In scope ({len(result.in_scope)})",
+            style="green",
+        )
+    )
+    if result.out_of_scope:
+        console.print(
+            Panel(
+                "\n".join(result.out_of_scope),
+                title=f"Out of scope ({len(result.out_of_scope)})",
+                style="red",
+            )
+        )
+    console.print(f"[dim]{result.summary()}[/dim]")
+    # Only suggest a scan command when there is something to scan.
+    if result.in_scope:
+        console.print(
+            "[dim]Use with: cyberai scan "
+            + " ".join(f"--scope {s}" for s in result.in_scope[:3])
+            + (" ..." if len(result.in_scope) > 3 else "")
+            + "[/dim]"
+        )
+
+
 @cli.command()
 def status() -> None:
     """Show CyberAI status and config."""
diff --git a/cyberai/agents/exploit/safety_validator.py b/cyberai/agents/exploit/safety_validator.py
index 3620fa8..5e24a9e 100644
--- a/cyberai/agents/exploit/safety_validator.py
+++ b/cyberai/agents/exploit/safety_validator.py
@@ -120,23 +120,64 @@ def _check_target_ip(target: str) -> List[str]:
     return violations
 
 
-def _target_in_scope(target: str, scope: List[str]) -> bool:
-    """Check if target matches any entry in authorized scope list."""
-    for entry in scope:
-        # Exact hostname match
-        if target.lower() == entry.lower():
-            return True
-        # Wildcard hostname: *.example.com
-        if entry.startswith("*."):
-            domain = entry[2:]
-            if target.lower().endswith(f".{domain}"):
-                return True
-        # CIDR match
-        try:
-            ip = ipaddress.ip_address(target)
-            network = ipaddress.ip_network(entry, strict=False)
-            if ip in network:
-                return True
-        except ValueError:
-            continue
-    return False
+def _matches_entry(target: str, entry: str) -> bool:
+    """True if `target` matches a single scope entry (exact / wildcard / CIDR).
+
+    Exclusion markers (leading '!') are NOT handled here — strip them before
+    calling. Wildcard `*.example.com` matches any subdomain but not the apex.
+    Comparison is case-insensitive and ignores surrounding whitespace on both
+    sides, so a padded target cannot slip past an exact-host exclusion. An
+    empty target, an empty entry, or a bare `*.` never matches.
+    """
+    host = target.strip().lower()
+    entry = entry.strip().lower()
+    if not host or not entry:
+        return False
+    # Exact hostname match
+    if host == entry:
+        return True
+    # Wildcard hostname: *.example.com
+    if entry.startswith("*."):
+        domain = entry[2:]
+        # An empty suffix would turn the check into endswith(".").
+        if domain and host.endswith(f".{domain}"):
+            return True
+    # CIDR / IP match
+    try:
+        ip = ipaddress.ip_address(host)
+        network = ipaddress.ip_network(entry, strict=False)
+    except ValueError:
+        # Hostname target or non-network entry: nothing to compare.
+        return False
+    return ip in network
+
+
+def _split_scope(scope: List[str]) -> tuple[List[str], List[str]]:
+    """Partition a scope list into (allow, exclude).
+
+    Entries starting with '!' are exclusions (out-of-scope), e.g.
+    `!staging.acme.com` or `!10.0.5.0/24`. The marker is stripped.
+    """
+    allow: List[str] = []
+    exclude: List[str] = []
+    for entry in scope:
+        e = entry.strip()
+        if e.startswith("!"):
+            exclude.append(e[1:].strip())
+        else:
+            allow.append(e)
+    return allow, exclude
+
+
+def _target_in_scope(target: str, scope: List[str]) -> bool:
+    """Check if target is authorized: matches an allow entry AND no exclusion.
+
+    Exclusions (`!host`) take precedence — a target inside `*.acme.com` but
+    also matching `!staging.acme.com` is OUT of scope. This mirrors real
+    bug-bounty briefs where a wildcard is in-scope minus specific subdomains.
+    """
+    allow, exclude = _split_scope(scope)
+    # Exclusions win — checked first.
+    if any(_matches_entry(target, ex) for ex in exclude):
+        return False
+    return any(_matches_entry(target, a) for a in allow)
diff --git a/cyberai/cli/scope.py b/cyberai/cli/scope.py
index 7ba2e2b..4d6b372 100644
--- a/cyberai/cli/scope.py
+++ b/cyberai/cli/scope.py
@@ -4,10 +4,38 @@
 """
 
 import ipaddress
+import json
 import re
-from typing import List
+from dataclasses import dataclass, field
+from pathlib import Path
+from typing import Any, List
+from urllib.parse import urlparse
+
 from cyberai.core.safety import ScopeConfig
 
+# Asset types from HackerOne/Bugcrowd that map to network-scannable targets.
+# Non-network types (mobile app IDs, source repos, hardware, "other") are
+# skipped — the pipeline can't scan an App Store ID.
+SCANNABLE_ASSET_TYPES = {
+    "URL",
+    "WILDCARD",
+    "CIDR",
+    "IP_ADDRESS",
+    "DOMAIN",
+    "API",
+    "WEBSITE",
+}
+
+# Bugcrowd target categories that map to network-scannable targets.
+SCANNABLE_BC_CATEGORIES = {
+    "website",
+    "api",
+    "url",
+    "ip",
+    "cidr",
+    "wildcard",
+}
+
 
 def parse_scope(scope_str: str) -> ScopeConfig:
     """
@@ -65,3 +93,162 @@ def format_scope(scope: ScopeConfig) -> str:
     if not parts:
         return "no scope defined"
     return ", ".join(parts)
+
+
+@dataclass
+class ScopeImport:
+    """Result of importing a bug-bounty program scope file."""
+
+    in_scope: List[str] = field(default_factory=list)
+    out_of_scope: List[str] = field(default_factory=list)
+    skipped: List[str] = field(default_factory=list)
+
+    def summary(self) -> str:
+        return (
+            f"{len(self.in_scope)} in-scope, "
+            f"{len(self.out_of_scope)} out-of-scope, "
+            f"{len(self.skipped)} skipped (non-network)"
+        )
+
+
+def _normalize_asset(identifier: str, asset_type: str) -> str:
+    """Reduce an asset identifier to a host/wildcard/CIDR token.
+
+    URL assets become bare hosts (`https://api.x.com/v1` -> `api.x.com`).
+    WILDCARD/CIDR/IP pass through unchanged, and so does any identifier that
+    parses as an IP address or network whatever its declared type, so
+    `10.0.0.0/24` is never cut down to `10.0.0.0`. Ports and paths are
+    stripped from everything else.
+    """
+    ident = identifier.strip()
+    atype = asset_type.upper()
+    if atype in {"WILDCARD", "CIDR", "IP_ADDRESS"}:
+        return ident
+    # A bare IP/CIDR filed under a URL-ish type must keep its prefix length
+    # (and IPv6 colons): recognise it before splitting on "/" and ":".
+    try:
+        ipaddress.ip_network(ident, strict=False)
+    except ValueError:
+        pass
+    else:
+        return ident
+    # URL/DOMAIN/API/WEBSITE -> strip scheme, path, port.
+    if "://" in ident:
+        ident = urlparse(ident).netloc or urlparse(ident).path
+    ident = ident.split("/")[0].split(":")[0]
+    return ident.strip().lower()
+
+
+def import_h1_scope(path: str) -> ScopeImport:
+    """Parse a HackerOne structured-scopes JSON export into a ScopeImport.
+
+    Accepts either the raw JSON:API envelope ({"data": [...]}) or a bare
+    list of structured-scope objects. Each item carries an `attributes`
+    block with `asset_identifier`, `asset_type`, `eligible_for_submission`.
+    Only eligible, network-scannable assets land in `in_scope`; ineligible
+    ones go to `out_of_scope`; non-network types are `skipped`.
+    """
+    # JSON exports are UTF-8; don't depend on the platform's locale encoding.
+    raw = json.loads(Path(path).read_text(encoding="utf-8"))
+    items: List[Any] = raw["data"] if isinstance(raw, dict) and "data" in raw else raw
+    result = ScopeImport()
+    for item in items:
+        attrs = item.get("attributes", item) if isinstance(item, dict) else {}
+        ident = attrs.get("asset_identifier", "")
+        atype = (attrs.get("asset_type") or "").upper()
+        if not ident:
+            continue
+        if atype not in SCANNABLE_ASSET_TYPES:
+            result.skipped.append(f"{ident} ({atype or 'UNKNOWN'})")
+            continue
+        token = _normalize_asset(ident, atype)
+        if not token:
+            result.skipped.append(f"{ident} ({atype})")
+            continue
+        if attrs.get("eligible_for_submission", True):
+            result.in_scope.append(token)
+        else:
+            result.out_of_scope.append(token)
+    return result
+
+
+def _bc_iter_targets(raw: Any) -> List[dict]:
+    """Return flat target dicts from any of the common Bugcrowd JSON shapes.
+
+    Handles three real-world shapes:
+      1. API export: {"data":[{"attributes":{"target_groups":[{"targets":[...]}]}}]}
+         or {"target_groups":[{"targets":[...], "in_scope":bool}]}
+      2. bounty-targets-data flat list: [{"name"/"target", "type", "in_scope"}]
+      3. rescope/bbscope: {"in_scope":[...], "out_of_scope":[...]}
+
+    A list that is present but null is treated as empty. In shape 3 an entry
+    may be a plain string or an already-structured target dict; either way
+    the list it appears in decides whether it is in scope.
+    """
+    targets: List[dict] = []
+
+    # Shape 3: explicit in/out lists.
+    if isinstance(raw, dict) and ("in_scope" in raw or "out_of_scope" in raw):
+        for key, flag in (("in_scope", True), ("out_of_scope", False)):
+            for entry in raw.get(key) or []:
+                if isinstance(entry, dict):
+                    targets.append({**entry, "in_scope": flag})
+                else:
+                    targets.append(
+                        {"name": entry, "in_scope": flag, "category": "website"}
+                    )
+        return targets
+
+    # Shape 1: target_groups (possibly under data[].attributes).
+    groups = None
+    if isinstance(raw, dict):
+        if "target_groups" in raw:
+            groups = raw["target_groups"]
+        elif "data" in raw and isinstance(raw["data"], list):
+            groups = []
+            for prog in raw["data"]:
+                attrs = prog.get("attributes", prog) if isinstance(prog, dict) else {}
+                groups.extend(attrs.get("target_groups") or [])
+    if groups:
+        for grp in groups:
+            grp_in = grp.get("in_scope", True)
+            for t in grp.get("targets") or []:
+                t = dict(t)
+                t.setdefault("in_scope", grp_in)
+                targets.append(t)
+        return targets
+
+    # Shape 2: flat list.
+    if isinstance(raw, list):
+        return [t for t in raw if isinstance(t, dict)]
+    return targets
+
+
+def import_bugcrowd_scope(path: str) -> ScopeImport:
+    """Parse a Bugcrowd scope export into a ScopeImport.
+
+    Tolerant of several JSON shapes (see `_bc_iter_targets`). A target's
+    `category` decides scannability; `in_scope` (default True) splits the
+    eligible targets from explicitly out-of-scope ones.
+    """
+    # JSON exports are UTF-8; don't depend on the platform's locale encoding.
+    raw = json.loads(Path(path).read_text(encoding="utf-8"))
+    result = ScopeImport()
+    for t in _bc_iter_targets(raw):
+        name = (t.get("name") or t.get("target") or t.get("uri") or "").strip()
+        if not name:
+            continue
+        category = (t.get("category") or t.get("type") or "website").lower()
+        if category not in SCANNABLE_BC_CATEGORIES:
+            result.skipped.append(f"{name} ({category})")
+            continue
+        atype = "WILDCARD" if name.startswith("*") else "URL"
+        token = _normalize_asset(name, atype)
+        if not token:
+            result.skipped.append(f"{name} ({category})")
+            continue
+        if t.get("in_scope", True):
+            result.in_scope.append(token)
+        else:
+            result.out_of_scope.append(token)
+    return result
diff --git a/tests/unit/test_scope_matching.py b/tests/unit/test_scope_matching.py
new file mode 100644
index 0000000..66b0537
--- /dev/null
+++ b/tests/unit/test_scope_matching.py
@@ -0,0 +1,215 @@
+"""Day 27 — scope import + wildcard/exclusion matching edge cases."""
+
+import json
+
+
+from cyberai.agents.exploit.safety_validator import (
+    _matches_entry,
+    _split_scope,
+    _target_in_scope,
+)
+from cyberai.cli.scope import import_bugcrowd_scope, import_h1_scope
+
+
+# ── _matches_entry ─────────────────────────────────────────────────────
+def test_matches_exact():
+    assert _matches_entry("api.acme.com", "api.acme.com")
+    assert not _matches_entry("api.acme.com", "web.acme.com")
+
+
+def test_matches_wildcard_subdomain():
+    assert _matches_entry("api.acme.com", "*.acme.com")
+    assert _matches_entry("deep.nested.acme.com", "*.acme.com")
+
+
+def test_wildcard_does_not_match_apex():
+    assert not _matches_entry("acme.com", "*.acme.com")
+
+
+def test_matches_cidr():
+    assert _matches_entry("10.0.0.5", "10.0.0.0/24")
+    assert not _matches_entry("10.0.1.5", "10.0.0.0/24")
+
+
+def test_matches_entry_hostname_vs_cidr_no_crash():
+    # hostname target against CIDR entry must not raise
+    assert not _matches_entry("api.acme.com", "10.0.0.0/24")
+
+
+# ── _split_scope ───────────────────────────────────────────────────────
+def test_split_scope_separates_exclusions():
+    allow, exclude = _split_scope(["*.acme.com", "!staging.acme.com", "10.0.0.0/24"])
+    assert allow == ["*.acme.com", "10.0.0.0/24"]
+    assert exclude == ["staging.acme.com"]
+
+
+def test_split_scope_strips_marker_whitespace():
+    allow, exclude = _split_scope(["!  staging.acme.com  "])
+    assert exclude == ["staging.acme.com"]
+    assert allow == []
+
+
+# ── _target_in_scope with exclusions ───────────────────────────────────
+def test_api_in_wildcard_scope():
+    assert _target_in_scope("api.acme.com", ["*.acme.com"])
+
+
+def test_exclusion_beats_wildcard():
+    scope = ["*.acme.com", "!staging.acme.com"]
+    assert _target_in_scope("api.acme.com", scope)
+    assert not _target_in_scope("staging.acme.com", scope)
+
+
+def test_nested_excluded_subdomain():
+    # internal.staging.acme.com excluded via !*.staging.acme.com
+    scope = ["*.acme.com", "!*.staging.acme.com"]
+    assert not _target_in_scope("internal.staging.acme.com", scope)
+    assert _target_in_scope("api.acme.com", scope)
+
+
+def test_exclusion_cidr():
+    scope = ["10.0.0.0/16", "!10.0.5.0/24"]
+    assert _target_in_scope("10.0.1.1", scope)
+    assert not _target_in_scope("10.0.5.7", scope)
+
+
+def test_no_allow_only_exclusion_is_out():
+    assert not _target_in_scope("anything.com", ["!evil.com"])
+
+
+def test_empty_scope_is_out():
+    assert not _target_in_scope("api.acme.com", [])
+
+
+def test_padded_target_cannot_dodge_exclusion():
+    scope = ["*.acme.com", "!staging.acme.com"]
+    assert not _target_in_scope(" staging.acme.com", scope)
+
+
+def test_blank_or_bare_wildcard_entry_matches_nothing():
+    assert not _matches_entry("", "")
+    assert not _matches_entry("acme.com.", "*.")
+
+
+# ── H1 import ──────────────────────────────────────────────────────────
+def _write(tmp_path, name, data):
+    p = tmp_path / name
+    p.write_text(json.dumps(data))
+    return str(p)
+
+
+def test_h1_import_envelope(tmp_path):
+    data = {
+        "data": [
+            {
+                "attributes": {
+                    "asset_identifier": "https://api.acme.com/v1",
+                    "asset_type": "URL",
+                    "eligible_for_submission": True,
+                }
+            },
+            {
+                "attributes": {
+                    "asset_identifier": "*.acme.com",
+                    "asset_type": "WILDCARD",
+                    "eligible_for_submission": True,
+                }
+            },
+            {
+                "attributes": {
+                    "asset_identifier": "com.acme.app",
+                    "asset_type": "GOOGLE_PLAY_APP_ID",
+                    "eligible_for_submission": True,
+                }
+            },
+            {
+                "attributes": {
+                    "asset_identifier": "old.acme.com",
+                    "asset_type": "URL",
+                    "eligible_for_submission": False,
+                }
+            },
+        ]
+    }
+    res = import_h1_scope(_write(tmp_path, "h1.json", data))
+    assert "api.acme.com" in res.in_scope  # URL normalized (scheme/path stripped)
+    assert "*.acme.com" in res.in_scope  # wildcard passthrough
+    assert "old.acme.com" in res.out_of_scope  # ineligible
+    assert any("GOOGLE_PLAY" in s for s in res.skipped)  # non-network skipped
+
+
+def test_h1_import_bare_list(tmp_path):
+    data = [
+        {"attributes": {"asset_identifier": "x.acme.com", "asset_type": "URL"}},
+    ]
+    res = import_h1_scope(_write(tmp_path, "h1b.json", data))
+    assert res.in_scope == ["x.acme.com"]  # eligible defaults True
+
+
+# ── Bugcrowd import ────────────────────────────────────────────────────
+def test_bugcrowd_target_groups(tmp_path):
+    data = {
+        "target_groups": [
+            {
+                "in_scope": True,
+                "targets": [
+                    {"name": "*.acme.com", "category": "website"},
+                    {"name": "https://api.acme.com", "category": "api"},
+                    {"name": "Acme Android", "category": "android"},
+                ],
+            },
+            {
+                "in_scope": False,
+                "targets": [{"name": "legacy.acme.com", "category": "website"}],
+            },
+        ]
+    }
+    res = import_bugcrowd_scope(_write(tmp_path, "bc.json", data))
+    assert "*.acme.com" in res.in_scope
+    assert "api.acme.com" in res.in_scope
+    assert "legacy.acme.com" in res.out_of_scope
+    assert any("android" in s for s in res.skipped)
+
+
+def test_bugcrowd_in_out_lists(tmp_path):
+    data = {"in_scope": ["a.acme.com"], "out_of_scope": ["b.acme.com"]}
+    res = import_bugcrowd_scope(_write(tmp_path, "bc2.json", data))
+    assert res.in_scope == ["a.acme.com"]
+    assert res.out_of_scope == ["b.acme.com"]
+
+
+def test_bugcrowd_flat_list(tmp_path):
+    data = [
+        {"name": "c.acme.com", "type": "website", "in_scope": True},
+        {"name": "d.acme.com", "type": "website", "in_scope": False},
+    ]
+    res = import_bugcrowd_scope(_write(tmp_path, "bc3.json", data))
+    assert "c.acme.com" in res.in_scope
+    assert "d.acme.com" in res.out_of_scope
+
+
+def test_bugcrowd_cidr_keeps_prefix(tmp_path):
+    data = [
+        {"name": "10.0.0.0/24", "type": "cidr", "in_scope": True},
+        {"name": "10.0.5.0/24", "type": "cidr", "in_scope": False},
+    ]
+    res = import_bugcrowd_scope(_write(tmp_path, "bc4.json", data))
+    assert res.in_scope == ["10.0.0.0/24"]
+    assert res.out_of_scope == ["10.0.5.0/24"]
+
+
+def test_bugcrowd_in_out_lists_tolerate_null(tmp_path):
+    data = {"in_scope": ["a.acme.com"], "out_of_scope": None}
+    res = import_bugcrowd_scope(_write(tmp_path, "bc5.json", data))
+    assert res.in_scope == ["a.acme.com"]
+    assert res.out_of_scope == []
+
+
+def test_scope_import_summary():
+    from cyberai.cli.scope import ScopeImport
+
+    si = ScopeImport(in_scope=["a"], out_of_scope=["b", "c"], skipped=["d"])
+    summary = si.summary()
+    assert "1 in-scope" in summary
+    assert "2 out-of-scope" in summary
+    assert "1 skipped" in summary