From 9a5fc78481c9958a1d3afece1c6ba759424972a2 Mon Sep 17 00:00:00 2001 From: jzh18 Date: Fri, 12 Jun 2026 03:35:01 +0000 Subject: [PATCH 1/3] Add ruff configuration, dev extra, editorconfig, pre-commit ruff (pinned) serves as both linter (pycodestyle, pyflakes, isort, bugbear) and formatter, scoped to src/rtrace and tests; submodules and the C++ client are untouched. The dev extra installs the tools: pip install -e '.[dev]'. --- .editorconfig | 22 ++++++++++++++++++++++ .pre-commit-config.yaml | 8 ++++++++ pyproject.toml | 20 ++++++++++++++++++++ 3 files changed, 50 insertions(+) create mode 100644 .editorconfig create mode 100644 .pre-commit-config.yaml diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..9e7d34e --- /dev/null +++ b/.editorconfig @@ -0,0 +1,22 @@ +root = true + +[*] +charset = utf-8 +end_of_line = lf +insert_final_newline = true +trim_trailing_whitespace = true + +[*.py] +indent_style = space +indent_size = 4 + +[*.{yml,yaml,toml}] +indent_style = space +indent_size = 2 + +[*.{cc,h}] +indent_style = space +indent_size = 4 + +[Makefile] +indent_style = tab diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..d8e5a57 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,8 @@ +# Install hooks with: pip install pre-commit && pre-commit install +repos: + - repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.15.17 + hooks: + - id: ruff-check + args: [--fix] + - id: ruff-format diff --git a/pyproject.toml b/pyproject.toml index 239783b..72decf2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,11 @@ heavy = [ "pycparser==2.22", "capstone==5.0.0.post1", ] +# Development tools (lint, format, tests). +dev = [ + "ruff==0.15.17", + "pytest", +] [project.scripts] rtrace = "rtrace.main:main" @@ -41,3 +46,18 @@ package-dir = {"" = "src"} [tool.setuptools.packages.find] where = ["src"] include = ["rtrace*"] + +[tool.ruff] +target-version = "py39" +line-length = 100 +# Only the Python package and tests; native code and submodules are out of scope. +include = ["src/rtrace/**/*.py", "tests/**/*.py", "pyproject.toml"] + +[tool.ruff.lint] +select = [ + "E", # pycodestyle errors + "W", # pycodestyle warnings + "F", # pyflakes + "I", # isort (import sorting) + "B", # flake8-bugbear +] From c0e0c9deebab1a3f7476e2aaf4401927d94aa3c4 Mon Sep 17 00:00:00 2001 From: jzh18 Date: Fri, 12 Jun 2026 03:38:05 +0000 Subject: [PATCH 2/3] Apply ruff format and autofixes across the package Mechanical commit: ruff format + ruff check --fix, plus manual wrapping of the 11 over-long strings/comments the formatter cannot split (fixing typos in those comments while rewrapping). No functional changes; review with ruff format --check rather than line-by-line. --- src/rtrace/boundary_detection.py | 1 + src/rtrace/disassembler.py | 3 +- src/rtrace/edition.py | 1 + src/rtrace/function_call.py | 86 ++++++++---- src/rtrace/library.py | 219 +++++++++++++++++-------------- src/rtrace/main.py | 27 ++-- src/rtrace/paths.py | 1 + src/rtrace/postprocess.py | 154 ++++++++++++++-------- src/rtrace/preprocess.py | 17 ++- src/rtrace/process.py | 82 +++++++++--- src/rtrace/utils.py | 2 +- 11 files changed, 375 insertions(+), 218 deletions(-) diff --git a/src/rtrace/boundary_detection.py b/src/rtrace/boundary_detection.py index 9971058..4c1b19e 100644 --- a/src/rtrace/boundary_detection.py +++ b/src/rtrace/boundary_detection.py @@ -52,6 +52,7 @@ def boundary_detection_nucleus(so_path): # nucleus is a native module; import lazily so callers that never reach the # nucleus path do not require it at import time. import nucleus + context = nucleus.load(so_path, binary_base=0x0) entry_addrs = [] for function in context.cfg.functions: diff --git a/src/rtrace/disassembler.py b/src/rtrace/disassembler.py index 35473c9..9654ab4 100644 --- a/src/rtrace/disassembler.py +++ b/src/rtrace/disassembler.py @@ -11,7 +11,8 @@ def _get_disassembler(): global _DISASSEMBLER if _DISASSEMBLER is None: - from capstone import Cs, CS_ARCH_X86, CS_MODE_64 + from capstone import CS_ARCH_X86, CS_MODE_64, Cs + disassembler = Cs(CS_ARCH_X86, CS_MODE_64) disassembler.detail = True disassembler.skipdata = True diff --git a/src/rtrace/edition.py b/src/rtrace/edition.py index 434c576..7ada43e 100644 --- a/src/rtrace/edition.py +++ b/src/rtrace/edition.py @@ -10,6 +10,7 @@ which is installed only in the heavy edition. ``find_spec`` checks availability without paying the cost of importing ``angr``. """ + import importlib.util import sys diff --git a/src/rtrace/function_call.py b/src/rtrace/function_call.py index 9dd86d4..cc5273d 100644 --- a/src/rtrace/function_call.py +++ b/src/rtrace/function_call.py @@ -38,15 +38,17 @@ def __init__(self, pid, tids, log_dir): self.log_dir = log_dir self.block_info = {} for tid in tids: - with open(f'{log_dir}/rtrace-intermediate-{pid}-{tid}-block_info.log', 'r') as f: + with open(f"{log_dir}/rtrace-intermediate-{pid}-{tid}-block_info.log", "r") as f: for line in f: parts = line.split(":") assert len(parts) == 2, f"Invalid block info line: {line}" addr = int(parts[0].strip()) num_insts = int(parts[1].strip()) if addr in self.block_info: - assert self.block_info[ - addr] == num_insts, f"Duplicate block address {addr} with different instruction counts: {self.block_info[addr]} vs {num_insts}" + assert self.block_info[addr] == num_insts, ( + f"Duplicate block address {addr} with different instruction " + f"counts: {self.block_info[addr]} vs {num_insts}" + ) else: self.block_info[addr] = num_insts @@ -56,30 +58,47 @@ def get_block_size(self, abs_addr): class CallLogProcessor(object): - def __init__(self, process_memory: ProcessMemory, block_info: BlockInfo, pid, tid, log_dir, so_names=None): + def __init__( + self, process_memory: ProcessMemory, block_info: BlockInfo, pid, tid, log_dir, so_names=None + ): self.process_memory = process_memory self.log_path = log_dir self.raw_logs = [] self.abs_addr_to_func = {} - self.root_call = Call("root", 0, 0, "root", 0,) + self.root_call = Call( + "root", + 0, + 0, + "root", + 0, + ) self.block_info = block_info.block_info - with open(f'{log_dir}/rtrace-intermediate-{pid}-{tid}-func_args_ret.log', 'r') as f: + with open(f"{log_dir}/rtrace-intermediate-{pid}-{tid}-func_args_ret.log", "r") as f: if so_names is None: for line in f: self.raw_logs.append(line.strip()) else: - so_names=so_names.split(",") - so_names= set([so_name.strip() for so_name in so_names]) + so_names = so_names.split(",") + so_names = set([so_name.strip() for so_name in so_names]) for line in f: if self.is_entry(line) or self.is_exit(line): - addr = self.get_entry_address(line) if self.is_entry(line) else self.get_exit_address(line) + addr = ( + self.get_entry_address(line) + if self.is_entry(line) + else self.get_exit_address(line) + ) module = self.process_memory.get_module_at_address(addr) if module is not None: for so_name in so_names: if so_name in module.path: self.raw_logs.append(line.strip()) break - print(len(self.raw_logs), "function call logs loaded from", f'{log_dir}/rtrace-intermediate-{pid}-{tid}-func_args_ret.log') + print( + len(self.raw_logs), + "function call logs loaded from", + f"{log_dir}/rtrace-intermediate-{pid}-{tid}-func_args_ret.log", + ) + def _create_call(self, abs_address): func = None if abs_address in self.abs_addr_to_func: @@ -112,10 +131,16 @@ def process_logs(self): elif CallLogProcessor.is_block(log): total_blocks += 1 addr = CallLogProcessor.get_block(log) - if addr not in self.block_info or \ - self.process_memory.get_module_at_address(addr) is None or \ - self.process_memory.get_module_at_address(addr).get_function_at_address(addr) is None or \ - self.process_memory.get_module_at_address(addr).get_function_at_address(addr).start != stack[-1].relative_addr: # the block does not belong to the current function + if ( + addr not in self.block_info + or self.process_memory.get_module_at_address(addr) is None + or self.process_memory.get_module_at_address(addr).get_function_at_address(addr) + is None + or self.process_memory.get_module_at_address(addr) + .get_function_at_address(addr) + .start + != stack[-1].relative_addr + ): # the block does not belong to the current function # this might due to exception handling unmatch_func_block += 1 else: # only count the blocks belong to the current function @@ -128,10 +153,13 @@ def process_logs(self): addr = CallLogProcessor.get_exit_address(log) call = stack.pop() if call.abs_addr != addr: - # should not happend but it happens with some exit:0, might due to exception handling + # should not happen but it does with some exit:0, + # might be due to exception handling unmatch_entry_exit += 1 print( - f"Unmatched entry/exit: {unmatch_entry_exit}/{total_calls}; final stack depth: {len(stack)}") + f"Unmatched entry/exit: {unmatch_entry_exit}/{total_calls}; " + f"final stack depth: {len(stack)}" + ) print(f"Unmatched function block: {unmatch_func_block}/{total_blocks}") @@ -148,22 +176,24 @@ def serialize_call(cur_call): "ret_val": cur_call.ret_val, "executed_blocks": cur_call.executed_blocks, "executed_insts": cur_call.executed_insts, - "calls": [serialize_call(c) for c in cur_call.calls] + "calls": [serialize_call(c) for c in cur_call.calls], } - overview.append({ - "so_path": cur_call.so_path, - "name": cur_call.name, - "start_addr": hex(cur_call.relative_addr), - "num_calls": len(cur_call.calls), - "executed_blocks": cur_call.executed_blocks, - "executed_insts": cur_call.executed_insts - }) + overview.append( + { + "so_path": cur_call.so_path, + "name": cur_call.name, + "start_addr": hex(cur_call.relative_addr), + "num_calls": len(cur_call.calls), + "executed_blocks": cur_call.executed_blocks, + "executed_insts": cur_call.executed_insts, + } + ) return call_json + call_json = serialize_call(self.root_call) - with open(output_path, 'w') as f: + with open(output_path, "w") as f: json.dump(call_json, f, indent=4) - pd.DataFrame(overview).to_csv( - output_path.replace('.json', '.csv'), index=False) + pd.DataFrame(overview).to_csv(output_path.replace(".json", ".csv"), index=False) @staticmethod def is_entry(line): diff --git a/src/rtrace/library.py b/src/rtrace/library.py index 5a19923..47769a0 100644 --- a/src/rtrace/library.py +++ b/src/rtrace/library.py @@ -1,6 +1,6 @@ +import json import os import struct -import json from elftools.elf.elffile import ELFFile, SymbolTableSection @@ -12,17 +12,21 @@ try: from capstone import CS_GRP_CALL, CS_GRP_JUMP, CS_GRP_RET from capstone.x86_const import ( - X86_INS_ENDBR64, X86_INS_ENDBR32, X86_INS_NOP, - X86_OP_MEM, X86_OP_REG, + X86_INS_ENDBR32, + X86_INS_ENDBR64, + X86_INS_NOP, + X86_OP_MEM, + X86_OP_REG, ) except ImportError: pass -from .disassembler import disassemble_data from .boundary_detection import ( - boundary_detection_funseeker, boundary_detection_linear, + boundary_detection_funseeker, + boundary_detection_linear, boundary_detection_nucleus, ) +from .disassembler import disassemble_data from .utils import is_func_symbol @@ -36,7 +40,10 @@ def __init__(self, insn, section_name=None, next=None, so_path=None): self.so_path = so_path # path to the shared object file, if applicable def __repr__(self): - return f"{self.so_path}:{self.section_name}:{hex(self.address)} {self.insn.mnemonic} {self.insn.op_str}" + return ( + f"{self.so_path}:{self.section_name}:{hex(self.address)} " + f"{self.insn.mnemonic} {self.insn.op_str}" + ) def is_endbr(self): return self.insn.id in (X86_INS_ENDBR64, X86_INS_ENDBR32) @@ -81,8 +88,9 @@ def is_potential_indirect_return_endbr(self): return False def get_potential_leading_call(self): - assert self.is_potential_indirect_return_endbr( - ), "Instruction is not a potential indirect return endbr" + assert self.is_potential_indirect_return_endbr(), ( + "Instruction is not a potential indirect return endbr" + ) # call/jmp, then endbr if self.prev.is_jmp() or self.prev.is_call(): @@ -103,32 +111,40 @@ def __init__(self, start, end, name, so_path): class Library(object): - INIT_FINI_SEC_NAMES = ['.init_array', '.fini_array'] - - def __init__(self, so_path, analyze_function_prototypes=False, func_info_dir=None, boundary_detection_method=None, debug_sym_file=None): + INIT_FINI_SEC_NAMES = [".init_array", ".fini_array"] + + def __init__( + self, + so_path, + analyze_function_prototypes=False, + func_info_dir=None, + boundary_detection_method=None, + debug_sym_file=None, + ): if func_info_dir is None: func_info_dir = str(paths.cache_dir()) self.so_path = so_path # pyelftools reads sections lazily, so the underlying file must stay # open for the lifetime of the Library; call close() when done. - self._file = open(so_path, 'rb') + self._file = open(so_path, "rb") self._elffile = ELFFile(self._file) self._instructions = [] self._addr_to_instruction = {} self._functions = [] self.boundary_detection_method = boundary_detection_method self.debug_sym_file = debug_sym_file - # this is different from function _get_function_ind_at_address, this maps exact start address to function object + # unlike _get_function_ind_at_address (range lookup), this maps the + # exact start address to the function object self._addr_to_function = {} - self.function_info_path = f'{func_info_dir}/{os.path.basename(so_path)}.info' + self.function_info_path = f"{func_info_dir}/{os.path.basename(so_path)}.info" if os.path.exists(self.function_info_path): - with open(self.function_info_path, 'r') as f: + with open(self.function_info_path, "r") as f: funcs = json.load(f) for f in funcs: - func = Function(f['start'], f['end'], f['name'], self.so_path) - func.num_args = f.get('num_args', 0) - func.args_size = f.get('args_size', []) - func.ret_size = f.get('ret_size', 0) + func = Function(f["start"], f["end"], f["name"], self.so_path) + func.num_args = f.get("num_args", 0) + func.args_size = f.get("args_size", []) + func.ret_size = f.get("ret_size", 0) self._functions.append(func) else: self._create_functions() @@ -138,17 +154,19 @@ def __init__(self, so_path, analyze_function_prototypes=False, func_info_dir=Non if not os.path.exists(func_info_dir): os.makedirs(func_info_dir) if not os.path.exists(self.function_info_path): - with open(self.function_info_path, 'w') as output_file: + with open(self.function_info_path, "w") as output_file: function_json_data = [] for f in self._functions: - function_json_data.append({ - 'start': f.start, - 'end': f.end, - 'name': f.name, - 'num_args': f.num_args, - 'args_size': f.args_size, - 'ret_size': f.ret_size - }) + function_json_data.append( + { + "start": f.start, + "end": f.end, + "name": f.name, + "num_args": f.num_args, + "args_size": f.args_size, + "ret_size": f.ret_size, + } + ) json.dump(function_json_data, output_file, indent=4) self._functions.sort(key=lambda f: f.start) @@ -159,22 +177,23 @@ def close(self): def _list_executable_sections(self): sections = [] for section in self._elffile.iter_sections(): - if section['sh_flags'] & 0x4: + if section["sh_flags"] & 0x4: sections.append(section.name) return sections def _has_symtab(self): - return self._elffile.get_section_by_name('.symtab') is not None - + return self._elffile.get_section_by_name(".symtab") is not None + def _cet_enabled(self): # Check if IBT is enabled by looking for .note.gnu.property section - note_section = self._elffile.get_section_by_name('.note.gnu.property') + note_section = self._elffile.get_section_by_name(".note.gnu.property") if note_section is None: return False for note in note_section.iter_notes(): - if note['n_desc'][0]['pr_data'] == 3: + if note["n_desc"][0]["pr_data"] == 3: return True return False + def _function_boundary_detection(self): # If method is specified, use it # if not specified, use linear if symtab available, otherwise funseeker detection @@ -183,29 +202,30 @@ def _function_boundary_detection(self): print(f"Using linear boundary detection for {self.so_path}") entry_addrs = boundary_detection_linear(self._elffile) elif self._cet_enabled(): - print( - f"Using Funseeker for function boundary detection: {self.so_path}") + print(f"Using Funseeker for function boundary detection: {self.so_path}") entry_addrs = boundary_detection_funseeker(self.so_path) else: print(f"Using Nucleus for function boundary detection: {self.so_path}") entry_addrs = boundary_detection_nucleus(self.so_path) - elif self.boundary_detection_method == 'linear': + elif self.boundary_detection_method == "linear": if self.debug_sym_file is not None: print(f"Using linear boundary detection for {self.so_path}, {self.debug_sym_file}") - with open(self.debug_sym_file, 'rb') as f: + with open(self.debug_sym_file, "rb") as f: entry_addrs = boundary_detection_linear(ELFFile(f)) else: print(f"Using linear boundary detection for {self.so_path}") entry_addrs = boundary_detection_linear(self._elffile) print(len(entry_addrs), "functions detected") - elif self.boundary_detection_method == 'funseeker': + elif self.boundary_detection_method == "funseeker": print(f"Using Funseeker for function boundary detection: {self.so_path}") entry_addrs = boundary_detection_funseeker(self.so_path) - elif self.boundary_detection_method == 'nucleus': + elif self.boundary_detection_method == "nucleus": print(f"Using Nucleus for function boundary detection: {self.so_path}") entry_addrs = boundary_detection_nucleus(self.so_path) else: - raise ValueError(f"Unknown method for boundary detection: {self.boundary_detection_method}") + raise ValueError( + f"Unknown method for boundary detection: {self.boundary_detection_method}" + ) entry_addrs = sorted(set(entry_addrs)) # remove duplicates and sort return entry_addrs @@ -217,43 +237,43 @@ def _read_init_fini_array(self): continue data = section.data() addr_size = 8 if self._elffile.elfclass == 64 else 4 - fmt = 'Q' # Q = uint64 + fmt = "Q" # Q = uint64 if self._elffile.elfclass == 32: - fmt = 'I' # I = uint32 + fmt = "I" # I = uint32 for i in range(0, len(data), addr_size): - ptr_bytes = data[i:i+addr_size] + ptr_bytes = data[i : i + addr_size] ptr = struct.unpack(fmt, ptr_bytes)[0] pointers.append(ptr) return pointers def _get_symbols(self): func_start_to_name = {} + def set_symbols(sec): if not isinstance(sec, SymbolTableSection): return for symbol in sec.iter_symbols(): - start_addr = symbol['st_value'] - if not is_func_symbol(symbol.entry['st_info']['type']): + start_addr = symbol["st_value"] + if not is_func_symbol(symbol.entry["st_info"]["type"]): continue - if symbol.entry['st_info']['type'] != 'STT_FUNC': + if symbol.entry["st_info"]["type"] != "STT_FUNC": continue if start_addr not in func_start_to_name: func_start_to_name[start_addr] = [] func_start_to_name[start_addr].append(symbol.name) if self.debug_sym_file is not None: - with open(self.debug_sym_file, 'rb') as f: - symtab = ELFFile(f).get_section_by_name('.symtab') + with open(self.debug_sym_file, "rb") as f: + symtab = ELFFile(f).get_section_by_name(".symtab") set_symbols(symtab) return func_start_to_name else: if self._has_symtab(): - symtab = self._elffile.get_section_by_name('.symtab') + symtab = self._elffile.get_section_by_name(".symtab") set_symbols(symtab) - dynsymtab = self._elffile.get_section_by_name('.dynsym') + dynsymtab = self._elffile.get_section_by_name(".dynsym") set_symbols(dynsymtab) return func_start_to_name - def _set_func_names(self): func_start_to_name = self._get_symbols() @@ -266,6 +286,7 @@ def _set_func_names(self): def _set_function_prototype(self): # angr is a heavy-edition (mode 0) dependency; import lazily. import angr + print(f"Analyzing function prototypes in {self.so_path}") project = angr.Project(self.so_path, auto_load_libs=False) base_addr = project.loader.main_object.min_addr @@ -292,8 +313,8 @@ def _set_function_prototype(self): def _create_functions(self): # add detected functions analyzed_addrs = set() - text_start_addr = self._elffile.get_section_by_name('.text')['sh_addr'] - text_end_addr = text_start_addr + self._elffile.get_section_by_name('.text')['sh_size'] + text_start_addr = self._elffile.get_section_by_name(".text")["sh_addr"] + text_end_addr = text_start_addr + self._elffile.get_section_by_name(".text")["sh_size"] entry_addrs = self._function_boundary_detection() # remove address outside .text section entry_addrs = [addr for addr in entry_addrs if text_start_addr <= addr <= text_end_addr] @@ -304,35 +325,37 @@ def _create_functions(self): entry_addrs.extend(init_fini_pointers) entry_addrs = sorted(set(entry_addrs)) # remove duplicates and sort if not entry_addrs: - raise ValueError( - f"No function entry addresses detected in {self.so_path}") + raise ValueError(f"No function entry addresses detected in {self.so_path}") for i in range(1, len(entry_addrs)): - start = entry_addrs[i-1] + start = entry_addrs[i - 1] end = entry_addrs[i] if start in analyzed_addrs: continue analyzed_addrs.add(start) self._functions.append( - Function(start, end, f"boundary_detected_{hex(start)}", self.so_path)) + Function(start, end, f"boundary_detected_{hex(start)}", self.so_path) + ) self._functions.append( - Function(entry_addrs[-1], text_end_addr, - f"boundary_detected_{hex(entry_addrs[-1])}", self.so_path) + Function( + entry_addrs[-1], + text_end_addr, + f"boundary_detected_{hex(entry_addrs[-1])}", + self.so_path, + ) ) # add init/fini functions - init_section = self._elffile.get_section_by_name('.init') + init_section = self._elffile.get_section_by_name(".init") if init_section: - init_start = init_section['sh_addr'] - init_end = init_start + init_section['sh_size'] - self._functions.append( - Function(init_start, init_end, ".init", self.so_path)) - fini_section = self._elffile.get_section_by_name('.fini') + init_start = init_section["sh_addr"] + init_end = init_start + init_section["sh_size"] + self._functions.append(Function(init_start, init_end, ".init", self.so_path)) + fini_section = self._elffile.get_section_by_name(".fini") if fini_section: - fini_start = fini_section['sh_addr'] - fini_end = fini_start + fini_section['sh_size'] - self._functions.append( - Function(fini_start, fini_end, ".fini", self.so_path)) + fini_start = fini_section["sh_addr"] + fini_end = fini_start + fini_section["sh_size"] + self._functions.append(Function(fini_start, fini_end, ".fini", self.so_path)) # sort by start address self._functions.sort(key=lambda f: f.start) @@ -342,15 +365,12 @@ def _create_functions(self): def decode(self): executable_sections = self._list_executable_sections() for section_name in executable_sections: - section_data = self._elffile.get_section_by_name( - section_name).data() - section_base_address = self._elffile.get_section_by_name(section_name)[ - 'sh_addr'] + section_data = self._elffile.get_section_by_name(section_name).data() + section_base_address = self._elffile.get_section_by_name(section_name)["sh_addr"] instructions = disassemble_data(section_data, section_base_address) prev_insn = None for insn in instructions: - instruction = Instruction( - insn, section_name, so_path=self.so_path) + instruction = Instruction(insn, section_name, so_path=self.so_path) instruction.prev = prev_insn self._instructions.append(instruction) self._addr_to_instruction[insn.address] = instruction @@ -360,42 +380,45 @@ def decode(self): def dump(self, output_file=None): if output_file is None: - output_file = os.path.basename(self.so_path) + '.disasm' - with open(output_file, 'w') as f: + output_file = os.path.basename(self.so_path) + ".disasm" + with open(output_file, "w") as f: executable_sections = self._list_executable_sections() for section_name in executable_sections: f.write(f"Section: {section_name}\n") for insn in self._instructions: f.write( - f"{insn.address:#x} {insn.insn.mnemonic} {insn.insn.op_str} {insn.section_name}\n") + f"{insn.address:#x} {insn.insn.mnemonic} " + f"{insn.insn.op_str} {insn.section_name}\n" + ) def get_instruction_at_address(self, address): if address in self._addr_to_instruction: return self._addr_to_instruction[address] else: print( - f"Warning: Address not found in cached instructions, disassembling on-the-fly: {address:#x}.") + f"Warning: Address not found in cached instructions, " + f"disassembling on-the-fly: {address:#x}." + ) # find which section the address belongs to for section_name in self._list_executable_sections(): section = self._elffile.get_section_by_name(section_name) - section_base_address = section['sh_addr'] - section_size = section['sh_size'] + section_base_address = section["sh_addr"] + section_size = section["sh_size"] if section_base_address <= address < section_base_address + section_size: section_data = section.data() offset_in_section = address - section_base_address if offset_in_section < len(section_data): insn = disassemble_data( - section_data[offset_in_section:offset_in_section+16], - section_base_address + offset_in_section) + section_data[offset_in_section : offset_in_section + 16], + section_base_address + offset_in_section, + ) if insn: - decoded_insn = Instruction( - insn[0], section_name, so_path=self.so_path) + decoded_insn = Instruction(insn[0], section_name, so_path=self.so_path) self._addr_to_instruction[address] = decoded_insn self._instructions.append(decoded_insn) return decoded_insn - raise ValueError( - f"Cannot find instruction at address {address:#x} in {self.so_path}") + raise ValueError(f"Cannot find instruction at address {address:#x} in {self.so_path}") def _get_function_ind_at_address(self, address): # binary search for the function @@ -403,7 +426,7 @@ def _get_function_ind_at_address(self, address): low, high = 0, len(self._functions) - 1 while low <= high: mid = (low + high) // 2 - func = self._functions[mid] + func = self._functions[mid] if func.start <= address < func.end: return mid elif address < func.start: @@ -426,22 +449,20 @@ def remove_function_at_address(self, address): return False # remove the function if self._functions[index].start != address: - print( - f"Warning: Removing function at address {address:#x} in {self.so_path}") + print(f"Warning: Removing function at address {address:#x} in {self.so_path}") return False if index == 0: self._functions[1].start = self._functions[0].start self._functions.pop(0) else: - self._functions[index-1].end = self._functions[index].end + self._functions[index - 1].end = self._functions[index].end self._functions.pop(index) return True def insert_function_at_address(self, address): # Check if the address is already a function start if self.is_function_start(address): - print( - f"Function already exists at address {address:#x} in {self.so_path}") + print(f"Function already exists at address {address:#x} in {self.so_path}") return False index = self._get_function_ind_at_address(address) @@ -449,18 +470,20 @@ def insert_function_at_address(self, address): if address < self._functions[0].start: # Insert at the beginning end = self._functions[0].start - self._functions.insert(0, Function( - address, end, "post_detected", self.so_path)) + self._functions.insert(0, Function(address, end, "post_detected", self.so_path)) return True else: raise ValueError( - f"Cannot insert function at address {address:#x} in {self.so_path}: no suitable position found.") + f"Cannot insert function at address {address:#x} in " + f"{self.so_path}: no suitable position found." + ) else: # insert within the existing function range inserted_func = Function( - address, self._functions[index].end, "post_detected", self.so_path) + address, self._functions[index].end, "post_detected", self.so_path + ) self._functions[index].end = address - self._functions.insert(index+1, inserted_func) + self._functions.insert(index + 1, inserted_func) return True def is_function_start(self, address): diff --git a/src/rtrace/main.py b/src/rtrace/main.py index a347a6d..8a0ed7a 100755 --- a/src/rtrace/main.py +++ b/src/rtrace/main.py @@ -9,15 +9,16 @@ def main(): parser = argparse.ArgumentParser(prog="rtrace") - parser.add_argument('--logdir', type=str, - help="Directory to store output files") - parser.add_argument('cmd', nargs="*", help="Command to run") - parser.add_argument("--filter", action='store_true') - parser.add_argument("--calllog", action='store_true') - parser.add_argument("--mode", type=int, default=0, choices=[0, 1], - help="0 for rich mode, 1 for light mode") - parser.add_argument("--so_name", type=str, default=None, - help="Shared object name to filter the calllog.") + parser.add_argument("--logdir", type=str, help="Directory to store output files") + parser.add_argument("cmd", nargs="*", help="Command to run") + parser.add_argument("--filter", action="store_true") + parser.add_argument("--calllog", action="store_true") + parser.add_argument( + "--mode", type=int, default=0, choices=[0, 1], help="0 for rich mode, 1 for light mode" + ) + parser.add_argument( + "--so_name", type=str, default=None, help="Shared object name to filter the calllog." + ) args = parser.parse_args() # Light edition supports mode 1 only; fail early with guidance. @@ -30,15 +31,15 @@ def main(): # drrun's install-completeness check warns about the absent lib32/debug # variants on every run otherwise. trace_cmd = ( - f'{paths.drrun()} -quiet -c {paths.librtrace_so()} ' - f'--log_dir {log_dir} --mode {args.mode} -- {cmd}' + f"{paths.drrun()} -quiet -c {paths.librtrace_so()} " + f"--log_dir {log_dir} --mode {args.mode} -- {cmd}" ) retcode = shell_system(trace_cmd) print(f"Trace command executed: {trace_cmd}") post_process_cmd = ( - f'{sys.executable} -m rtrace.postprocess ' - f'--input {log_dir}/ --output {log_dir} --mode {args.mode}' + f"{sys.executable} -m rtrace.postprocess " + f"--input {log_dir}/ --output {log_dir} --mode {args.mode}" ) if args.so_name is not None: post_process_cmd += f" --so_names {args.so_name}" diff --git a/src/rtrace/paths.py b/src/rtrace/paths.py index 3f93e45..00d9941 100644 --- a/src/rtrace/paths.py +++ b/src/rtrace/paths.py @@ -9,6 +9,7 @@ Individual paths can always be overridden by their dedicated environment variable, which takes precedence over both layouts. """ + import os from pathlib import Path diff --git a/src/rtrace/postprocess.py b/src/rtrace/postprocess.py index 2222b06..e2239ab 100755 --- a/src/rtrace/postprocess.py +++ b/src/rtrace/postprocess.py @@ -7,12 +7,21 @@ from .library import Instruction from .process import ProcessMemory - FUNCTION_INFO_DIR = str(paths.cache_dir()) class Node(object): - def __init__(self, address, base, so_name="", section_name="", insn: Instruction = None, is_function_start=False, func_start=None, func_end=None): + def __init__( + self, + address, + base, + so_name="", + section_name="", + insn: Instruction = None, + is_function_start=False, + func_start=None, + func_end=None, + ): self._insn = insn self.so_name = so_name self.section_name = section_name @@ -80,13 +89,17 @@ def is_potential_indirect_return_endbr(self): return indirect_jmp_exist def get_potential_leading_call(self): - assert self.is_potential_indirect_return_endbr( - ), "Node is not a potential indirect return endbr" + assert self.is_potential_indirect_return_endbr(), ( + "Node is not a potential indirect return endbr" + ) insn = self._insn.get_potential_leading_call() return insn def __repr__(self): - return f"{self.so_name}: {hex(self.address)}, {self.is_function_start}, {self.section_name}, {self.base}" + return ( + f"{self.so_name}: {hex(self.address)}, {self.is_function_start}, " + f"{self.section_name}, {self.base}" + ) def __hash__(self): return hash(f"{self.so_name}:{hex(self.address)}") @@ -94,8 +107,7 @@ def __hash__(self): def __eq__(self, other): if not isinstance(other, Node): return False - return (self.so_name == other.so_name and - self.address == other.address) + return self.so_name == other.so_name and self.address == other.address def _create_node_from_address(address, ind, process_memory): @@ -104,15 +116,28 @@ def _create_node_from_address(address, ind, process_memory): node = Node(address=address, base=0, is_function_start=False) else: insn = module.get_instruction_at_address(address) - is_function_start = module.is_function_start( - address, is_relative_addr=False) + is_function_start = module.is_function_start(address, is_relative_addr=False) func = module.get_function_at_address(address) if func is not None: - node = Node(address=insn.address, base=module.start, - so_name=module.path, section_name=insn.section_name, insn=insn, is_function_start=is_function_start, func_start=func.start, func_end=func.end) + node = Node( + address=insn.address, + base=module.start, + so_name=module.path, + section_name=insn.section_name, + insn=insn, + is_function_start=is_function_start, + func_start=func.start, + func_end=func.end, + ) else: - node = Node(address=insn.address, base=module.start, - so_name=module.path, section_name=insn.section_name, insn=insn, is_function_start=is_function_start) + node = Node( + address=insn.address, + base=module.start, + so_name=module.path, + section_name=insn.section_name, + insn=insn, + is_function_start=is_function_start, + ) node.inds.append(ind) return node @@ -142,15 +167,15 @@ def identify_false_positives(address_to_node, branch_taken): for _, node in address_to_node.items(): if node.is_potential_indirect_return_endbr(): for ind in node.inds: - cur_address = node.address+node.base + cur_address = node.address + node.base assert cur_address == branch_taken[ind] # find another node that has the same address before the current one - for j in range(ind-1, -1, -1): + for j in range(ind - 1, -1, -1): if branch_taken[j] == cur_address: - j = j+1 + j = j + 1 break assert j >= 0 - examined_addresses = set(branch_taken[j+1:ind]) + examined_addresses = set(branch_taken[j + 1 : ind]) # get potential leading call address potential_leading_insn = node.get_potential_leading_call() potential_leading_call_addr = potential_leading_insn.address + node.base @@ -159,8 +184,7 @@ def identify_false_positives(address_to_node, branch_taken): if potential_leading_call_addr in examined_addresses: identified_false_positives.add(node) - sorted_false_positives = sorted( - list(identified_false_positives), key=lambda x: x.so_name) + sorted_false_positives = sorted(list(identified_false_positives), key=lambda x: x.so_name) return sorted_false_positives @@ -168,8 +192,12 @@ def identify_false_negatives(address_to_node, branch_taken): fns = set() for i, b in enumerate(branch_taken): node = address_to_node[b] - if node and node.so_name == "/usr/lib/x86_64-linux-gnu/ld-linux-x86-64.so.2" and node.address == 0x56d5: - fb = branch_taken[i+2] + if ( + node + and node.so_name == "/usr/lib/x86_64-linux-gnu/ld-linux-x86-64.so.2" + and node.address == 0x56D5 + ): + fb = branch_taken[i + 2] fn_node = address_to_node[fb] if fn_node.is_in_plt(): continue @@ -191,17 +219,16 @@ def trapped_insns_to_func_coverage_report(trapped_insns, process_memory, output_ so_path = module.path if so_path not in report: report[so_path] = [] - report[so_path].append({ - "function_name": func.name, - "start_offset": func.start - }) + report[so_path].append({"function_name": func.name, "start_offset": func.start}) with open(output_path, "w") as f: json.dump(report, f, indent=4) def remove_duplicate_branch_taken(branch_taken): """ - rtrace can report duplicate addresses, for eaxample, an address is a target but the same time it is also a branch instruction, then it will appear twice in the branch_taken list. We need to remove the consequtive duplicate addresses. + rtrace can report duplicate addresses: for example, when an address is a + branch target and at the same time a branch instruction, it appears twice + in the branch_taken list. Remove such consecutive duplicates. 0x1 jmp 0x2 0x2 jmp 0x3 Then 0x2 will appear twice in the branch_taken list. @@ -210,7 +237,7 @@ def remove_duplicate_branch_taken(branch_taken): return branch_taken new_branch_taken = [branch_taken[0]] for i in range(1, len(branch_taken)): - if branch_taken[i] != branch_taken[i-1]: + if branch_taken[i] != branch_taken[i - 1]: new_branch_taken.append(branch_taken[i]) return new_branch_taken @@ -259,20 +286,38 @@ def get_func_arg_ret(pid, tid, input_dir): if __name__ == "__main__": - parser = argparse.ArgumentParser( - description="Postprocess script for rtrace.") - parser.add_argument("--input", type=str, required=True, - help="Input file for postprocessing.") - parser.add_argument("--output", type=str, required=True, - help="Output dir for postprocessing results.") - parser.add_argument("--filter", action='store_true') - parser.add_argument("--calllog", action='store_true') - parser.add_argument("--mode", type=int, default=0, choices=[0, 1, 2], - help="0 for heavy mode, 1 for light mode, 2 for light mode with removal") - parser.add_argument("--bd_algo", type=str, default=None, help="Boundary detaction algorithm, linear or funseeker") - parser.add_argument("--bd_cache_dir",type=str, default=FUNCTION_INFO_DIR, - help="Cache directory for boundary detection") - parser.add_argument("--so_names", type=str, default=None, help="Shared object names to filter the calllog, liba,lib,libc") + parser = argparse.ArgumentParser(description="Postprocess script for rtrace.") + parser.add_argument("--input", type=str, required=True, help="Input file for postprocessing.") + parser.add_argument( + "--output", type=str, required=True, help="Output dir for postprocessing results." + ) + parser.add_argument("--filter", action="store_true") + parser.add_argument("--calllog", action="store_true") + parser.add_argument( + "--mode", + type=int, + default=0, + choices=[0, 1, 2], + help="0 for heavy mode, 1 for light mode, 2 for light mode with removal", + ) + parser.add_argument( + "--bd_algo", + type=str, + default=None, + help="Boundary detaction algorithm, linear or funseeker", + ) + parser.add_argument( + "--bd_cache_dir", + type=str, + default=FUNCTION_INFO_DIR, + help="Cache directory for boundary detection", + ) + parser.add_argument( + "--so_names", + type=str, + default=None, + help="Shared object names to filter the calllog, liba,lib,libc", + ) args = parser.parse_args() input_dir = args.input output_dir = args.output @@ -289,7 +334,15 @@ def get_func_arg_ret(pid, tid, input_dir): module_cache = {} pid_to_tids = get_pid_tid(input_dir) for pid, tids in pid_to_tids.items(): - process_memory = ProcessMemory(pid, tids, input_dir, mode=mode, bd_algo=bd_algo, bd_cache_dir=bd_cache_dir, analyze_function_prototypes=(mode == 0)) + process_memory = ProcessMemory( + pid, + tids, + input_dir, + mode=mode, + bd_algo=bd_algo, + bd_cache_dir=bd_cache_dir, + analyze_function_prototypes=(mode == 0), + ) process_memory_cache[pid] = process_memory for m in process_memory.modules: module_cache[m.path] = m @@ -298,8 +351,7 @@ def get_func_arg_ret(pid, tid, input_dir): print(f"Processing PID: {pid}, TID: {tid}") branch_taken = get_branch_taken(pid, tid, input_dir) branch_taken = remove_duplicate_branch_taken(branch_taken) - entry_node, addr_to_node, edges = create_cfg( - branch_taken, process_memory, tid) + entry_node, addr_to_node, edges = create_cfg(branch_taken, process_memory, tid) fps = identify_false_positives(addr_to_node, branch_taken) all_fps.extend(fps) fns = identify_false_negatives(addr_to_node, branch_taken) @@ -309,22 +361,21 @@ def get_func_arg_ret(pid, tid, input_dir): for node in all_fps: module = module_cache[node.so_name] print(f"remove function {node.so_name}: {hex(node.address)}") - module.remove_function_at_address( - node.address, is_relative_addr=True) + module.remove_function_at_address(node.address, is_relative_addr=True) for node in all_fns: module = module_cache[node.so_name] print(f"Insert function {node.so_name}: {hex(node.address)}") - module.insert_function_at_address(node.address, - is_relative_addr=True) + module.insert_function_at_address(node.address, is_relative_addr=True) if calllog: for pid, tids in pid_to_tids.items(): process_memory = process_memory_cache[pid] block_info = BlockInfo(pid, tids, input_dir) for tid in tids: call_processor = CallLogProcessor( - process_memory, block_info, pid, tid, input_dir, so_names=so_names) + process_memory, block_info, pid, tid, input_dir, so_names=so_names + ) call_processor.process_logs() - output_path = f'{output_dir}/function-calls-{pid}-{tid}.json' + output_path = f"{output_dir}/function-calls-{pid}-{tid}.json" call_processor.dump(output_path) for pid, tids in pid_to_tids.items(): @@ -333,5 +384,4 @@ def get_func_arg_ret(pid, tid, input_dir): print(f"Processing {pid}, {tid}") trapped_insns = get_executed_instrumentations(pid, tid, input_dir) output_file_path = f"{output_dir}/function-executed-{pid}-{tid}.json" - trapped_insns_to_func_coverage_report( - trapped_insns, process_memory, output_file_path) + trapped_insns_to_func_coverage_report(trapped_insns, process_memory, output_file_path) diff --git a/src/rtrace/preprocess.py b/src/rtrace/preprocess.py index 585ed4e..4e446b9 100755 --- a/src/rtrace/preprocess.py +++ b/src/rtrace/preprocess.py @@ -5,17 +5,20 @@ if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--so_path", help="Path to the shared object file") + parser.add_argument("--output", help="Path to output file of boundary detection") parser.add_argument( - "--output", help="Path to output file of boundary detection") - parser.add_argument("--method", help="Method to use for boundary detection of stripped binaries", - default="funseeker", choices=["ghidra", "nucleus", "linear", "funseeker", "angr"]) - parser.add_argument("--mode", type=int, default=0, choices=[0, 1], - help="0 for heavy mode, 1 for light mode") + "--method", + help="Method to use for boundary detection of stripped binaries", + default="funseeker", + choices=["ghidra", "nucleus", "linear", "funseeker", "angr"], + ) + parser.add_argument( + "--mode", type=int, default=0, choices=[0, 1], help="0 for heavy mode, 1 for light mode" + ) args = parser.parse_args() so_path = args.so_path output = args.output method = args.method mode = args.mode - Library(so_path, analyze_function_prototypes=( - mode == 0), func_info_dir=output) + Library(so_path, analyze_function_prototypes=(mode == 0), func_info_dir=output) diff --git a/src/rtrace/process.py b/src/rtrace/process.py index da66dfa..dfa49c0 100644 --- a/src/rtrace/process.py +++ b/src/rtrace/process.py @@ -1,20 +1,33 @@ import os + from .library import Library class Module(object): """Module represents a loaded library in the process memory.""" - def __init__(self, path, start, end, mode=0, bd_algo=None, bd_cache_dir=None, analyze_function_prototypes=False): + def __init__( + self, + path, + start, + end, + mode=0, + bd_algo=None, + bd_cache_dir=None, + analyze_function_prototypes=False, + ): self.path = path self.start = start self.end = end - self.lib = Library(path, boundary_detection_method=bd_algo, func_info_dir=bd_cache_dir, analyze_function_prototypes=analyze_function_prototypes) - if mode ==0: + self.lib = Library( + path, + boundary_detection_method=bd_algo, + func_info_dir=bd_cache_dir, + analyze_function_prototypes=analyze_function_prototypes, + ) + if mode == 0: self.lib.decode() - - def is_in(self, addr): return self.start <= addr < self.end @@ -54,7 +67,9 @@ def is_function_start(self, address, is_relative_addr=True): return self.lib.is_function_start(addr_in_module) -def get_loaded_module(pid, tids, input_dir, mode=0, bd_algo=None, bd_cache_dir=None, analyze_function_prototypes=False): +def get_loaded_module( + pid, tids, input_dir, mode=0, bd_algo=None, bd_cache_dir=None, analyze_function_prototypes=False +): # first try to read the corresponding pid-tid file, # if it is empty, try to read another pid-tid' file def read_module_info(file_path): @@ -70,15 +85,28 @@ def read_module_info(file_path): start = int(parts[1].strip()) end = int(parts[2].strip()) if "libtorch_cuda.so" in so_path and bd_algo == "funseeker": - print("Warning: libtorch_cuda.so is skipped for funseeker mode, as it is too large (>=2GB).") + print( + "Warning: libtorch_cuda.so is skipped for funseeker mode, " + "as it is too large (>=2GB)." + ) # skip libtorch_cuda.so continue - modules.append(Module(so_path, start, end, mode=mode, bd_algo=bd_algo, bd_cache_dir=bd_cache_dir, analyze_function_prototypes=analyze_function_prototypes)) + modules.append( + Module( + so_path, + start, + end, + mode=mode, + bd_algo=bd_algo, + bd_cache_dir=bd_cache_dir, + analyze_function_prototypes=analyze_function_prototypes, + ) + ) return modules - + def deduplicate_modules(modules): - module_path_set=set() - dep_modules=[] + module_path_set = set() + dep_modules = [] for m in modules: if m.path in module_path_set: continue @@ -86,15 +114,15 @@ def deduplicate_modules(modules): module_path_set.add(m.path) return dep_modules - all_modules=[] + all_modules = [] for tid in tids: file_path = f"{input_dir}/rtrace-intermediate-{pid}-{tid}-loaded_modules.log" modules = read_module_info(file_path) if modules is not None: all_modules.extend(modules) - if len(all_modules)>0: + if len(all_modules) > 0: return deduplicate_modules(all_modules) - + print(f"Warning: cannot find loaded modules for {pid}-{tids}, trying to read other pids") # cannot find loaded modules for current pid, try with other pids for f in os.listdir(input_dir): @@ -102,18 +130,36 @@ def deduplicate_modules(modules): modules = read_module_info(f"{input_dir}/{f}") if modules is not None: all_modules.extend(modules) - if len(all_modules)>0: + if len(all_modules) > 0: return deduplicate_modules(all_modules) raise ValueError( - f"At least one pid-tid file should exist, but not found for pid: {pid}, tid: {tid}") + f"At least one pid-tid file should exist, but not found for pid: {pid}, tid: {tid}" + ) class ProcessMemory(object): - def __init__(self, pid, tids, log_dir, mode=0, bd_algo=None, bd_cache_dir=None, analyze_function_prototypes=False): + def __init__( + self, + pid, + tids, + log_dir, + mode=0, + bd_algo=None, + bd_cache_dir=None, + analyze_function_prototypes=False, + ): self.pid = pid self.tids = tids self.log_dir = log_dir - self.modules = get_loaded_module(pid, tids, log_dir, mode=mode, bd_algo=bd_algo, bd_cache_dir=bd_cache_dir, analyze_function_prototypes=analyze_function_prototypes) + self.modules = get_loaded_module( + pid, + tids, + log_dir, + mode=mode, + bd_algo=bd_algo, + bd_cache_dir=bd_cache_dir, + analyze_function_prototypes=analyze_function_prototypes, + ) def get_module_at_address(self, address): for module in self.modules: diff --git a/src/rtrace/utils.py b/src/rtrace/utils.py index 3d076f7..fc460f3 100644 --- a/src/rtrace/utils.py +++ b/src/rtrace/utils.py @@ -1,2 +1,2 @@ def is_func_symbol(symbol_type_str): - return symbol_type_str in ["STT_FUNC", "STT_GNU_IFUNC", "STT_LOOS"] \ No newline at end of file + return symbol_type_str in ["STT_FUNC", "STT_GNU_IFUNC", "STT_LOOS"] From 210034858b162d070408d9462eafd62895d6badf Mon Sep 17 00:00:00 2001 From: jzh18 Date: Fri, 12 Jun 2026 03:38:20 +0000 Subject: [PATCH 3/3] Add CI workflow gating lint and formatting on PRs Runs ruff check and ruff format --check on every pull request and push to main. The existing build-bundles workflow (tags/dispatch only) is unchanged. A test job will join this workflow once the pytest suite lands. --- .github/workflows/ci.yml | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 .github/workflows/ci.yml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..8b84c9f --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,21 @@ +name: CI + +on: + push: + branches: [main] + pull_request: + +jobs: + lint: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: "3.11" + - name: Install ruff + run: pip install "ruff==0.15.17" + - name: Lint + run: ruff check src/ + - name: Check formatting + run: ruff format --check src/