Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,23 @@ jobs:
- name: Install ruff
run: pip install "ruff==0.15.17"
- name: Lint
run: ruff check src/
run: ruff check src/ tests/
- name: Check formatting
run: ruff format --check src/
run: ruff format --check src/ tests/

test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
# srutils is imported transitively (library.py -> boundary_detection.py)
# and lives in the sr-utils submodule, not on PyPI. Fetch only that
# submodule; the rest (dynamorio, capstone, ...) are huge native trees.
- name: Fetch sr-utils submodule
run: git submodule update --init submodules/sr-utils
- uses: actions/setup-python@v5
with:
python-version: "3.11"
- name: Install package and test dependencies
run: pip install -e . ./submodules/sr-utils pytest
- name: Run tests
run: pytest
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ package-dir = {"" = "src"}
where = ["src"]
include = ["rtrace*"]

[tool.pytest.ini_options]
testpaths = ["tests"]

[tool.ruff]
target-version = "py39"
line-length = 100
Expand Down
22 changes: 12 additions & 10 deletions src/rtrace/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,18 @@ def is_function_start(self, address, is_relative_addr=True):
return self.lib.is_function_start(addr_in_module)


def deduplicate_modules(modules):
"""Drop modules with an already-seen path, keeping the first occurrence."""
module_path_set = set()
dep_modules = []
for m in modules:
if m.path in module_path_set:
continue
dep_modules.append(m)
module_path_set.add(m.path)
return dep_modules


def get_loaded_module(
pid, tids, input_dir, mode=0, bd_algo=None, bd_cache_dir=None, analyze_function_prototypes=False
):
Expand Down Expand Up @@ -104,16 +116,6 @@ def read_module_info(file_path):
)
return modules

def deduplicate_modules(modules):
module_path_set = set()
dep_modules = []
for m in modules:
if m.path in module_path_set:
continue
dep_modules.append(m)
module_path_set.add(m.path)
return dep_modules

all_modules = []
for tid in tids:
file_path = f"{input_dir}/rtrace-intermediate-{pid}-{tid}-loaded_modules.log"
Expand Down
165 changes: 165 additions & 0 deletions tests/test_function_call.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
from types import SimpleNamespace

import pytest

from rtrace.function_call import BlockInfo, Call, CallLogProcessor


class TestCall:
def test_repr(self):
call = Call("foo", 0x1234, 0x34, "/lib/x.so", 2)

assert repr(call) == "Call(name=foo, abs_addr=4660, so_path=/lib/x.so)"


class TestLogLineParsers:
def test_entry(self):
assert CallLogProcessor.is_entry("Entry: 4096")
assert not CallLogProcessor.is_entry("Exit: 4096")
assert CallLogProcessor.get_entry_address("Entry: 4096") == 4096

def test_exit(self):
assert CallLogProcessor.is_exit("Exit: 4096")
assert not CallLogProcessor.is_exit("Entry: 4096")
assert CallLogProcessor.get_exit_address("Exit: 4096") == 4096

def test_arg(self):
assert CallLogProcessor.is_arg("Arg_0: 7")
assert not CallLogProcessor.is_arg("Ret: 7")
assert CallLogProcessor.get_arg("Arg_1: 7") == 7

def test_ret(self):
assert CallLogProcessor.is_ret("Ret: 42")
assert not CallLogProcessor.is_ret("Entry: 42")
assert CallLogProcessor.get_ret("Ret: 42") == 42

def test_block(self):
assert CallLogProcessor.is_block("BB: 4096")
assert not CallLogProcessor.is_block("Entry: 4096")
assert CallLogProcessor.get_block("BB: 4096") == 4096


class TestBlockInfo:
def _write_log(self, directory, pid, tid, lines):
path = directory / f"rtrace-intermediate-{pid}-{tid}-block_info.log"
path.write_text("".join(f"{line}\n" for line in lines))

def test_parses_blocks(self, tmp_path):
self._write_log(tmp_path, 100, 200, ["4096 : 5", "8192 : 3"])

info = BlockInfo(100, [200], str(tmp_path))

assert info.get_block_size(4096) == 5
assert info.get_block_size(8192) == 3

def test_duplicate_address_same_count_is_fine(self, tmp_path):
self._write_log(tmp_path, 100, 200, ["4096 : 5", "4096 : 5"])

info = BlockInfo(100, [200], str(tmp_path))

assert info.get_block_size(4096) == 5

def test_duplicate_address_different_count_rejected(self, tmp_path):
self._write_log(tmp_path, 100, 200, ["4096 : 5", "4096 : 6"])

with pytest.raises(AssertionError, match="Duplicate block address"):
BlockInfo(100, [200], str(tmp_path))

def test_merges_multiple_tids(self, tmp_path):
self._write_log(tmp_path, 100, 200, ["4096 : 5"])
self._write_log(tmp_path, 100, 201, ["8192 : 3"])

info = BlockInfo(100, [200, 201], str(tmp_path))

assert info.get_block_size(4096) == 5
assert info.get_block_size(8192) == 3


class _StubModule:
def __init__(self, start, end, path, functions):
self.start = start
self.end = end
self.path = path
self._functions = functions

def get_function_at_address(self, address):
for func in self._functions:
if func.start <= address - self.start < func.end:
return func
return None


class _StubProcessMemory:
def __init__(self, modules):
self.modules = modules

def get_module_at_address(self, address):
for module in self.modules:
if module.start <= address < module.end:
return module
return None


def make_processor(raw_logs, process_memory, block_info):
"""Build a CallLogProcessor without reading log files from disk."""
processor = object.__new__(CallLogProcessor)
processor.process_memory = process_memory
processor.raw_logs = raw_logs
processor.abs_addr_to_func = {}
processor.root_call = Call("root", 0, 0, "root", 0)
processor.block_info = block_info
return processor


class TestProcessLogs:
def test_builds_call_tree_with_args_ret_and_blocks(self):
func = SimpleNamespace(name="foo", start=0x34, end=0x100, so_path="/lib/a.so", num_args=2)
module = _StubModule(start=0x1000, end=0x2000, path="/lib/a.so", functions=[func])
process_memory = _StubProcessMemory([module])
raw_logs = [
"Entry: 4148", # 0x1034 = module base + func start
"Arg_0: 1",
"Arg_1: 2",
"BB: 4148",
"Ret: 42",
"Exit: 4148",
]

processor = make_processor(raw_logs, process_memory, block_info={4148: 5})
processor.process_logs()

assert len(processor.root_call.calls) == 1
call = processor.root_call.calls[0]
assert call.name == "foo"
assert call.so_path == "/lib/a.so"
assert call.args == [1, 2]
assert call.ret_val == 42
assert call.executed_blocks == 1
assert call.executed_insts == 5

def test_nested_calls(self):
outer = SimpleNamespace(name="outer", start=0x34, end=0x40, so_path="/lib/a.so", num_args=0)
inner = SimpleNamespace(name="inner", start=0x40, end=0x50, so_path="/lib/a.so", num_args=0)
module = _StubModule(start=0x1000, end=0x2000, path="/lib/a.so", functions=[outer, inner])
process_memory = _StubProcessMemory([module])
raw_logs = [
"Entry: 4148", # outer
"Entry: 4160", # inner
"Exit: 4160",
"Exit: 4148",
]

processor = make_processor(raw_logs, process_memory, block_info={})
processor.process_logs()

assert [c.name for c in processor.root_call.calls] == ["outer"]
assert [c.name for c in processor.root_call.calls[0].calls] == ["inner"]

def test_unknown_address_becomes_unknown_call(self):
process_memory = _StubProcessMemory([])
raw_logs = ["Entry: 4148", "Exit: 4148"]

processor = make_processor(raw_logs, process_memory, block_info={})
processor.process_logs()

assert processor.root_call.calls[0].name == "unknown"
97 changes: 97 additions & 0 deletions tests/test_library.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import pytest

from rtrace.library import Function, Library


def make_library(functions):
"""Build a Library around a synthetic function list, skipping ELF parsing."""
lib = object.__new__(Library)
lib.so_path = "/lib/test.so"
lib._functions = sorted(functions, key=lambda f: f.start)
return lib


@pytest.fixture
def library():
# two adjacent functions, then a gap, then a third
return make_library(
[
Function(0x100, 0x200, "f1", "/lib/test.so"),
Function(0x200, 0x300, "f2", "/lib/test.so"),
Function(0x400, 0x500, "f3", "/lib/test.so"),
]
)


class TestGetFunctionAtAddress:
def test_start_address(self, library):
assert library.get_function_at_address(0x100).name == "f1"

def test_address_within_range(self, library):
assert library.get_function_at_address(0x2FF).name == "f2"

def test_before_first_function(self, library):
assert library.get_function_at_address(0x50) is None

def test_in_gap_between_functions(self, library):
assert library.get_function_at_address(0x350) is None

def test_past_last_function(self, library):
assert library.get_function_at_address(0x500) is None


class TestIsFunctionStart:
def test_exact_start(self, library):
assert library.is_function_start(0x200)

def test_mid_function(self, library):
assert not library.is_function_start(0x250)

def test_unmapped(self, library):
assert not library.is_function_start(0x50)


class TestInsertFunctionAtAddress:
def test_split_existing_function(self, library):
assert library.insert_function_at_address(0x250)

assert library.get_function_at_address(0x240).name == "f2"
assert library.get_function_at_address(0x240).end == 0x250
inserted = library.get_function_at_address(0x250)
assert inserted.name == "post_detected"
assert (inserted.start, inserted.end) == (0x250, 0x300)

def test_existing_start_is_rejected(self, library):
assert not library.insert_function_at_address(0x200)

def test_insert_before_first_function(self, library):
assert library.insert_function_at_address(0x50)

inserted = library.get_function_at_address(0x50)
assert (inserted.start, inserted.end) == (0x50, 0x100)

def test_insert_in_gap_raises(self, library):
with pytest.raises(ValueError, match="no suitable position found"):
library.insert_function_at_address(0x350)


class TestRemoveFunctionAtAddress:
def test_remove_middle_function_extends_previous(self, library):
assert library.remove_function_at_address(0x200)

# f2's range is absorbed by f1
assert library.get_function_at_address(0x250).name == "f1"
assert library.get_function_at_address(0x250).end == 0x300

def test_remove_first_function_extends_next(self, library):
assert library.remove_function_at_address(0x100)

assert library.get_function_at_address(0x150).name == "f2"
assert library.get_function_at_address(0x150).start == 0x100

def test_non_start_address_is_rejected(self, library):
assert not library.remove_function_at_address(0x150)
assert library.get_function_at_address(0x150).name == "f1"

def test_unmapped_address_is_rejected(self, library):
assert not library.remove_function_at_address(0x350)
50 changes: 50 additions & 0 deletions tests/test_postprocess.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from rtrace.postprocess import get_pid_tid, remove_duplicate_branch_taken


class TestRemoveDuplicateBranchTaken:
def test_empty_list(self):
assert remove_duplicate_branch_taken([]) == []

def test_single_element(self):
assert remove_duplicate_branch_taken([0x1]) == [0x1]

def test_no_duplicates(self):
assert remove_duplicate_branch_taken([0x1, 0x2, 0x3]) == [0x1, 0x2, 0x3]

def test_consecutive_duplicates_removed(self):
assert remove_duplicate_branch_taken([0x1, 0x2, 0x2, 0x3]) == [0x1, 0x2, 0x3]

def test_runs_collapse_to_one(self):
assert remove_duplicate_branch_taken([0x1, 0x1, 0x1, 0x2, 0x2]) == [0x1, 0x2]

def test_non_consecutive_duplicates_preserved(self):
assert remove_duplicate_branch_taken([0x1, 0x2, 0x1]) == [0x1, 0x2, 0x1]


class TestGetPidTid:
def test_groups_tids_by_pid(self, tmp_path):
for name in [
"rtrace-intermediate-100-200-branch_taken.log",
"rtrace-intermediate-100-200-loaded_modules.log",
"rtrace-intermediate-100-201-branch_taken.log",
"rtrace-intermediate-101-300-branch_taken.log",
]:
(tmp_path / name).touch()

pid_to_tids = get_pid_tid(str(tmp_path))

assert sorted(pid_to_tids) == ["100", "101"]
assert sorted(pid_to_tids["100"]) == ["200", "201"]
assert pid_to_tids["101"] == ["300"]

def test_ignores_unrelated_files(self, tmp_path):
(tmp_path / "function-executed-100-200.json").touch()
(tmp_path / "notes.txt").touch()

assert get_pid_tid(str(tmp_path)) == {}

def test_tid_not_duplicated_across_log_kinds(self, tmp_path):
(tmp_path / "rtrace-intermediate-100-200-branch_taken.log").touch()
(tmp_path / "rtrace-intermediate-100-200-func_args_ret.log").touch()

assert get_pid_tid(str(tmp_path)) == {"100": ["200"]}
Loading
Loading