From 556af7751abc3253bcdde5c5807d77a0b637f265 Mon Sep 17 00:00:00 2001 From: jzh18 Date: Fri, 12 Jun 2026 03:46:12 +0000 Subject: [PATCH 1/2] Lift deduplicate_modules to module level It captures no closure state and being nested made it untestable; no behavior change. --- src/rtrace/process.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/src/rtrace/process.py b/src/rtrace/process.py index dfa49c0..d77a4ce 100644 --- a/src/rtrace/process.py +++ b/src/rtrace/process.py @@ -67,6 +67,18 @@ def is_function_start(self, address, is_relative_addr=True): return self.lib.is_function_start(addr_in_module) +def deduplicate_modules(modules): + """Drop modules with an already-seen path, keeping the first occurrence.""" + module_path_set = set() + dep_modules = [] + for m in modules: + if m.path in module_path_set: + continue + dep_modules.append(m) + module_path_set.add(m.path) + return dep_modules + + def get_loaded_module( pid, tids, input_dir, mode=0, bd_algo=None, bd_cache_dir=None, analyze_function_prototypes=False ): @@ -104,16 +116,6 @@ def read_module_info(file_path): ) return modules - def deduplicate_modules(modules): - module_path_set = set() - dep_modules = [] - for m in modules: - if m.path in module_path_set: - continue - dep_modules.append(m) - module_path_set.add(m.path) - return dep_modules - all_modules = [] for tid in tids: file_path = f"{input_dir}/rtrace-intermediate-{pid}-{tid}-loaded_modules.log" From 525c8154d8a6b0c9c150eaedf84988ca8591bfcd Mon Sep 17 00:00:00 2001 From: jzh18 Date: Fri, 12 Jun 2026 03:48:47 +0000 Subject: [PATCH 2/2] Add pytest unit suite and CI test job 47 tests covering the pure logic that previous bugs lived in: - postprocess: remove_duplicate_branch_taken, get_pid_tid - process: deduplicate_modules (regression for the dedup bug) and get_loaded_module with a stubbed Module (multi-tid dedup, empty-file fallback to other pids, malformed-line rejection) - function_call: log-line parsers, Call.__repr__ (regression for the self.abs bug), BlockInfo parsing, and process_logs call-tree construction with stubbed process memory - library: function range lookup, is_function_start, insert (split / before-first / gap) and remove (first / middle / non-start) against a synthetic function list, no ELF fixture required The CI test job fetches only the sr-utils submodule (imported transitively by library.py); the heavy native submodules are not needed for unit tests. --- .github/workflows/ci.yml | 21 ++++- pyproject.toml | 3 + tests/test_function_call.py | 165 ++++++++++++++++++++++++++++++++++++ tests/test_library.py | 97 +++++++++++++++++++++ tests/test_postprocess.py | 50 +++++++++++ tests/test_process.py | 96 +++++++++++++++++++++ 6 files changed, 430 insertions(+), 2 deletions(-) create mode 100644 tests/test_function_call.py create mode 100644 tests/test_library.py create mode 100644 tests/test_postprocess.py create mode 100644 tests/test_process.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8b84c9f..50ac936 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -16,6 +16,23 @@ jobs: - name: Install ruff run: pip install "ruff==0.15.17" - name: Lint - run: ruff check src/ + run: ruff check src/ tests/ - name: Check formatting - run: ruff format --check src/ + run: ruff format --check src/ tests/ + + test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + # srutils is imported transitively (library.py -> boundary_detection.py) + # and lives in the sr-utils submodule, not on PyPI. Fetch only that + # submodule; the rest (dynamorio, capstone, ...) are huge native trees. + - name: Fetch sr-utils submodule + run: git submodule update --init submodules/sr-utils + - uses: actions/setup-python@v5 + with: + python-version: "3.11" + - name: Install package and test dependencies + run: pip install -e . ./submodules/sr-utils pytest + - name: Run tests + run: pytest diff --git a/pyproject.toml b/pyproject.toml index 72decf2..1d927ba 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -47,6 +47,9 @@ package-dir = {"" = "src"} where = ["src"] include = ["rtrace*"] +[tool.pytest.ini_options] +testpaths = ["tests"] + [tool.ruff] target-version = "py39" line-length = 100 diff --git a/tests/test_function_call.py b/tests/test_function_call.py new file mode 100644 index 0000000..dd0ee1a --- /dev/null +++ b/tests/test_function_call.py @@ -0,0 +1,165 @@ +from types import SimpleNamespace + +import pytest + +from rtrace.function_call import BlockInfo, Call, CallLogProcessor + + +class TestCall: + def test_repr(self): + call = Call("foo", 0x1234, 0x34, "/lib/x.so", 2) + + assert repr(call) == "Call(name=foo, abs_addr=4660, so_path=/lib/x.so)" + + +class TestLogLineParsers: + def test_entry(self): + assert CallLogProcessor.is_entry("Entry: 4096") + assert not CallLogProcessor.is_entry("Exit: 4096") + assert CallLogProcessor.get_entry_address("Entry: 4096") == 4096 + + def test_exit(self): + assert CallLogProcessor.is_exit("Exit: 4096") + assert not CallLogProcessor.is_exit("Entry: 4096") + assert CallLogProcessor.get_exit_address("Exit: 4096") == 4096 + + def test_arg(self): + assert CallLogProcessor.is_arg("Arg_0: 7") + assert not CallLogProcessor.is_arg("Ret: 7") + assert CallLogProcessor.get_arg("Arg_1: 7") == 7 + + def test_ret(self): + assert CallLogProcessor.is_ret("Ret: 42") + assert not CallLogProcessor.is_ret("Entry: 42") + assert CallLogProcessor.get_ret("Ret: 42") == 42 + + def test_block(self): + assert CallLogProcessor.is_block("BB: 4096") + assert not CallLogProcessor.is_block("Entry: 4096") + assert CallLogProcessor.get_block("BB: 4096") == 4096 + + +class TestBlockInfo: + def _write_log(self, directory, pid, tid, lines): + path = directory / f"rtrace-intermediate-{pid}-{tid}-block_info.log" + path.write_text("".join(f"{line}\n" for line in lines)) + + def test_parses_blocks(self, tmp_path): + self._write_log(tmp_path, 100, 200, ["4096 : 5", "8192 : 3"]) + + info = BlockInfo(100, [200], str(tmp_path)) + + assert info.get_block_size(4096) == 5 + assert info.get_block_size(8192) == 3 + + def test_duplicate_address_same_count_is_fine(self, tmp_path): + self._write_log(tmp_path, 100, 200, ["4096 : 5", "4096 : 5"]) + + info = BlockInfo(100, [200], str(tmp_path)) + + assert info.get_block_size(4096) == 5 + + def test_duplicate_address_different_count_rejected(self, tmp_path): + self._write_log(tmp_path, 100, 200, ["4096 : 5", "4096 : 6"]) + + with pytest.raises(AssertionError, match="Duplicate block address"): + BlockInfo(100, [200], str(tmp_path)) + + def test_merges_multiple_tids(self, tmp_path): + self._write_log(tmp_path, 100, 200, ["4096 : 5"]) + self._write_log(tmp_path, 100, 201, ["8192 : 3"]) + + info = BlockInfo(100, [200, 201], str(tmp_path)) + + assert info.get_block_size(4096) == 5 + assert info.get_block_size(8192) == 3 + + +class _StubModule: + def __init__(self, start, end, path, functions): + self.start = start + self.end = end + self.path = path + self._functions = functions + + def get_function_at_address(self, address): + for func in self._functions: + if func.start <= address - self.start < func.end: + return func + return None + + +class _StubProcessMemory: + def __init__(self, modules): + self.modules = modules + + def get_module_at_address(self, address): + for module in self.modules: + if module.start <= address < module.end: + return module + return None + + +def make_processor(raw_logs, process_memory, block_info): + """Build a CallLogProcessor without reading log files from disk.""" + processor = object.__new__(CallLogProcessor) + processor.process_memory = process_memory + processor.raw_logs = raw_logs + processor.abs_addr_to_func = {} + processor.root_call = Call("root", 0, 0, "root", 0) + processor.block_info = block_info + return processor + + +class TestProcessLogs: + def test_builds_call_tree_with_args_ret_and_blocks(self): + func = SimpleNamespace(name="foo", start=0x34, end=0x100, so_path="/lib/a.so", num_args=2) + module = _StubModule(start=0x1000, end=0x2000, path="/lib/a.so", functions=[func]) + process_memory = _StubProcessMemory([module]) + raw_logs = [ + "Entry: 4148", # 0x1034 = module base + func start + "Arg_0: 1", + "Arg_1: 2", + "BB: 4148", + "Ret: 42", + "Exit: 4148", + ] + + processor = make_processor(raw_logs, process_memory, block_info={4148: 5}) + processor.process_logs() + + assert len(processor.root_call.calls) == 1 + call = processor.root_call.calls[0] + assert call.name == "foo" + assert call.so_path == "/lib/a.so" + assert call.args == [1, 2] + assert call.ret_val == 42 + assert call.executed_blocks == 1 + assert call.executed_insts == 5 + + def test_nested_calls(self): + outer = SimpleNamespace(name="outer", start=0x34, end=0x40, so_path="/lib/a.so", num_args=0) + inner = SimpleNamespace(name="inner", start=0x40, end=0x50, so_path="/lib/a.so", num_args=0) + module = _StubModule(start=0x1000, end=0x2000, path="/lib/a.so", functions=[outer, inner]) + process_memory = _StubProcessMemory([module]) + raw_logs = [ + "Entry: 4148", # outer + "Entry: 4160", # inner + "Exit: 4160", + "Exit: 4148", + ] + + processor = make_processor(raw_logs, process_memory, block_info={}) + processor.process_logs() + + assert [c.name for c in processor.root_call.calls] == ["outer"] + assert [c.name for c in processor.root_call.calls[0].calls] == ["inner"] + + def test_unknown_address_becomes_unknown_call(self): + process_memory = _StubProcessMemory([]) + raw_logs = ["Entry: 4148", "Exit: 4148"] + + processor = make_processor(raw_logs, process_memory, block_info={}) + processor.process_logs() + + assert processor.root_call.calls[0].name == "unknown" diff --git a/tests/test_library.py b/tests/test_library.py new file mode 100644 index 0000000..d534865 --- /dev/null +++ b/tests/test_library.py @@ -0,0 +1,97 @@ +import pytest + +from rtrace.library import Function, Library + + +def make_library(functions): + """Build a Library around a synthetic function list, skipping ELF parsing.""" + lib = object.__new__(Library) + lib.so_path = "/lib/test.so" + lib._functions = sorted(functions, key=lambda f: f.start) + return lib + + +@pytest.fixture +def library(): + # two adjacent functions, then a gap, then a third + return make_library( + [ + Function(0x100, 0x200, "f1", "/lib/test.so"), + Function(0x200, 0x300, "f2", "/lib/test.so"), + Function(0x400, 0x500, "f3", "/lib/test.so"), + ] + ) + + +class TestGetFunctionAtAddress: + def test_start_address(self, library): + assert library.get_function_at_address(0x100).name == "f1" + + def test_address_within_range(self, library): + assert library.get_function_at_address(0x2FF).name == "f2" + + def test_before_first_function(self, library): + assert library.get_function_at_address(0x50) is None + + def test_in_gap_between_functions(self, library): + assert library.get_function_at_address(0x350) is None + + def test_past_last_function(self, library): + assert library.get_function_at_address(0x500) is None + + +class TestIsFunctionStart: + def test_exact_start(self, library): + assert library.is_function_start(0x200) + + def test_mid_function(self, library): + assert not library.is_function_start(0x250) + + def test_unmapped(self, library): + assert not library.is_function_start(0x50) + + +class TestInsertFunctionAtAddress: + def test_split_existing_function(self, library): + assert library.insert_function_at_address(0x250) + + assert library.get_function_at_address(0x240).name == "f2" + assert library.get_function_at_address(0x240).end == 0x250 + inserted = library.get_function_at_address(0x250) + assert inserted.name == "post_detected" + assert (inserted.start, inserted.end) == (0x250, 0x300) + + def test_existing_start_is_rejected(self, library): + assert not library.insert_function_at_address(0x200) + + def test_insert_before_first_function(self, library): + assert library.insert_function_at_address(0x50) + + inserted = library.get_function_at_address(0x50) + assert (inserted.start, inserted.end) == (0x50, 0x100) + + def test_insert_in_gap_raises(self, library): + with pytest.raises(ValueError, match="no suitable position found"): + library.insert_function_at_address(0x350) + + +class TestRemoveFunctionAtAddress: + def test_remove_middle_function_extends_previous(self, library): + assert library.remove_function_at_address(0x200) + + # f2's range is absorbed by f1 + assert library.get_function_at_address(0x250).name == "f1" + assert library.get_function_at_address(0x250).end == 0x300 + + def test_remove_first_function_extends_next(self, library): + assert library.remove_function_at_address(0x100) + + assert library.get_function_at_address(0x150).name == "f2" + assert library.get_function_at_address(0x150).start == 0x100 + + def test_non_start_address_is_rejected(self, library): + assert not library.remove_function_at_address(0x150) + assert library.get_function_at_address(0x150).name == "f1" + + def test_unmapped_address_is_rejected(self, library): + assert not library.remove_function_at_address(0x350) diff --git a/tests/test_postprocess.py b/tests/test_postprocess.py new file mode 100644 index 0000000..6ee268d --- /dev/null +++ b/tests/test_postprocess.py @@ -0,0 +1,50 @@ +from rtrace.postprocess import get_pid_tid, remove_duplicate_branch_taken + + +class TestRemoveDuplicateBranchTaken: + def test_empty_list(self): + assert remove_duplicate_branch_taken([]) == [] + + def test_single_element(self): + assert remove_duplicate_branch_taken([0x1]) == [0x1] + + def test_no_duplicates(self): + assert remove_duplicate_branch_taken([0x1, 0x2, 0x3]) == [0x1, 0x2, 0x3] + + def test_consecutive_duplicates_removed(self): + assert remove_duplicate_branch_taken([0x1, 0x2, 0x2, 0x3]) == [0x1, 0x2, 0x3] + + def test_runs_collapse_to_one(self): + assert remove_duplicate_branch_taken([0x1, 0x1, 0x1, 0x2, 0x2]) == [0x1, 0x2] + + def test_non_consecutive_duplicates_preserved(self): + assert remove_duplicate_branch_taken([0x1, 0x2, 0x1]) == [0x1, 0x2, 0x1] + + +class TestGetPidTid: + def test_groups_tids_by_pid(self, tmp_path): + for name in [ + "rtrace-intermediate-100-200-branch_taken.log", + "rtrace-intermediate-100-200-loaded_modules.log", + "rtrace-intermediate-100-201-branch_taken.log", + "rtrace-intermediate-101-300-branch_taken.log", + ]: + (tmp_path / name).touch() + + pid_to_tids = get_pid_tid(str(tmp_path)) + + assert sorted(pid_to_tids) == ["100", "101"] + assert sorted(pid_to_tids["100"]) == ["200", "201"] + assert pid_to_tids["101"] == ["300"] + + def test_ignores_unrelated_files(self, tmp_path): + (tmp_path / "function-executed-100-200.json").touch() + (tmp_path / "notes.txt").touch() + + assert get_pid_tid(str(tmp_path)) == {} + + def test_tid_not_duplicated_across_log_kinds(self, tmp_path): + (tmp_path / "rtrace-intermediate-100-200-branch_taken.log").touch() + (tmp_path / "rtrace-intermediate-100-200-func_args_ret.log").touch() + + assert get_pid_tid(str(tmp_path)) == {"100": ["200"]} diff --git a/tests/test_process.py b/tests/test_process.py new file mode 100644 index 0000000..08f593d --- /dev/null +++ b/tests/test_process.py @@ -0,0 +1,96 @@ +from types import SimpleNamespace + +import pytest + +import rtrace.process +from rtrace.process import deduplicate_modules, get_loaded_module + + +class TestDeduplicateModules: + def test_removes_duplicate_paths(self): + modules = [ + SimpleNamespace(path="/lib/a.so"), + SimpleNamespace(path="/lib/b.so"), + SimpleNamespace(path="/lib/a.so"), + ] + + result = deduplicate_modules(modules) + + assert [m.path for m in result] == ["/lib/a.so", "/lib/b.so"] + + def test_keeps_first_occurrence(self): + first = SimpleNamespace(path="/lib/a.so", marker="first") + second = SimpleNamespace(path="/lib/a.so", marker="second") + + result = deduplicate_modules([first, second]) + + assert result == [first] + + def test_empty(self): + assert deduplicate_modules([]) == [] + + +class _StubModule: + """Stands in for Module so tests do not construct a full Library.""" + + def __init__(self, path, start, end, **kwargs): + self.path = path + self.start = start + self.end = end + + +@pytest.fixture +def stub_module(monkeypatch): + monkeypatch.setattr(rtrace.process, "Module", _StubModule) + + +def _write_modules_log(directory, pid, tid, lines): + path = directory / f"rtrace-intermediate-{pid}-{tid}-loaded_modules.log" + path.write_text("".join(f"{line}\n" for line in lines)) + + +class TestGetLoadedModule: + def test_reads_modules_for_pid_tid(self, tmp_path, stub_module): + _write_modules_log( + tmp_path, 100, 200, ["/lib/a.so : 4096 : 8192", "/lib/b.so : 8192 : 12288"] + ) + + modules = get_loaded_module(100, [200], str(tmp_path)) + + assert [(m.path, m.start, m.end) for m in modules] == [ + ("/lib/a.so", 4096, 8192), + ("/lib/b.so", 8192, 12288), + ] + + def test_deduplicates_across_tids(self, tmp_path, stub_module): + _write_modules_log(tmp_path, 100, 200, ["/lib/a.so : 4096 : 8192"]) + _write_modules_log(tmp_path, 100, 201, ["/lib/a.so : 4096 : 8192"]) + + modules = get_loaded_module(100, [200, 201], str(tmp_path)) + + assert len(modules) == 1 + + def test_falls_back_to_other_pids(self, tmp_path, stub_module): + # the requested pid-tid file is empty; another pid has modules + _write_modules_log(tmp_path, 100, 200, []) + _write_modules_log(tmp_path, 999, 888, ["/lib/a.so : 4096 : 8192"]) + + modules = get_loaded_module(100, [200], str(tmp_path)) + + assert [m.path for m in modules] == ["/lib/a.so"] + + def test_raises_when_all_module_logs_are_empty(self, tmp_path, stub_module): + _write_modules_log(tmp_path, 100, 200, []) + + with pytest.raises(ValueError, match="At least one pid-tid file should exist"): + get_loaded_module(100, [200], str(tmp_path)) + + def test_missing_log_file_raises(self, tmp_path, stub_module): + with pytest.raises(FileNotFoundError): + get_loaded_module(100, [200], str(tmp_path)) + + def test_rejects_malformed_line(self, tmp_path, stub_module): + _write_modules_log(tmp_path, 100, 200, ["/lib/a.so : 4096"]) + + with pytest.raises(AssertionError, match="Invalid line format"): + get_loaded_module(100, [200], str(tmp_path))