diff --git a/src/skillspector/nodes/analyzers/pattern_defaults.py b/src/skillspector/nodes/analyzers/pattern_defaults.py index dcece108..d3b5f4c6 100644 --- a/src/skillspector/nodes/analyzers/pattern_defaults.py +++ b/src/skillspector/nodes/analyzers/pattern_defaults.py @@ -81,6 +81,7 @@ class PatternCategory(StrEnum): "TM1": "Tool parameters are crafted to achieve unintended or unsafe behavior. Parameter abuse can bypass intended safety checks (e.g. shell=True, --force, dangerous glob patterns).", "TM2": "Tool calls are chained to bypass individual safety checks or escalate capabilities beyond what any single tool call would allow.", "TM3": "Tool defaults are unsafe or overly permissive (e.g. disabled TLS verification, no authentication, world-writable permissions). Unsafe defaults widen the attack surface.", + "TM4": "Code deploys a privileged Kubernetes workload (privileged container, hostPath mount, or host namespaces). This grants root on the node and is a node/cluster takeover vector.", # Rogue Agent (B.1.11) "RA1": "Skill modifies its own code, configuration, or behavior at runtime. Self-modification enables an agent to escalate privileges, disable safety constraints, or install persistent backdoors.", "RA2": "Skill establishes unauthorized persistence across sessions via cron jobs, startup scripts, or state files. Session persistence allows an attacker to maintain access beyond the current interaction.", @@ -170,6 +171,7 @@ class PatternCategory(StrEnum): "TM1": PatternCategory.TOOL_MISUSE.value, "TM2": PatternCategory.TOOL_MISUSE.value, "TM3": PatternCategory.TOOL_MISUSE.value, + "TM4": PatternCategory.TOOL_MISUSE.value, "RA1": PatternCategory.ROGUE_AGENT.value, "RA2": PatternCategory.ROGUE_AGENT.value, "SC4": PatternCategory.SUPPLY_CHAIN.value, @@ -245,6 +247,7 @@ class PatternCategory(StrEnum): "TM1": "Tool Parameter Abuse", "TM2": "Chaining Abuse", "TM3": "Unsafe Defaults", + "TM4": "Privileged Kubernetes Workload", "RA1": "Self-Modification", "RA2": "Session Persistence", "SC4": "Known Vulnerable Dependency", @@ -325,6 +328,7 @@ class PatternCategory(StrEnum): "TM1": "Validate all tool parameters against an allowlist. Reject dangerous parameter values (shell=True, --force, -rf /) and use safe defaults.", "TM2": "Limit tool chaining depth and validate the output of each tool before passing it to the next. Require explicit user approval for multi-step chains.", "TM3": "Override unsafe defaults with secure settings (verify=True, auth required, restrictive permissions). Review and harden all tool configurations.", + "TM4": "Remove privileged, hostPath, and host-namespace settings from workloads. Use a least-privilege securityContext, drop capabilities, and avoid mounting the host filesystem.", # Rogue Agent (B.1.11) "RA1": "Prevent the skill from modifying its own code, SKILL.md, or configuration files. Treat skill files as read-only at runtime.", "RA2": "Remove any persistence mechanisms (cron jobs, startup scripts, state files). Skills should not maintain state across sessions without explicit user consent.", diff --git a/src/skillspector/nodes/analyzers/static_patterns_tool_misuse.py b/src/skillspector/nodes/analyzers/static_patterns_tool_misuse.py index 04bfa1cd..c5884501 100644 --- a/src/skillspector/nodes/analyzers/static_patterns_tool_misuse.py +++ b/src/skillspector/nodes/analyzers/static_patterns_tool_misuse.py @@ -13,10 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Static patterns: tool misuse (TM1–TM3). Node and analyze() in one module. +"""Static patterns: tool misuse (TM1–TM4). Node and analyze() in one module. Detects patterns where tool parameters are abused (TM1), tool chaining -is used to bypass safety (TM2), or tool defaults are unsafe (TM3). +is used to bypass safety (TM2), tool defaults are unsafe (TM3), or a +privileged Kubernetes workload is deployed (TM4). Framework: ASI02. """ @@ -31,7 +32,7 @@ from skillspector.state import AnalyzerNodeResponse, SkillspectorState from . import static_runner -from .common import get_context, get_line_number +from .common import get_context, get_line_number, is_code_example from .pattern_defaults import PatternCategory logger = get_logger(__name__) @@ -149,6 +150,18 @@ ), ] +# TM4: Privileged Kubernetes Workload — manifest/CLI primitives that grant +# node/host takeover (the cluster-scale counterpart of a privileged container). +# Only isolation-breaking signals are matched, so a normal `kubectl apply` or a +# plain DaemonSet does not fire. +TM4_PATTERNS = [ + (r"privileged\s*:\s*true", 0.7), # privileged container in a manifest + (r"hostPath\s*:", 0.55), # host filesystem mount + (r"host(?:PID|Network|IPC)\s*:\s*true", 0.6), # host namespace sharing + (r"kubectl\s+run\b[^\n]*--privileged", 0.7), # privileged ad-hoc pod + (r"--set\b[^\n]*privileged\s*=\s*true", 0.6), # helm privileged override +] + _SAFE_CONTAINER_PATTERNS: tuple[re.Pattern[str], ...] = ( re.compile(r"docker\s+run\s+.*--rm", re.IGNORECASE), @@ -267,6 +280,26 @@ def ctx(start: int) -> str: matched_text=match.group(0)[:200], ) ) + # TM4: privileged K8s workload. Filtered through is_code_example() because + # privileged/hostPath fields commonly appear in SKILL.md docs and examples. + for pattern, confidence in TM4_PATTERNS: + for match in re.finditer(pattern, content, re.IGNORECASE | re.MULTILINE): + context_text = ctx(match.start()) + if is_code_example(context_text): + continue + line_num = get_line_number(content, match.start()) + findings.append( + AnalyzerFinding( + rule_id="TM4", + message="Privileged Kubernetes Workload", + severity=Severity.HIGH, + location=loc(line_num), + confidence=confidence, + tags=tag, + context=context_text, + matched_text=match.group(0)[:200], + ) + ) return findings diff --git a/tests/unit/test_patterns_new.py b/tests/unit/test_patterns_new.py index 575e6bdf..de2f6789 100644 --- a/tests/unit/test_patterns_new.py +++ b/tests/unit/test_patterns_new.py @@ -684,6 +684,58 @@ def test_tm1_rm_outside_dockerfile_stays_high(self) -> None: def test_tm3_detected(self, content: str, filename: str, filetype: str) -> None: assert any(f.rule_id == "TM3" for f in tm_mod.analyze(content, filename, filetype)) + @pytest.mark.parametrize( + "content,filename,filetype", + [ + pytest.param( + " securityContext:\n privileged: true", + "daemonset.yaml", + "yaml", + id="privileged_true", + ), + pytest.param( + " volumes:\n - hostPath:\n path: /", + "ds.yaml", + "yaml", + id="hostpath", + ), + pytest.param(" hostPID: true", "ds.yaml", "yaml", id="hostpid"), + pytest.param(" hostNetwork: true", "ds.yaml", "yaml", id="hostnetwork"), + pytest.param( + "kubectl run probe --image=alpine --privileged", + "deploy.sh", + "shell", + id="kubectl_run_privileged", + ), + pytest.param( + "helm install m ./c --set securityContext.privileged=true", + "deploy.sh", + "shell", + id="helm_privileged", + ), + ], + ) + def test_tm4_detected(self, content: str, filename: str, filetype: str) -> None: + assert any(f.rule_id == "TM4" for f in tm_mod.analyze(content, filename, filetype)) + + def test_tm4_severity_high(self) -> None: + findings = tm_mod.analyze( + " securityContext:\n privileged: true", "ds.yaml", "yaml" + ) + tm4 = [f for f in findings if f.rule_id == "TM4"] + assert tm4 and tm4[0].severity == Severity.HIGH + + def test_tm4_benign_workload_not_flagged(self) -> None: + content = ( + "kind: DaemonSet\nspec:\n template:\n spec:\n containers:\n" + " - name: app\n image: nginx" + ) + assert not any(f.rule_id == "TM4" for f in tm_mod.analyze(content, "ds.yaml", "yaml")) + + def test_tm4_documentation_example_excluded(self) -> None: + content = "For example, never set privileged: true in your manifests." + assert not any(f.rule_id == "TM4" for f in tm_mod.analyze(content, "README.md", "markdown")) + def test_safe_content_produces_no_findings(self) -> None: findings = tm_mod.analyze( "import json\ndata = json.loads(input_str)", "parser.py", "python"