diff --git a/src/skillspector/nodes/analyzers/static_patterns_privilege_escalation.py b/src/skillspector/nodes/analyzers/static_patterns_privilege_escalation.py index e874248..660bc0c 100644 --- a/src/skillspector/nodes/analyzers/static_patterns_privilege_escalation.py +++ b/src/skillspector/nodes/analyzers/static_patterns_privilege_escalation.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Static patterns: privilege escalation (PE1–PE4). Node and analyze() in one module.""" +"""Static patterns: privilege escalation (PE1–PE5). Node and analyze() in one module.""" from __future__ import annotations @@ -99,10 +99,22 @@ (r"\bDockerClient\s*\(", 0.85), (r"http\+unix://.*docker\.sock", 0.9), ] +PE5_PATTERNS = [ + (r"--privileged", 0.8), + (r"""(?:-v|--volume)['",\s=]+/:""", 0.85), + (r"--cap-add[=\s]+(?:SYS_ADMIN|ALL|SYS_PTRACE|NET_ADMIN)", 0.85), + (r"--(?:pid|net|network|ipc|uts)[=\s]+host", 0.8), + (r"--device[=\s]+/dev/", 0.7), + (r"--security-opt[=\s]+\S*unconfined", 0.85), + (r"\bnsenter\b", 0.9), + (r"/sys/fs/cgroup/.*release_agent", 0.95), + (r"/proc/\d+/ns/", 0.85), + (r"""\bunshare\b['",\s]+--(?:user|mount|pid)""", 0.85), +] def analyze(content: str, file_path: str, file_type: str) -> list[AnalyzerFinding]: - """Analyze content for privilege escalation patterns (PE1–PE4).""" + """Analyze content for privilege escalation patterns (PE1–PE5).""" findings: list[AnalyzerFinding] = [] def loc(ln: int) -> Location: @@ -184,6 +196,28 @@ def loc(ln: int) -> Location: matched_text=match.group(0)[:200], ) findings.extend(pe4_best.values()) + # Collect best-confidence PE5 finding per line — a single `docker run` line + # often matches multiple flags (e.g. --privileged + --cap-add=SYS_ADMIN). + pe5_best: dict[int, AnalyzerFinding] = {} + for pattern, confidence in PE5_PATTERNS: + for match in re.finditer(pattern, content, re.IGNORECASE | re.MULTILINE): + line_num = get_line_number(content, match.start()) + context = get_context(content, match.start()) + if _is_documentation_example(context, file_type): + continue + if line_num in pe5_best and pe5_best[line_num].confidence >= confidence: + continue + pe5_best[line_num] = AnalyzerFinding( + rule_id="PE5", + message="Privileged Container / Container Escape", + severity=Severity.HIGH, + location=loc(line_num), + confidence=confidence, + tags=tag, + context=context, + matched_text=match.group(0)[:200], + ) + findings.extend(pe5_best.values()) return findings diff --git a/tests/nodes/analyzers/test_static_patterns.py b/tests/nodes/analyzers/test_static_patterns.py index b0e3454..ce8b474 100644 --- a/tests/nodes/analyzers/test_static_patterns.py +++ b/tests/nodes/analyzers/test_static_patterns.py @@ -442,6 +442,132 @@ def test_pe4_node_runs_over_state(self): assert any(f.rule_id == "PE4" for f in result["findings"]) +class TestRunStaticPatternsPrivilegeEscalationPE5: + """run_static_patterns with privilege_escalation: PE5 (privileged container / container escape).""" + + def test_pe5_privileged_flag_produces_finding(self): + """docker run --privileged yields PE5 (HIGH).""" + state = { + "components": ["skill.py"], + "file_cache": { + "skill.py": "subprocess.run(['docker', 'run', '--privileged', 'alpine', 'id'])\n", + }, + } + findings = static_runner.run_static_patterns(state, [privilege_escalation_module]) + pe5 = [f for f in findings if f.rule_id == "PE5"] + assert len(pe5) >= 1 + assert pe5[0].severity == "HIGH" + assert pe5[0].file == "skill.py" + assert pe5[0].start_line >= 1 + assert pe5[0].remediation is not None + assert pe5[0].context is not None + assert pe5[0].matched_text is not None + + def test_pe5_host_root_mount_produces_finding(self): + """docker run -v /:/host (host root filesystem mount) yields PE5 (HIGH).""" + state = { + "components": ["skill.py"], + "file_cache": { + "skill.py": "subprocess.run(['docker', 'run', '-v', '/:/host', 'alpine', 'ls', '/host'])\n", + }, + } + findings = static_runner.run_static_patterns(state, [privilege_escalation_module]) + assert any(f.rule_id == "PE5" and f.severity == "HIGH" for f in findings) + + def test_pe5_cap_add_sys_admin_produces_finding(self): + """--cap-add=SYS_ADMIN yields PE5.""" + state = { + "components": ["skill.py"], + "file_cache": { + "skill.py": "subprocess.run(['docker', 'run', '--cap-add=SYS_ADMIN', 'alpine', 'id'])\n", + }, + } + findings = static_runner.run_static_patterns(state, [privilege_escalation_module]) + assert any(f.rule_id == "PE5" for f in findings) + + def test_pe5_host_namespace_produces_finding(self): + """--pid=host / --net=host (shared host namespaces) yields PE5.""" + state = { + "components": ["skill.py"], + "file_cache": { + "skill.py": "subprocess.run(['docker', 'run', '--pid=host', '--net=host', 'alpine', 'ps'])\n", + }, + } + findings = static_runner.run_static_patterns(state, [privilege_escalation_module]) + assert any(f.rule_id == "PE5" for f in findings) + + def test_pe5_nsenter_produces_finding(self): + """nsenter into host PID 1 yields PE5 (HIGH).""" + state = { + "components": ["skill.py"], + "file_cache": { + "skill.py": "subprocess.run(['nsenter', '--target', '1', '--mount', '--pid', 'id'])\n", + }, + } + findings = static_runner.run_static_patterns(state, [privilege_escalation_module]) + assert any(f.rule_id == "PE5" and f.severity == "HIGH" for f in findings) + + def test_pe5_cgroup_release_agent_produces_finding(self): + """cgroup release_agent write (CVE-2022-0492 class) yields PE5 at highest confidence.""" + state = { + "components": ["skill.py"], + "file_cache": { + "skill.py": "open('/sys/fs/cgroup/release_agent', 'w').write('/tmp/x.sh')\n", + }, + } + findings = static_runner.run_static_patterns(state, [privilege_escalation_module]) + pe5 = [f for f in findings if f.rule_id == "PE5"] + assert len(pe5) >= 1 + assert pe5[0].confidence == 0.95 + + def test_pe5_unshare_produces_finding(self): + """unshare --user --map-root-user yields PE5.""" + state = { + "components": ["skill.py"], + "file_cache": { + "skill.py": "subprocess.run(['unshare', '--user', '--map-root-user', 'bash'])\n", + }, + } + findings = static_runner.run_static_patterns(state, [privilege_escalation_module]) + assert any(f.rule_id == "PE5" for f in findings) + + def test_pe5_combined_line_produces_exactly_one_finding(self): + """A single docker run line matching multiple PE5 flags yields exactly one PE5 finding.""" + state = { + "components": ["skill.py"], + "file_cache": { + "skill.py": "subprocess.run(['docker', 'run', '--privileged', '--cap-add=SYS_ADMIN', '--pid=host', 'alpine'])\n", + }, + } + findings = static_runner.run_static_patterns(state, [privilege_escalation_module]) + pe5 = [f for f in findings if f.rule_id == "PE5"] + assert len(pe5) == 1, ( + f"Expected 1 PE5 finding, got {len(pe5)}: {[f.matched_text for f in pe5]}" + ) + + def test_pe5_safe_docker_run_not_flagged(self): + """Plain docker run without dangerous flags produces no PE5.""" + state = { + "components": ["skill.py"], + "file_cache": { + "skill.py": "subprocess.run(['docker', 'run', 'alpine', 'echo', 'hi'])\n", + }, + } + findings = static_runner.run_static_patterns(state, [privilege_escalation_module]) + assert not any(f.rule_id == "PE5" for f in findings) + + def test_pe5_documentation_example_not_flagged(self): + """--privileged inside a markdown code block is filtered as documentation.""" + state = { + "components": ["SKILL.md"], + "file_cache": { + "SKILL.md": "# Docker\n\nFor example:\n```bash\ndocker run --privileged alpine id\n```\n", + }, + } + findings = static_runner.run_static_patterns(state, [privilege_escalation_module]) + assert not any(f.rule_id == "PE5" for f in findings) + + class TestRunStaticPatternsSSRF: """run_static_patterns with ssrf: SSRF1, SSRF2, SSRF3."""