diff --git a/src/skillspector/nodes/analyzers/behavioral_ast.py b/src/skillspector/nodes/analyzers/behavioral_ast.py index a502e8e..d8898f4 100644 --- a/src/skillspector/nodes/analyzers/behavioral_ast.py +++ b/src/skillspector/nodes/analyzers/behavioral_ast.py @@ -28,6 +28,7 @@ get_context_from_lines, get_source_segment, resolve_call_name, + resolve_dynamic_import_call, ) from .static_runner import MAX_FILE_BYTES, analyzer_finding_to_finding @@ -182,6 +183,10 @@ def _emit( continue call_name = resolve_call_name(ast_node, aliases) + if call_name is None: + # Dynamic-import chain: importlib.import_module('os').system(...) → + # 'os.system', so it re-enters the os./subprocess. sink ladders below. + call_name = resolve_dynamic_import_call(ast_node, aliases) if call_name is None: continue diff --git a/src/skillspector/nodes/analyzers/behavioral_taint_tracking.py b/src/skillspector/nodes/analyzers/behavioral_taint_tracking.py index 83eff64..4cbcc6e 100644 --- a/src/skillspector/nodes/analyzers/behavioral_taint_tracking.py +++ b/src/skillspector/nodes/analyzers/behavioral_taint_tracking.py @@ -37,6 +37,7 @@ get_source_segment, resolve_call_name_typed, resolve_dotted_name, + resolve_dynamic_import_call, ) from .static_runner import MAX_FILE_BYTES, analyzer_finding_to_finding @@ -170,6 +171,24 @@ ] +def _resolve_sink_name( + node: ast.Call, + type_map: dict[str, str] | None = None, + aliases: dict[str, str] | None = None, +) -> str | None: + """Resolve a call to its canonical sink name, including dynamic-import chains. + + Wraps :func:`resolve_call_name_typed` (type-/alias-aware resolution) and falls back + to :func:`resolve_dynamic_import_call` so that + ``importlib.import_module('subprocess').run(...)`` resolves to ``'subprocess.run'`` + and re-enters ``_EXEC_SINKS`` like the statically-imported form would. + """ + name = resolve_call_name_typed(node, type_map, aliases) + if name is None: + name = resolve_dynamic_import_call(node, aliases) + return name + + def _classify(name: str, categories: list[tuple[frozenset[str], str]], default: str) -> str: for names, label in categories: if name in names: @@ -363,7 +382,7 @@ def _emit( if not isinstance(ast_node, ast.Call): continue - sink_name = resolve_call_name_typed(ast_node, type_map, aliases) + sink_name = _resolve_sink_name(ast_node, type_map, aliases) if not sink_name or sink_name not in _ALL_SINKS: continue diff --git a/src/skillspector/nodes/analyzers/common.py b/src/skillspector/nodes/analyzers/common.py index 3fed091..22bde49 100644 --- a/src/skillspector/nodes/analyzers/common.py +++ b/src/skillspector/nodes/analyzers/common.py @@ -104,24 +104,47 @@ def resolve_dotted_name(node: ast.expr) -> str | None: return None +def _strip_builtins_prefix(name: str) -> str: + """Collapse a ``builtins``-qualified name back to its bare builtin name. + + ``builtins.exec`` → ``exec`` (and ``builtins.eval``/``compile``/``__import__``…). + The analyzers match dangerous builtins by their bare name (``call_name == "exec"``, + ``name in _EXEC_SINKS``), but ``from builtins import exec`` / ``import builtins; + builtins.exec(...)`` resolve, through the import-alias map, to the *qualified* + spelling ``builtins.exec`` — which would otherwise slip past those checks. Since + ``builtins.exec is exec`` at runtime, collapsing the prefix is semantically exact + and re-enters the existing bare-name detection. + + Only the single-segment form ``builtins.`` is collapsed; deeper chains + (``builtins.foo.bar``) are left untouched as they are not direct builtin calls. + """ + root, sep, rest = name.partition(".") + if root == "builtins" and sep and "." not in rest: + return rest + return name + + def apply_import_aliases(name: str, aliases: dict[str, str]) -> str: """Rewrite a resolved call name to its fully-qualified form using import aliases. - Bridges two evasion-prone spellings back to the canonical dotted name that the + Bridges several evasion-prone spellings back to the canonical name that the analyzers match against: - ``from os import system`` → ``{"system": "os.system"}`` so a bare ``system`` call resolves to ``"os.system"``. - ``import os as o`` → ``{"o": "os"}`` so ``o.system`` resolves to ``"os.system"``. + - ``from builtins import exec`` / ``import builtins; builtins.exec(...)`` → the + bare builtin ``exec`` (via :func:`_strip_builtins_prefix`), so dangerous + builtins matched by bare name are not hidden behind a ``builtins.`` qualifier. Idempotent for already-canonical names (``os.system`` stays ``os.system``). """ if name in aliases: - return aliases[name] + return _strip_builtins_prefix(aliases[name]) root, sep, rest = name.partition(".") if sep and root in aliases: - return f"{aliases[root]}.{rest}" - return name + return _strip_builtins_prefix(f"{aliases[root]}.{rest}") + return _strip_builtins_prefix(name) def resolve_call_name(node: ast.Call, aliases: dict[str, str] | None = None) -> str | None: @@ -138,6 +161,50 @@ def resolve_call_name(node: ast.Call, aliases: dict[str, str] | None = None) -> return name +def _dynamic_import_target(node: ast.expr, aliases: dict[str, str] | None = None) -> str | None: + """Return the imported module name for an ``importlib.import_module('mod')`` call. + + Recognizes both ``importlib.import_module('os')`` and the bare-imported + ``from importlib import import_module; import_module('os')`` (resolved via the + import-alias map), returning the string literal module name (``'os'``) when the + first positional argument is a constant. Returns ``None`` for anything else + (non-literal argument, unrelated call), so callers stay precise and avoid false + positives on dynamic module names the analyzer cannot resolve statically. + """ + if not isinstance(node, ast.Call): + return None + func_name = resolve_dotted_name(node.func) + if func_name is not None and aliases: + func_name = apply_import_aliases(func_name, aliases) + if func_name not in ("importlib.import_module", "import_module"): + return None + if node.args and isinstance(node.args[0], ast.Constant) and isinstance(node.args[0].value, str): + return node.args[0].value + return None + + +def resolve_dynamic_import_call( + node: ast.Call, aliases: dict[str, str] | None = None +) -> str | None: + """Resolve ``importlib.import_module('mod').attr(...)`` to the dotted sink ``'mod.attr'``. + + Bridges the dynamic-import evasion that mirrors ``__import__``: a skill writes + ``importlib.import_module('os').system(cmd)`` (or imports ``import_module`` bare) + so the dangerous module never appears as a static ``import``. When *node*'s callee + is an attribute access on such a chain, this returns the canonical sink name + (``'os.system'``, ``'subprocess.run'``) that the existing sink ladders already + match. Returns ``None`` when the chain is not a literal dynamic import, keeping the + resolution precise (no false positives on un-resolvable dynamic names). + """ + func = node.func + if not isinstance(func, ast.Attribute): + return None + module_name = _dynamic_import_target(func.value, aliases) + if module_name is None: + return None + return f"{module_name}.{func.attr}" + + def _build_import_aliases(tree: ast.Module) -> dict[str, str]: """Map locally imported names to their fully-qualified module paths. diff --git a/tests/nodes/analyzers/test_behavioral_ast.py b/tests/nodes/analyzers/test_behavioral_ast.py index 996fa1d..ae1a423 100644 --- a/tests/nodes/analyzers/test_behavioral_ast.py +++ b/tests/nodes/analyzers/test_behavioral_ast.py @@ -284,3 +284,92 @@ def test_multiple_dangerous_calls_in_one_file(self): assert "AST2" in rule_ids assert "AST4" in rule_ids assert "AST5" in rule_ids + + +# ── builtins / importlib import-chain evasion ───────────────────────── + + +class TestBuiltinsImportEvasion: + """Dangerous builtins hidden behind the ``builtins`` module must still alert. + + The analyzer matches dangerous builtins by their bare name (``exec``/``eval``/ + ``compile``/``__import__``). Writing ``from builtins import exec`` or + ``import builtins; builtins.exec(...)`` resolves, through the import-alias map, + to the qualified spelling ``builtins.exec`` — which would slip past the bare-name + checks unless it is canonicalized back. Since ``builtins.exec is exec``, the + collapse is semantically exact. Complements the ``getattr`` branch (PR #166). + """ + + def test_from_builtins_import_exec(self): + """``from builtins import exec; exec(code)`` must still raise AST1.""" + findings = _run("from builtins import exec\nexec('x = 1')\n") + assert any(f.rule_id == "AST1" for f in findings) + + def test_from_builtins_import_eval(self): + """``from builtins import eval`` must still raise AST2.""" + findings = _run("from builtins import eval\neval('2 + 2')\n") + assert any(f.rule_id == "AST2" for f in findings) + + def test_from_builtins_import_compile(self): + """``from builtins import compile`` must still raise AST6.""" + findings = _run("from builtins import compile\ncompile('x', '', 'exec')\n") + assert any(f.rule_id == "AST6" for f in findings) + + def test_from_builtins_import_dunder_import(self): + """``from builtins import __import__`` must still raise AST3.""" + findings = _run("from builtins import __import__\n__import__('os')\n") + assert any(f.rule_id == "AST3" for f in findings) + + def test_import_builtins_dot_exec(self): + """``import builtins; builtins.exec(...)`` must still raise AST1.""" + findings = _run("import builtins\nbuiltins.exec('x = 1')\n") + assert any(f.rule_id == "AST1" for f in findings) + + def test_import_builtins_as_alias_dot_exec(self): + """``import builtins as b2; b2.exec(...)`` must still raise AST1.""" + findings = _run("import builtins as b2\nb2.exec('x = 1')\n") + assert any(f.rule_id == "AST1" for f in findings) + + def test_from_builtins_import_exec_as_alias(self): + """``from builtins import exec as e; e(...)`` must still raise AST1.""" + findings = _run("from builtins import exec as e\ne('x = 1')\n") + assert any(f.rule_id == "AST1" for f in findings) + + def test_user_module_exec_helper_no_false_positive(self): + """A benign helper merely *named* like a sink must not match (FP-neighbor). + + ``from mymod import exec_helper; exec_helper()`` imports an unrelated + third-party callable — it is not ``builtins.exec`` and must stay clean. + """ + findings = _run("from mymod import exec_helper\nexec_helper()\n") + assert findings == [] + + +class TestImportlibDynamicChainEvasion: + """``importlib.import_module('mod').attr(...)`` is a dynamic-import sink chain. + + It mirrors ``__import__('mod')`` but lets the dangerous module name live in a + string literal so it never appears as a static ``import``. The chain is resolved + to the canonical dotted sink (``os.system``/``subprocess.run``) so it re-enters + the existing ``os.``/``subprocess.`` sink ladders. + """ + + def test_importlib_import_module_os_system(self): + """``importlib.import_module('os').system(...)`` must raise AST5.""" + findings = _run("import importlib\nimportlib.import_module('os').system('id')\n") + assert any(f.rule_id == "AST5" for f in findings) + + def test_importlib_import_module_subprocess_run(self): + """``importlib.import_module('subprocess').run(...)`` must raise AST4.""" + findings = _run("import importlib\nimportlib.import_module('subprocess').run(['id'])\n") + assert any(f.rule_id == "AST4" for f in findings) + + def test_from_importlib_import_module_os_system(self): + """Bare-imported ``import_module('os').system(...)`` must raise AST5.""" + findings = _run("from importlib import import_module\nimport_module('os').system('id')\n") + assert any(f.rule_id == "AST5" for f in findings) + + def test_importlib_import_module_benign_no_false_positive(self): + """A benign dynamic import (``json.loads``) must not match a sink ladder.""" + findings = _run("import importlib\nimportlib.import_module('json').loads('{}')\n") + assert findings == [] diff --git a/tests/nodes/analyzers/test_behavioral_taint_tracking.py b/tests/nodes/analyzers/test_behavioral_taint_tracking.py index 791ca90..022546c 100644 --- a/tests/nodes/analyzers/test_behavioral_taint_tracking.py +++ b/tests/nodes/analyzers/test_behavioral_taint_tracking.py @@ -464,3 +464,62 @@ def test_untyped_variable_no_false_positive(self): ) findings = _run(code) assert not any(f.rule_id == "TT4" for f in findings) + + +# ── builtins / importlib exec-sink evasion ──────────────────────────── + + +class TestBuiltinsImportlibSinkEvasion: + """Exec sinks reached via ``builtins.*`` or ``importlib.import_module`` must alert. + + ``_EXEC_SINKS`` matches by bare/qualified name (``"exec"``, ``"os.system"``). + ``from builtins import exec`` resolves to ``builtins.exec`` (collapsed back to + ``exec``) and ``importlib.import_module('subprocess').run`` resolves to the + canonical ``subprocess.run`` — both must re-enter the exec-sink path so a + user-input → exec flow is flagged as TT5. Complements the ``getattr`` branch + (PR #166): this covers the import/builtins/importlib branch. + """ + + def test_from_builtins_import_exec_sink(self): + """``from builtins import exec`` with tainted input must raise TT5.""" + code = "from builtins import exec\ncode = input()\nexec(code)\n" + findings = _run(code) + assert any(f.rule_id == "TT5" for f in findings) + + def test_import_builtins_dot_exec_sink(self): + """``import builtins; builtins.exec(input())`` must raise TT5.""" + code = "import builtins\ncode = input()\nbuiltins.exec(code)\n" + findings = _run(code) + assert any(f.rule_id == "TT5" for f in findings) + + def test_import_builtins_as_alias_sink(self): + """``import builtins as b2; b2.exec(input())`` must raise TT5.""" + code = "import builtins as b2\ncode = input()\nb2.exec(code)\n" + findings = _run(code) + assert any(f.rule_id == "TT5" for f in findings) + + def test_importlib_import_module_os_system_sink(self): + """``importlib.import_module('os').system(input())`` must raise TT5.""" + code = "import importlib\ncmd = input()\nimportlib.import_module('os').system(cmd)\n" + findings = _run(code) + assert any(f.rule_id == "TT5" for f in findings) + + def test_importlib_import_module_subprocess_run_sink(self): + """``importlib.import_module('subprocess').run(input())`` must raise TT5.""" + code = "import importlib\ncmd = input()\nimportlib.import_module('subprocess').run(cmd)\n" + findings = _run(code) + assert any(f.rule_id == "TT5" for f in findings) + + def test_from_importlib_import_module_sink(self): + """Bare-imported ``import_module('os').system(input())`` must raise TT5.""" + code = ( + "from importlib import import_module\ncmd = input()\nimport_module('os').system(cmd)\n" + ) + findings = _run(code) + assert any(f.rule_id == "TT5" for f in findings) + + def test_importlib_benign_module_no_false_positive(self): + """A benign dynamic import (``json.loads``) must not be treated as an exec sink.""" + code = "import importlib\ndata = input()\nimportlib.import_module('json').loads(data)\n" + findings = _run(code) + assert not any(f.rule_id == "TT5" for f in findings)