Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 49 additions & 1 deletion cyberai/agents/report/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from cyberai.core.types import ReportSection

from .json_exporter import export_json
from .judge import judge_report
from .markdown_renderer import render_markdown


Expand Down Expand Up @@ -67,12 +68,59 @@ def run(self, target: str, context: Optional[Dict[str, Any]] = None) -> Dict[str
if section is not None:
self.kb.set("report.section", section.model_dump(), agent=self.AGENT_NAME)

return {
# Flag-gated: LLM-as-Judge cross-checks the report against KB evidence.
verdict_dump = None
if getattr(self.config, "use_judge", False) and self.llm is not None:
verdict = judge_report(
md_content,
self.session,
self.llm,
threshold=getattr(self.config, "judge_threshold", 0.7),
judge_model=getattr(self.config, "judge_model", None),
)
verdict_dump = verdict.model_dump()
self.kb.set("report.judge_verdict", verdict_dump, agent=self.AGENT_NAME)
md_content = self._append_verdict(md_content, verdict)
with open(md_path, "w") as f:
f.write(md_content)
self._log(
f"Judge: score={verdict.hallucination_score:.2f} supported={verdict.supported}"
)

result = {
"status": "done",
"markdown": md_path,
"json": json_path,
"total_findings": len(self.session.findings),
}
if verdict_dump is not None:
result["judge_verdict"] = verdict_dump
return result

def _append_verdict(self, md: str, verdict) -> str:
    """Return `md` with the judge verdict appended as a Markdown section.

    Args:
        md: The already-rendered Markdown report.
        verdict: Judge result exposing `supported`, `hallucination_score`,
            `unsupported_claims` and `notes`.

    Returns:
        The report, a horizontal rule, and a "Report Validation" section
        holding the status, the score (two decimals), one bullet per
        unsupported claim and the reviewer notes, ending with a newline.
    """

    def _one_line(text) -> str:
        # Claims and notes come from an LLM and may span several lines; an
        # embedded newline would end the bullet / the `_..._` emphasis early.
        return " ".join(str(text).split())

    # Markdown only honours a line break inside a paragraph when the line
    # ends with two or more spaces. Built with `* 2` so that whitespace
    # normalisation of this source cannot silently shrink it to one.
    hard_break = " " * 2

    status = "✅ SUPPORTED" if verdict.supported else "⚠️ UNSUPPORTED"
    lines = [
        md,
        "",
        "---",
        "",
        "## 🧑‍⚖️ Report Validation (LLM-as-Judge)",
        "",
        f"**Status:** {status}{hard_break}",
        f"**Hallucination score:** {verdict.hallucination_score:.2f}{hard_break}",
    ]
    if verdict.unsupported_claims:
        lines += ["", "**Unsupported claims:**", ""]
        lines.extend(f"- {_one_line(claim)}" for claim in verdict.unsupported_claims)
    if verdict.notes:
        lines += ["", f"_Notes: {_one_line(verdict.notes)}_"]
    # Trailing empty entry makes the joined text end with a newline.
    lines.append("")
    return "\n".join(lines)

def _structured_summary(self, target: str):
"""Flag-gated: ask the LLM for a Pydantic-validated ReportSection.
Expand Down
161 changes: 161 additions & 0 deletions cyberai/agents/report/judge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
"""LLM-as-Judge — validates report claims against knowledge-base evidence.

A second (optionally more powerful) LLM cross-checks every claim in the
generated report against the evidence actually present in the session KB.
It returns a hallucination score in [0, 1] and a list of unsupported
claims. Flag-gated in ReportAgent (use_judge, default False) — the
deterministic report is never blocked by judge failures.
"""

from __future__ import annotations

import json
from contextlib import contextmanager
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, Field, field_validator

from cyberai.core.llm_client import LLMClient
from cyberai.core.scan_session import ScanSession


class JudgeVerdict(BaseModel):
    """Structured verdict returned by the judge LLM.

    Every field has a default, so a bare `JudgeVerdict()` is the
    pass-through verdict (score 0.0, supported) that `judge_report` returns
    when the judge cannot run.
    """

    # 0.0 = every claim backed by evidence; 1.0 = all fabricated.
    hallucination_score: float = 0.0
    # Overwritten by judge_report from the score and its threshold.
    supported: bool = True
    # Report claims the judge found no matching evidence for.
    unsupported_claims: List[str] = Field(default_factory=list)
    # Brief reviewer notes, or the failure reason on the pass-through path.
    notes: str = ""

    @field_validator("hallucination_score", mode="before")
    @classmethod
    def _clamp(cls, v: Any) -> float:
        """Clamp to [0,1] BEFORE type validation — the LLM may over/undershoot.

        A misbehaving judge returning 1.2 must not crash the report; we squash
        it into range rather than raising, matching the graceful-degradation
        contract of the whole judge path.

        Args:
            v: Raw value supplied for the field (number, numeric string, ...).

        Returns:
            The value as a float limited to [0.0, 1.0]; 0.0 when it cannot be
            interpreted as a number at all.
        """
        try:
            score = float(v)
        except OverflowError:
            # float() raises (rather than returning inf) for an integer too
            # large to represent; clamp by sign instead of letting it escape.
            return 1.0 if v > 0 else 0.0
        except (TypeError, ValueError):
            return 0.0
        return max(0.0, min(1.0, score))


# JSON Schema passed to structured_call. Kept flat (no nested objects) so it
# stays OpenAI strict-friendly. Every declared property is also required, so
# the `required` list is derived from the property table in declaration order.
_VERDICT_PROPERTIES: Dict[str, Any] = {
    "hallucination_score": dict(
        type="number",
        description="0.0 = every claim backed by evidence; 1.0 = all fabricated.",
    ),
    "supported": dict(
        type="boolean",
        description="True if the report is sufficiently grounded in evidence.",
    ),
    "unsupported_claims": dict(
        type="array",
        items={"type": "string"},
        description="Claims in the report with no matching KB evidence.",
    ),
    "notes": dict(type="string", description="Brief reviewer notes."),
}

VERDICT_SCHEMA: Dict[str, Any] = {
    "type": "object",
    "properties": _VERDICT_PROPERTIES,
    "required": list(_VERDICT_PROPERTIES),
}

# System prompt for the judge model, written one sentence per entry and joined
# with single spaces so each instruction can be read (and diffed) on its own.
JUDGE_SYSTEM = " ".join(
    (
        "You are a strict security-report reviewer.",
        "You are given a penetration-test report and the raw EVIDENCE "
        "collected during the scan (findings, CVE IDs, tool artifacts).",
        "Your job: detect hallucinations.",
        "A claim is UNSUPPORTED if it asserts a vulnerability, CVE, port, "
        "or impact that does not appear in the evidence.",
        "Do NOT reward fluency.",
        "Score hallucination_score in [0,1]: 0 means every claim is backed "
        "by evidence, 1 means the report is fabricated.",
        "List each unsupported claim verbatim.",
        "Respond ONLY via the structured schema.",
    )
)


def _collect_evidence(session: ScanSession) -> Dict[str, Any]:
    """Snapshot the session's findings as plain data for the judge prompt.

    Returns a dict with the session `target` and a `findings` list; each
    finding carries its id, title, severity, CVE ids, target and evidence
    items rendered as strings.
    """
    records = [
        {
            "id": finding.id,
            "title": finding.title,
            # Enum-like severities expose `.value`; anything else is stringified.
            "severity": getattr(finding.severity, "value", str(finding.severity)),
            "cve_ids": list(finding.cve_ids),
            "target": finding.target,
            # Each evidence item is cut to 500 characters to bound prompt size.
            "evidence": [str(item)[:500] for item in (finding.evidence or [])],
        }
        for finding in session.findings
    ]
    return {"target": session.target, "findings": records}


@contextmanager
def _judge_model(llm: LLMClient, model: Optional[str]):
    """Run the enclosed block with `llm.config.model` set to `model`.

    A falsy `model` (None or empty string) leaves the client untouched.
    Otherwise the previous model name is put back when the block exits,
    whether it finished normally or raised.
    """
    if model:
        previous = llm.config.model
        llm.config.model = model
        try:
            yield
        finally:
            # Restore unconditionally so a failed judge call cannot leave the
            # shared client pointed at the judge model.
            llm.config.model = previous
    else:
        # No override requested: act as a no-op context.
        yield


def judge_report(
    report_text: str,
    session: ScanSession,
    llm: LLMClient,
    *,
    threshold: float = 0.7,
    judge_model: Optional[str] = None,
    agent_name: str = "report.judge",
) -> JudgeVerdict:
    """Cross-check `report_text` against session evidence via a second LLM.

    Args:
        report_text: The rendered report to review.
        session: Scan session whose findings are the ground-truth evidence.
        llm: Client used for the structured judge call; None disables it.
        threshold: A hallucination score at or above this marks the report
            as unsupported.
        judge_model: Optional model name to use for the judge call only.
        agent_name: Agent label passed through to the LLM client.

    Returns:
        A JudgeVerdict. On ANY failure returns a graceful pass-through
        verdict (score=0.0, supported=True) so the report pipeline never
        breaks; the failure reason is recorded in `notes`. `supported` is
        recomputed from the score against `threshold` regardless of what the
        model claimed.
    """
    if llm is None:
        return JudgeVerdict(notes="judge unavailable: no LLM client")

    try:
        # Evidence gathering and prompt serialisation live inside the guard
        # as well: a malformed finding or an unserialisable payload must
        # degrade to the pass-through verdict just like a failed LLM call.
        evidence = _collect_evidence(session)
        messages = [
            {
                "role": "user",
                "content": (
                    "REPORT:\n"
                    f"{report_text}\n\n"
                    "EVIDENCE (ground truth):\n"
                    f"{json.dumps(evidence, indent=2, default=str)}"
                ),
            }
        ]
        with _judge_model(llm, judge_model):
            raw = llm.structured_call(
                messages,
                schema=VERDICT_SCHEMA,
                schema_name="judge_verdict",
                description="Hallucination verdict for a pentest report.",
                system=JUDGE_SYSTEM,
                agent_name=agent_name,
            )
        verdict = JudgeVerdict.model_validate(raw)
        # Threshold is authoritative — don't trust the model's own `supported`.
        # Kept inside the guard so a mistyped threshold (e.g. a string read
        # from a config file) cannot raise out of the judge path either.
        verdict.supported = verdict.hallucination_score < threshold
    except Exception as exc:  # noqa: BLE001 — judge must never hard-fail
        return JudgeVerdict(notes=f"judge unavailable: {exc}")
    return verdict
3 changes: 3 additions & 0 deletions cyberai/agents/report/markdown_renderer.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ def render_markdown(session: PentestSession) -> str:
f"{finding.description}",
"",
]
if getattr(finding, "confidence", 1.0) < 1.0:
lines.append(f"**Confidence:** {finding.confidence:.0%} ⚠️")
lines.append("")
if finding.cve:
lines.append(f"**CVE:** `{finding.cve}`")
lines.append("")
Expand Down
6 changes: 6 additions & 0 deletions cyberai/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ class CyberAIConfig:
max_cost_usd: float = 0.0
# Flag-gated: run the nuclei template engine in ExploitAgent (day 23).
use_nuclei: bool = False
# Flag-gated: LLM-as-Judge validates the report vs KB evidence (day 26).
use_judge: bool = False
# Hallucination score >= threshold marks the report unsupported.
judge_threshold: float = 0.7
# Optional more-powerful model for the judge; None = same as main LLM.
judge_model: Optional[str] = None

@classmethod
def from_file(cls, path: str) -> "CyberAIConfig":
Expand Down
3 changes: 3 additions & 0 deletions cyberai/core/scan_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ class Finding:
evidence: List[Any] = field(default_factory=list)
# Free-form structured data
data: Any = None
# Confidence this finding is real, 0..1. 1.0 = fully evidenced (default).
# Lowered by the LLM-as-Judge / agents when evidence is weak (day 26).
confidence: float = 1.0

def __post_init__(self) -> None:
# Keep `cve` and `cve_ids` in sync for callers that use either
Expand Down
Loading
Loading