Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""Static patterns: privilege escalation (PE1–PE4). Node and analyze() in one module."""
"""Static patterns: privilege escalation (PE1–PE5). Node and analyze() in one module."""

from __future__ import annotations

Expand Down Expand Up @@ -99,10 +99,22 @@
(r"\bDockerClient\s*\(", 0.85),
(r"http\+unix://.*docker\.sock", 0.9),
]
PE5_PATTERNS = [
(r"--privileged", 0.8),
(r"""(?:-v|--volume)['",\s=]+/:""", 0.85),
(r"--cap-add[=\s]+(?:SYS_ADMIN|ALL|SYS_PTRACE|NET_ADMIN)", 0.85),
(r"--(?:pid|net|network|ipc|uts)[=\s]+host", 0.8),
(r"--device[=\s]+/dev/", 0.7),
(r"--security-opt[=\s]+\S*unconfined", 0.85),
(r"\bnsenter\b", 0.9),
(r"/sys/fs/cgroup/.*release_agent", 0.95),
(r"/proc/\d+/ns/", 0.85),
(r"""\bunshare\b['",\s]+--(?:user|mount|pid)""", 0.85),
]


def analyze(content: str, file_path: str, file_type: str) -> list[AnalyzerFinding]:
"""Analyze content for privilege escalation patterns (PE1–PE4)."""
"""Analyze content for privilege escalation patterns (PE1–PE5)."""
findings: list[AnalyzerFinding] = []

def loc(ln: int) -> Location:
Expand Down Expand Up @@ -184,6 +196,28 @@ def loc(ln: int) -> Location:
matched_text=match.group(0)[:200],
)
findings.extend(pe4_best.values())
# Collect best-confidence PE5 finding per line — a single `docker run` line
# often matches multiple flags (e.g. --privileged + --cap-add=SYS_ADMIN).
pe5_best: dict[int, AnalyzerFinding] = {}
for pattern, confidence in PE5_PATTERNS:
for match in re.finditer(pattern, content, re.IGNORECASE | re.MULTILINE):
line_num = get_line_number(content, match.start())
context = get_context(content, match.start())
if _is_documentation_example(context, file_type):
continue
if line_num in pe5_best and pe5_best[line_num].confidence >= confidence:
continue
pe5_best[line_num] = AnalyzerFinding(
rule_id="PE5",
message="Privileged Container / Container Escape",
severity=Severity.HIGH,
location=loc(line_num),
confidence=confidence,
tags=tag,
context=context,
matched_text=match.group(0)[:200],
)
findings.extend(pe5_best.values())
return findings


Expand Down
126 changes: 126 additions & 0 deletions tests/nodes/analyzers/test_static_patterns.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,132 @@ def test_pe4_node_runs_over_state(self):
assert any(f.rule_id == "PE4" for f in result["findings"])


class TestRunStaticPatternsPrivilegeEscalationPE5:
"""run_static_patterns with privilege_escalation: PE5 (privileged container / container escape)."""

def test_pe5_privileged_flag_produces_finding(self):
"""docker run --privileged yields PE5 (HIGH)."""
state = {
"components": ["skill.py"],
"file_cache": {
"skill.py": "subprocess.run(['docker', 'run', '--privileged', 'alpine', 'id'])\n",
},
}
findings = static_runner.run_static_patterns(state, [privilege_escalation_module])
pe5 = [f for f in findings if f.rule_id == "PE5"]
assert len(pe5) >= 1
assert pe5[0].severity == "HIGH"
assert pe5[0].file == "skill.py"
assert pe5[0].start_line >= 1
assert pe5[0].remediation is not None
assert pe5[0].context is not None
assert pe5[0].matched_text is not None

def test_pe5_host_root_mount_produces_finding(self):
"""docker run -v /:/host (host root filesystem mount) yields PE5 (HIGH)."""
state = {
"components": ["skill.py"],
"file_cache": {
"skill.py": "subprocess.run(['docker', 'run', '-v', '/:/host', 'alpine', 'ls', '/host'])\n",
},
}
findings = static_runner.run_static_patterns(state, [privilege_escalation_module])
assert any(f.rule_id == "PE5" and f.severity == "HIGH" for f in findings)

def test_pe5_cap_add_sys_admin_produces_finding(self):
"""--cap-add=SYS_ADMIN yields PE5."""
state = {
"components": ["skill.py"],
"file_cache": {
"skill.py": "subprocess.run(['docker', 'run', '--cap-add=SYS_ADMIN', 'alpine', 'id'])\n",
},
}
findings = static_runner.run_static_patterns(state, [privilege_escalation_module])
assert any(f.rule_id == "PE5" for f in findings)

def test_pe5_host_namespace_produces_finding(self):
"""--pid=host / --net=host (shared host namespaces) yields PE5."""
state = {
"components": ["skill.py"],
"file_cache": {
"skill.py": "subprocess.run(['docker', 'run', '--pid=host', '--net=host', 'alpine', 'ps'])\n",
},
}
findings = static_runner.run_static_patterns(state, [privilege_escalation_module])
assert any(f.rule_id == "PE5" for f in findings)

def test_pe5_nsenter_produces_finding(self):
"""nsenter into host PID 1 yields PE5 (HIGH)."""
state = {
"components": ["skill.py"],
"file_cache": {
"skill.py": "subprocess.run(['nsenter', '--target', '1', '--mount', '--pid', 'id'])\n",
},
}
findings = static_runner.run_static_patterns(state, [privilege_escalation_module])
assert any(f.rule_id == "PE5" and f.severity == "HIGH" for f in findings)

def test_pe5_cgroup_release_agent_produces_finding(self):
"""cgroup release_agent write (CVE-2022-0492 class) yields PE5 at highest confidence."""
state = {
"components": ["skill.py"],
"file_cache": {
"skill.py": "open('/sys/fs/cgroup/release_agent', 'w').write('/tmp/x.sh')\n",
},
}
findings = static_runner.run_static_patterns(state, [privilege_escalation_module])
pe5 = [f for f in findings if f.rule_id == "PE5"]
assert len(pe5) >= 1
assert pe5[0].confidence == 0.95

def test_pe5_unshare_produces_finding(self):
"""unshare --user --map-root-user yields PE5."""
state = {
"components": ["skill.py"],
"file_cache": {
"skill.py": "subprocess.run(['unshare', '--user', '--map-root-user', 'bash'])\n",
},
}
findings = static_runner.run_static_patterns(state, [privilege_escalation_module])
assert any(f.rule_id == "PE5" for f in findings)

def test_pe5_combined_line_produces_exactly_one_finding(self):
"""A single docker run line matching multiple PE5 flags yields exactly one PE5 finding."""
state = {
"components": ["skill.py"],
"file_cache": {
"skill.py": "subprocess.run(['docker', 'run', '--privileged', '--cap-add=SYS_ADMIN', '--pid=host', 'alpine'])\n",
},
}
findings = static_runner.run_static_patterns(state, [privilege_escalation_module])
pe5 = [f for f in findings if f.rule_id == "PE5"]
assert len(pe5) == 1, (
f"Expected 1 PE5 finding, got {len(pe5)}: {[f.matched_text for f in pe5]}"
)

def test_pe5_safe_docker_run_not_flagged(self):
"""Plain docker run without dangerous flags produces no PE5."""
state = {
"components": ["skill.py"],
"file_cache": {
"skill.py": "subprocess.run(['docker', 'run', 'alpine', 'echo', 'hi'])\n",
},
}
findings = static_runner.run_static_patterns(state, [privilege_escalation_module])
assert not any(f.rule_id == "PE5" for f in findings)

def test_pe5_documentation_example_not_flagged(self):
"""--privileged inside a markdown code block is filtered as documentation."""
state = {
"components": ["SKILL.md"],
"file_cache": {
"SKILL.md": "# Docker\n\nFor example:\n```bash\ndocker run --privileged alpine id\n```\n",
},
}
findings = static_runner.run_static_patterns(state, [privilege_escalation_module])
assert not any(f.rule_id == "PE5" for f in findings)


class TestRunStaticPatternsSSRF:
"""run_static_patterns with ssrf: SSRF1, SSRF2, SSRF3."""

Expand Down
Loading