From 59a874ab22cd59ab020887cd6147acb579d10330 Mon Sep 17 00:00:00 2001 From: Kyle Manna Date: Sun, 25 Oct 2020 16:11:23 -0700 Subject: [PATCH 1/8] devmem: Use bytes buffer instead of list[int] * Formerly the data buffer was a list of integers. * This seems to more accurately reflect memory and is probably faster and more efficient. * Hopefully makes supporting 64-bit reads/writes easier * Format with `black` again. --- devmem/__init__.py | 151 ++++++++++++++++++++++----------------------- devmem/__main__.py | 13 ++-- 2 files changed, 84 insertions(+), 80 deletions(-) diff --git a/devmem/__init__.py b/devmem/__init__.py index e8c9358..35e9e6c 100644 --- a/devmem/__init__.py +++ b/devmem/__init__.py @@ -18,61 +18,52 @@ import mmap import struct + class DevMemBuffer: """This class holds data for objects returned from DevMem class. It allows an easy way to print hex data""" - def __init__(self, base_addr, data): + lut = {1: "B", 2: "H", 4: "I"} + + def __init__(self, base_addr, data, word=4): self.data = data self.base_addr = base_addr + self.word = word def __len__(self): return len(self.data) - def __getitem__(self, key): - return self.data[key] + def __getitem__(self, i): + start = i * self.word + stop = start + self.word + + if stop > len(self.data): + raise IndexError - def __setitem__(self, key, value): - self.data[key] = value + return self.data[start:stop] - def hexdump(self, word_size = 4, words_per_row = 4): + def __setitem__(self, i, value): + raise NotImplementedError("DevMemBuffer is immutable") + # self.data[i] = value + + def hexdump(self, words_per_row=4): # Build a list of strings and then join them in the last step. # This is more efficient then concat'ing immutable strings. - d = self.data dump = [] + row_step = words_per_row * self.word - word = 0 - while (word < len(d)): - dump.append('0x{0:02x}: '.format(self.base_addr - + word_size * word)) - - max_col = word + words_per_row - if max_col > len(d): max_col = len(d) - - while word < max_col: - # If the word is 4 bytes, then handle it and continue the - # loop, this should be the normal case - if word_size == 4: - dump.append(" {0:08x} ".format(d[word])) + for row_pos in range(0, len(self.data), row_step): + row = ["0x{0:02x}: ".format(self.base_addr + row_pos)] - # Otherwise the word_size is not an int, pack it so it can be - # un-packed to the desired word size. This should blindly - # handle endian problems (Verify?) - elif word_size == 2: - for halfword in struct.unpack('HH', struct.pack('I',(d[word]))): - dump.append(" {0:04x}".format(halfword)) + row_stop = min(row_pos + row_step, len(self.data)) + for col_pos in range(row_pos, row_stop, self.word): + data = struct.unpack_from(self.lut[self.word], self.data, col_pos) + for b in data: + row.append(" {0:0{width}x}".format(b, width=self.word * 2)) - elif word_size == 1: - for byte in struct.unpack('BBBB', struct.pack('I',(d[word]))): - dump.append(" {0:02x}".format(byte)) + dump.append(" ".join(row)) - word += 1 - - dump.append('\n') - - # Chop off the last new line character and join the list of strings - # in to a single string - return ''.join(dump[:-1]) + return "\n".join(dump) def __str__(self): return self.hexdump() @@ -81,40 +72,48 @@ def __str__(self): class DevMem: """Class to read and write data aligned to word boundaries of /dev/mem""" - # Size of a word that will be used for reading/writing - word = 4 - mask = ~(word - 1) f = None - def __init__(self, base_addr, length = 1, filename = '/dev/mem', - debug = 0): + def __init__(self, base_addr, length=1, filename="/dev/mem", debug=0, word=4): - if base_addr < 0 or length < 0: raise AssertionError + if base_addr < 0 or length < 0: + raise AssertionError self._debug = debug + # Size of a word that will be used for reading/writing + self.word = word + self.mask = ~(word - 1) + self.base_addr = base_addr & ~(mmap.PAGESIZE - 1) self.base_addr_offset = base_addr - self.base_addr stop = base_addr + length * self.word - if (stop % self.mask): + if stop % self.mask: stop = (stop + self.word) & ~(self.word - 1) self.length = stop - self.base_addr self.fname = filename # Check filesize (doesn't work with /dev/mem) - #filesize = os.stat(self.fname).st_size - #if (self.base_addr + self.length) > filesize: + # filesize = os.stat(self.fname).st_size + # if (self.base_addr + self.length) > filesize: # self.length = filesize - self.base_addr - self.debug('init with base_addr = {0} and length = {1} on {2}'. - format(hex(self.base_addr), hex(self.length), self.fname)) + self.debug( + "init with base_addr = {0} and length = {1} on {2}".format( + hex(self.base_addr), hex(self.length), self.fname + ) + ) # Open file and mmap self.f = os.open(self.fname, os.O_RDWR | os.O_SYNC) - self.mem = mmap.mmap(self.f, self.length, mmap.MAP_SHARED, - mmap.PROT_READ | mmap.PROT_WRITE, - offset=self.base_addr) + self.mem = mmap.mmap( + self.f, + self.length, + mmap.MAP_SHARED, + mmap.PROT_READ | mmap.PROT_WRITE, + offset=self.base_addr, + ) def __del__(self): if self.f: @@ -123,59 +122,59 @@ def __del__(self): def read(self, offset, length): """Read length number of words from offset""" - if offset < 0 or length < 0: raise AssertionError + if offset < 0 or length < 0: + raise AssertionError - # Make reading easier (and faster... won't resolve dot in loops) - mem = self.mem - - self.debug('reading {0} bytes from offset {1}'. - format(length * self.word, hex(offset))) + self.debug( + "reading {0} bytes from offset {1}".format(length * self.word, hex(offset)) + ) # Compensate for the base_address not being what the user requested # and then seek to the aligned offset. virt_base_addr = self.base_addr_offset & self.mask - mem.seek(virt_base_addr + offset) + self.mem.seek(virt_base_addr + offset) - # Read length words of size self.word and return it - data = [] - for i in range(length): - data.append(struct.unpack('I', mem.read(self.word))[0]) + # Read length words of size self.word and return bytes() + data = self.mem.read(length * self.word) abs_addr = self.base_addr + virt_base_addr - return DevMemBuffer(abs_addr + offset, data) - + return DevMemBuffer(abs_addr + offset, data, self.word) def write(self, offset, din): """Write length number of words to offset""" - if offset < 0 or len(din) <= 0: raise AssertionError - - self.debug('writing {0} bytes to offset {1}'. - format(len(din) * self.word, hex(offset))) + if offset < 0 or len(din) <= 0: + raise AssertionError - # Make reading easier (and faster... won't resolve dot in loops) - mem = self.mem + self.debug( + "writing {0} bytes to offset {1}".format(len(din) * self.word, hex(offset)) + ) # Compensate for the base_address not being what the user requested # fix double plus offset - #offset += self.base_addr_offset + # offset += self.base_addr_offset # Check that the operation is going write to an aligned location - if (offset & ~self.mask): raise AssertionError + if offset & ~self.mask: + raise AssertionError # Seek to the aligned offset virt_base_addr = self.base_addr_offset & self.mask - mem.seek(virt_base_addr + offset) + self.mem.seek(virt_base_addr + offset) # Read until the end of our aligned address for i in range(len(din)): - self.debug('writing at position = {0}: 0x{1:x}'. - format(self.mem.tell(), din[i])) + self.debug( + "writing at position = {0}: 0x{1:x}".format( + self.self.mem.tell(), din[i] + ) + ) # Write one word at a time - mem.write(struct.pack('I', din[i])) + self.mem.write(struct.pack("I", din[i])) def debug_set(self, value): self._debug = value def debug(self, debug_str): - if self._debug: print('DevMem Debug: {0}'.format(debug_str)) + if self._debug: + print("DevMem Debug: {0}".format(debug_str)) diff --git a/devmem/__main__.py b/devmem/__main__.py index 30daf32..3f67946 100644 --- a/devmem/__main__.py +++ b/devmem/__main__.py @@ -105,14 +105,19 @@ def main() -> int: addr = args.write[0] if args.write else args.read # Create the Dev Mem object that does the magic - mem = DevMem(addr, length=args.num, filename=args.mmap, debug=args.debug) + mem = DevMem( + addr, length=args.num, filename=args.mmap, debug=args.debug, word=args.word_size + ) + + bytes_per_row = 16 + words_per_row = int(bytes_per_row / args.word_size) # Perform the actual read or write if args.write is not None: if args.verbose: print( "Value before write:\t{0}".format( - mem.read(0x0, args.num).hexdump(args.word_size) + mem.read(0x0, args.num).hexdump(words_per_row) ) ) @@ -121,11 +126,11 @@ def main() -> int: if args.verbose: print( "Value after write:\t{0}".format( - mem.read(0x0, args.num).hexdump(args.word_size) + mem.read(0x0, args.num).hexdump(words_per_row) ) ) else: - print(mem.read(0x0, args.num).hexdump(args.word_size)) + print(mem.read(0x0, args.num).hexdump(words_per_row)) if __name__ == "__main__": From 7379dc123a231d2b784de1b1d1d43bf9ff349ea4 Mon Sep 17 00:00:00 2001 From: Kyle Manna Date: Sun, 25 Oct 2020 17:47:08 -0700 Subject: [PATCH 2/8] devmem: Add binary output option * Make it simple to stream data to another program or write to a file. --- devmem/__main__.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/devmem/__main__.py b/devmem/__main__.py index 3f67946..538be5f 100644 --- a/devmem/__main__.py +++ b/devmem/__main__.py @@ -88,6 +88,13 @@ def main() -> int: "-d", "--debug", action="store_true", help="provide debugging information" ) + parser.add_argument( + "-b", + "--binary", + action="store_true", + help="output data as a binary stream", + ) + args = parser.parse_args() # Check for sane arguments @@ -129,6 +136,11 @@ def main() -> int: mem.read(0x0, args.num).hexdump(words_per_row) ) ) + + elif args.binary: + buf = mem.read(0x0, args.num) + sys.stdout.buffer.write(buf.data) + else: print(mem.read(0x0, args.num).hexdump(words_per_row)) From 59d8569b9fe84d02133e9219412f500a9f6504bd Mon Sep 17 00:00:00 2001 From: Kyle Manna Date: Mon, 26 Oct 2020 11:16:07 -0700 Subject: [PATCH 3/8] devmem: Write debug info to stderr * Otherwise there's a chance to mix the binary output with the debug output if piped to another program or redirected to a file. --- devmem/__init__.py | 4 +++- devmem/__main__.py | 9 +++++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/devmem/__init__.py b/devmem/__init__.py index 35e9e6c..952e60f 100644 --- a/devmem/__init__.py +++ b/devmem/__init__.py @@ -12,6 +12,8 @@ http://www.python.org/dev/peps/pep-0008/ """ +# TODO delete when dropping Python 2 support +from __future__ import print_function import os import sys @@ -177,4 +179,4 @@ def debug_set(self, value): def debug(self, debug_str): if self._debug: - print("DevMem Debug: {0}".format(debug_str)) + print("DevMem Debug: {0}".format(debug_str), file=sys.stderr) diff --git a/devmem/__main__.py b/devmem/__main__.py index 538be5f..b9a36c5 100644 --- a/devmem/__main__.py +++ b/devmem/__main__.py @@ -1,5 +1,8 @@ #!/usr/bin/env python3 +# TODO delete when dropping Python 2 support +from __future__ import print_function + import argparse import os import sys @@ -125,7 +128,8 @@ def main() -> int: print( "Value before write:\t{0}".format( mem.read(0x0, args.num).hexdump(words_per_row) - ) + ), + file=sys.stderr, ) mem.write(0x0, [args.write[1]]) @@ -134,7 +138,8 @@ def main() -> int: print( "Value after write:\t{0}".format( mem.read(0x0, args.num).hexdump(words_per_row) - ) + ), + file=sys.stderr, ) elif args.binary: From 4ab1854f70f87213bc7145ee8c5a8267b852dcf1 Mon Sep 17 00:00:00 2001 From: Kyle Manna Date: Mon, 26 Oct 2020 11:19:14 -0700 Subject: [PATCH 4/8] devmem: main: Drop type hinting * Some embedded platforms are still on Python 2.7 :( --- devmem/__main__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/devmem/__main__.py b/devmem/__main__.py index b9a36c5..aab7784 100644 --- a/devmem/__main__.py +++ b/devmem/__main__.py @@ -16,7 +16,7 @@ # * https://bugs.python.org/issue22240 # pip workaround: # * https://github.com/pypa/pip/blob/08c99b6e00135ca8df2e98db58aa0b701b971c64/src/pip/_internal/utils/misc.py#L124-L134 -def get_prog() -> str: +def get_prog(): """Determine the program name if invoked directly or as a module""" name = ( @@ -36,7 +36,7 @@ def get_prog() -> str: return name -def main() -> int: +def main(): """Main function with useful demo application""" parser = argparse.ArgumentParser(prog=get_prog()) From 8f94a80a43fe0f8724148bb5b11fe27e1008df19 Mon Sep 17 00:00:00 2001 From: Kyle Manna Date: Mon, 26 Oct 2020 11:22:52 -0700 Subject: [PATCH 5/8] devmem: Fix Python 2 stdout buffer handling * Clumsy but works. --- devmem/__main__.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/devmem/__main__.py b/devmem/__main__.py index aab7784..3f0d1eb 100644 --- a/devmem/__main__.py +++ b/devmem/__main__.py @@ -144,7 +144,12 @@ def main(): elif args.binary: buf = mem.read(0x0, args.num) - sys.stdout.buffer.write(buf.data) + + # TODO Delete when dropping Python2 support + if sys.version_info.major == 2: + sys.stdout.write(buf.data) + else: + sys.stdout.buffer.write(buf.data) else: print(mem.read(0x0, args.num).hexdump(words_per_row)) From 583cbfa6f4cfba8f76ae346fc24038b0caf82506 Mon Sep 17 00:00:00 2001 From: Kyle Manna Date: Mon, 26 Oct 2020 21:10:33 -0700 Subject: [PATCH 6/8] github: workflows: Add simple package test * Here we go. --- .github/workflows/python-package.yml | 39 ++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 .github/workflows/python-package.yml diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml new file mode 100644 index 0000000..59f9ae9 --- /dev/null +++ b/.github/workflows/python-package.yml @@ -0,0 +1,39 @@ +# This workflow will install Python dependencies, run tests and lint with a variety of Python versions +# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions + +name: Python package + +on: + push: + branches: [ master ] + pull_request: + branches: [ master ] + +jobs: + build: + + runs-on: ubuntu-latest + strategy: + matrix: + python-version: [2.7, 3.5, 3.6, 3.7, 3.8] + + steps: + - uses: actions/checkout@v2 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install flake8 + if [ -f requirements.txt ]; then pip install -r requirements.txt; fi + - name: Lint with flake8 + run: | + # stop the build if there are Python syntax errors or undefined names + flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics + # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide + flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics + - name: Test with pytest + run: | + python -m unittest -v From b4acabb2c8687fcda649f7d00fdc7b0d763784f9 Mon Sep 17 00:00:00 2001 From: Kyle Manna Date: Mon, 26 Oct 2020 21:17:54 -0700 Subject: [PATCH 7/8] devmem: main: Fix __spec__ issue with Python 2 * Flake8 meltsdown. Work around it. --- devmem/__main__.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/devmem/__main__.py b/devmem/__main__.py index 3f0d1eb..d236234 100644 --- a/devmem/__main__.py +++ b/devmem/__main__.py @@ -19,11 +19,9 @@ def get_prog(): """Determine the program name if invoked directly or as a module""" - name = ( - sys.argv[0] - if globals().get("__spec__") is None - else __spec__.name.partition(".")[0] - ) + spec = globals().get("__spec__") + name = sys.argv[0] if spec is None else spec.name.partition(".")[0] + try: prog = os.path.basename(sys.argv[0]) if prog in ("__main__.py", "-c"): From 6cbf9bc9082f81f05a52892195280e04ae730431 Mon Sep 17 00:00:00 2001 From: Kyle Manna Date: Mon, 26 Oct 2020 21:09:08 -0700 Subject: [PATCH 8/8] wip: test: Begin test framework --- test.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 test.py diff --git a/test.py b/test.py new file mode 100644 index 0000000..37c9cb4 --- /dev/null +++ b/test.py @@ -0,0 +1,22 @@ +import unittest +import devmem + +class TestDevMem(unittest.TestCase): + + def test_upper(self): + #devmem.main(...) + self.assertEqual('foo'.upper(), 'FOO') + + def test_isupper(self): + self.assertTrue('FOO'.isupper()) + self.assertFalse('Foo'.isupper()) + + def test_split(self): + s = 'hello world' + self.assertEqual(s.split(), ['hello', 'world']) + # check that s.split fails when the separator is not a string + with self.assertRaises(TypeError): + s.split(2) + +if __name__ == '__main__': + unittest.main()