Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions cyberai/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,51 @@ def replay(session_id: str) -> None:
raise SystemExit(run_replay(session_id, config))


@cli.group()
def scope() -> None:
    """Import and inspect bug-bounty program scopes."""
    # Intentionally empty: this function only anchors the `scope` command
    # group; subcommands register themselves via `@scope.command(...)`.


@scope.command("import")
@click.argument("platform", type=click.Choice(["h1", "hackerone", "bugcrowd", "bc"]))
@click.argument("scope_file", type=click.Path(exists=True))
def scope_import(platform: str, scope_file: str) -> None:
    """Import authorized scope from a PLATFORM SCOPE_FILE (JSON export).

    Examples:
        cyberai scope import h1 acme_scope.json
        cyberai scope import bugcrowd acme_bc.json
    """
    import shlex

    from cyberai.cli.scope import import_bugcrowd_scope, import_h1_scope

    # "bc" is the short alias for Bugcrowd; every other accepted choice
    # ("h1", "hackerone") is a HackerOne export.
    if platform in ("bugcrowd", "bc"):
        result = import_bugcrowd_scope(scope_file)
    else:
        result = import_h1_scope(scope_file)

    console.print(
        Panel(
            "\n".join(result.in_scope) or "[dim]none[/dim]",
            title=f"In scope ({len(result.in_scope)})",
            style="green",
        )
    )
    if result.out_of_scope:
        console.print(
            Panel(
                "\n".join(result.out_of_scope),
                title=f"Out of scope ({len(result.out_of_scope)})",
                style="red",
            )
        )
    console.print(f"[dim]{result.summary()}[/dim]")

    # Only suggest a scan command when there is something to pass to it;
    # otherwise the hint would end in a dangling "cyberai scan <target> ".
    if result.in_scope:
        # Quote each entry so wildcards such as "*.acme.com" survive a
        # copy-paste into a shell without being glob-expanded.
        flags = " ".join(f"--scope {shlex.quote(s)}" for s in result.in_scope[:3])
        more = " ..." if len(result.in_scope) > 3 else ""
        console.print(f"[dim]Use with: cyberai scan <target> {flags}{more}[/dim]")


@cli.command()
def status() -> None:
"""Show CyberAI status and config."""
Expand Down
71 changes: 53 additions & 18 deletions cyberai/agents/exploit/safety_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,23 +120,58 @@ def _check_target_ip(target: str) -> List[str]:
return violations


def _target_in_scope(target: str, scope: List[str]) -> bool:
"""Check if target matches any entry in authorized scope list."""
for entry in scope:
# Exact hostname match
if target.lower() == entry.lower():
def _matches_entry(target: str, entry: str) -> bool:
"""True if `target` matches a single scope entry (exact / wildcard / CIDR).

Exclusion markers (leading '!') are NOT handled here — strip them before
calling. Wildcard `*.example.com` matches any subdomain but not the apex.
"""
entry = entry.strip()
# Exact hostname match
if target.lower() == entry.lower():
return True
# Wildcard hostname: *.example.com
if entry.startswith("*."):
domain = entry[2:].lower()
if target.lower().endswith(f".{domain}"):
return True
# Wildcard hostname: *.example.com
if entry.startswith("*."):
domain = entry[2:]
if target.lower().endswith(f".{domain}"):
return True
# CIDR match
try:
ip = ipaddress.ip_address(target)
network = ipaddress.ip_network(entry, strict=False)
if ip in network:
return True
except ValueError:
continue
# CIDR / IP match
try:
ip = ipaddress.ip_address(target)
network = ipaddress.ip_network(entry, strict=False)
if ip in network:
return True
except ValueError:
pass
return False


def _split_scope(scope: List[str]) -> tuple[List[str], List[str]]:
"""Partition a scope list into (allow, exclude).

Entries starting with '!' are exclusions (out-of-scope), e.g.
`!staging.acme.com` or `!10.0.5.0/24`. The marker is stripped.
"""
allow: List[str] = []
exclude: List[str] = []
for entry in scope:
e = entry.strip()
if e.startswith("!"):
exclude.append(e[1:].strip())
else:
allow.append(e)
return allow, exclude


def _target_in_scope(target: str, scope: List[str]) -> bool:
    """Decide whether `target` is authorized by the given scope list.

    A target is in scope only when it matches at least one allow entry and
    matches no exclusion entry (`!host`). Exclusions are evaluated first and
    always win, so a host covered by `*.acme.com` is still rejected when
    `!staging.acme.com` also matches it — the usual shape of a bug-bounty
    brief: a wildcard minus a few named subdomains.
    """
    permitted, blocked = _split_scope(scope)

    # Any exclusion hit is final.
    for exclusion in blocked:
        if _matches_entry(target, exclusion):
            return False

    for allowance in permitted:
        if _matches_entry(target, allowance):
            return True
    return False
168 changes: 167 additions & 1 deletion cyberai/cli/scope.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,38 @@
"""

import ipaddress
import json
import re
from typing import List
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, List
from urllib.parse import urlparse

from cyberai.core.safety import ScopeConfig

# Asset types from HackerOne/Bugcrowd that map to network-scannable targets.
# Non-network types (mobile app IDs, source repos, hardware, "other") are
# skipped — the pipeline can't scan an App Store ID.
# Entries are upper-case: import_h1_scope() upper-cases each item's
# `asset_type` before testing membership in this set.
SCANNABLE_ASSET_TYPES = {
    "URL",
    "WILDCARD",
    "CIDR",
    "IP_ADDRESS",
    "DOMAIN",
    "API",
    "WEBSITE",
}

# Bugcrowd target categories that map to network-scannable targets.
# Entries are lower-case: import_bugcrowd_scope() lower-cases each target's
# `category` (or `type`) before testing membership in this set.
SCANNABLE_BC_CATEGORIES = {
    "website",
    "api",
    "url",
    "ip",
    "cidr",
    "wildcard",
}


def parse_scope(scope_str: str) -> ScopeConfig:
"""
Expand Down Expand Up @@ -65,3 +93,141 @@ def format_scope(scope: ScopeConfig) -> str:
if not parts:
return "no scope defined"
return ", ".join(parts)


@dataclass
class ScopeImport:
    """Outcome of importing a bug-bounty program scope file."""

    # Normalized tokens that are eligible targets.
    in_scope: List[str] = field(default_factory=list)
    # Normalized tokens the program explicitly marks as not eligible.
    out_of_scope: List[str] = field(default_factory=list)
    # Human-readable "identifier (type)" notes for assets that were skipped.
    skipped: List[str] = field(default_factory=list)

    def summary(self) -> str:
        """Return a one-line count of in-scope, out-of-scope and skipped assets."""
        counts = (len(self.in_scope), len(self.out_of_scope), len(self.skipped))
        return "{} in-scope, {} out-of-scope, {} skipped (non-network)".format(*counts)


def _normalize_asset(identifier: str, asset_type: str) -> str:
"""Reduce an asset identifier to a host/wildcard/CIDR token.

URL assets become bare hosts (`https://api.x.com/v1` -> `api.x.com`).
WILDCARD/CIDR/IP pass through unchanged. Ports and paths are stripped.
"""
ident = identifier.strip()
atype = asset_type.upper()
if atype in {"WILDCARD", "CIDR", "IP_ADDRESS"}:
return ident
# URL/DOMAIN/API/WEBSITE -> strip scheme, path, port.
if "://" in ident:
ident = urlparse(ident).netloc or urlparse(ident).path
ident = ident.split("/")[0].split(":")[0]
return ident.strip().lower()


def import_h1_scope(path: str) -> ScopeImport:
    """Parse a HackerOne structured-scopes JSON export into a ScopeImport.

    Accepts either the raw JSON:API envelope ({"data": [...]}) or a bare
    list of structured-scope objects. Each item carries an `attributes`
    block with `asset_identifier`, `asset_type`, `eligible_for_submission`.
    Only eligible, network-scannable assets land in `in_scope`; ineligible
    ones go to `out_of_scope`; non-network types are `skipped`.

    Malformed items (non-objects, a null `attributes` block, a missing or
    non-string identifier) are ignored rather than aborting the import.

    Args:
        path: Location of the JSON export on disk.

    Returns:
        The populated ScopeImport; empty when the payload holds no list of
        scope items.

    Raises:
        OSError: If the file cannot be read.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    # JSON is UTF-8 by specification; "utf-8-sig" additionally tolerates a
    # byte-order mark instead of depending on the platform default encoding.
    raw = json.loads(Path(path).read_text(encoding="utf-8-sig"))
    items: Any = raw["data"] if isinstance(raw, dict) and "data" in raw else raw
    result = ScopeImport()
    if not isinstance(items, list):
        return result

    for item in items:
        if not isinstance(item, dict):
            continue
        # Bare-list exports may put the fields directly on the item.
        attrs = item.get("attributes", item)
        if not isinstance(attrs, dict):
            continue
        ident = attrs.get("asset_identifier")
        if not isinstance(ident, str) or not ident:
            continue
        atype = str(attrs.get("asset_type") or "").upper()
        if atype not in SCANNABLE_ASSET_TYPES:
            result.skipped.append(f"{ident} ({atype or 'UNKNOWN'})")
            continue
        token = _normalize_asset(ident, atype)
        if not token:
            # Scannable type, but nothing host-like could be extracted.
            result.skipped.append(f"{ident} ({atype})")
            continue
        if attrs.get("eligible_for_submission", True):
            result.in_scope.append(token)
        else:
            result.out_of_scope.append(token)
    return result


def _bc_iter_targets(raw: Any) -> List[dict]:
"""Yield flat target dicts from any of the common Bugcrowd JSON shapes.

Handles three real-world shapes:
1. API export: {"data":[{"attributes":{"target_groups":[{"targets":[...]}]}}]}
or {"target_groups":[{"targets":[...], "in_scope":bool}]}
2. bounty-targets-data flat list: [{"name"/"target", "type", "in_scope"}]
3. rescope/bbscope: {"in_scope":[...], "out_of_scope":[...]}
"""
targets: List[dict] = []

# Shape 3: explicit in/out lists of strings.
if isinstance(raw, dict) and ("in_scope" in raw or "out_of_scope" in raw):
for name in raw.get("in_scope", []):
targets.append({"name": name, "in_scope": True, "category": "website"})
for name in raw.get("out_of_scope", []):
targets.append({"name": name, "in_scope": False, "category": "website"})
return targets

# Shape 1: target_groups (possibly under data[].attributes).
groups = None
if isinstance(raw, dict):
if "target_groups" in raw:
groups = raw["target_groups"]
elif "data" in raw and isinstance(raw["data"], list):
groups = []
for prog in raw["data"]:
attrs = prog.get("attributes", prog) if isinstance(prog, dict) else {}
groups.extend(attrs.get("target_groups", []))
if groups:
for grp in groups:
grp_in = grp.get("in_scope", True)
for t in grp.get("targets", []):
t = dict(t)
t.setdefault("in_scope", grp_in)
targets.append(t)
return targets

# Shape 2: flat list.
if isinstance(raw, list):
return [t for t in raw if isinstance(t, dict)]
return targets


def import_bugcrowd_scope(path: str) -> ScopeImport:
    """Parse a Bugcrowd scope export into a ScopeImport.

    Tolerant of several JSON shapes (see `_bc_iter_targets`). A target's
    `category` decides scannability; `in_scope` (default True) splits the
    eligible targets from explicitly out-of-scope ones.

    IP addresses and CIDR ranges are kept verbatim; treating them as URLs
    would cut `10.0.0.0/24` at the "/" and silently shrink the range to a
    single address.

    Args:
        path: Location of the JSON export on disk.

    Returns:
        The populated ScopeImport.

    Raises:
        OSError: If the file cannot be read.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    # JSON is UTF-8 by specification; "utf-8-sig" additionally tolerates a
    # byte-order mark instead of depending on the platform default encoding.
    raw = json.loads(Path(path).read_text(encoding="utf-8-sig"))
    result = ScopeImport()
    for t in _bc_iter_targets(raw):
        raw_name = t.get("name") or t.get("target") or t.get("uri") or ""
        if not isinstance(raw_name, str):
            continue
        name = raw_name.strip()
        if not name:
            continue
        category = str(t.get("category") or t.get("type") or "website").lower()
        if category not in SCANNABLE_BC_CATEGORIES:
            result.skipped.append(f"{name} ({category})")
            continue

        # Choose how the name is normalized: wildcards and anything that
        # parses as an IP address / network pass through untouched, the rest
        # is reduced to a bare host.
        if name.startswith("*"):
            atype = "WILDCARD"
        else:
            try:
                ipaddress.ip_network(name, strict=False)
            except ValueError:
                atype = "URL"
            else:
                atype = "CIDR"

        token = _normalize_asset(name, atype)
        if not token:
            result.skipped.append(f"{name} ({category})")
            continue
        if t.get("in_scope", True):
            result.in_scope.append(token)
        else:
            result.out_of_scope.append(token)
    return result
Loading
Loading