diff --git a/examples/example_cdi_access.py b/examples/example_cdi_access.py
index d87130d..c27b75b 100644
--- a/examples/example_cdi_access.py
+++ b/examples/example_cdi_access.py
@@ -21,6 +21,7 @@
from examples_settings import Settings # do 1st to fix path if no pip install
from openlcb import precise_sleep
+from openlcb.convert import Convert
from openlcb.xmldataprocessor import attrs_to_dict
from openlcb.tcplink.tcpsocket import TcpSocket
settings = Settings()
@@ -137,7 +138,7 @@ def memoryReadSuccess(memo):
resultingCDI += memo.data
logger.debug(
f"[{memo.address}] successful read"
- f" {MemoryService.arrayToString(memo.data, len(memo.data))}"
+ f" {Convert.arrayToString(memo.data, len(memo.data))}"
"; next = address + 64")
# update the address
memo.address = memo.address+64
@@ -189,10 +190,8 @@ class MyHandler(xml.sax.handler.ContentHandler):
_chunks (list[str]): Collects chunks of data.
This is implementation-specific, and not
required if streaming (parser.feed).
- _tmp_address (int|None): Where we are in the memory space (starting
- at origin, and calculated using offset and/or size of start
- tags).
- _tmp_space (int|None): What space we are currently on.
+ _tmp_address (int|None): For sanity check, not actual address.
+ See replicatedTree docstring.
"""
def __init__(self):
diff --git a/examples/example_node_implementation.py b/examples/example_node_implementation.py
index 768f09f..b9e9416 100644
--- a/examples/example_node_implementation.py
+++ b/examples/example_node_implementation.py
@@ -79,7 +79,11 @@ def printMessage(message):
print("RM: {} from {}".format(message, message.source))
-canLink = CanLink(physicalLayer, NodeID(settings['localNodeID']))
+localNodeID = NodeID(settings['localNodeID'])
+print()
+print(f"[example_node_memory_implementation] localNodeID: {localNodeID}")
+
+canLink = CanLink(physicalLayer, localNodeID)
canLink.registerMessageReceivedListener(printMessage)
datagramService = DatagramService(canLink)
diff --git a/examples/example_node_memory_implementation.py b/examples/example_node_memory_implementation.py
new file mode 100644
index 0000000..d5011c1
--- /dev/null
+++ b/examples/example_node_memory_implementation.py
@@ -0,0 +1,248 @@
+'''
+Demo of creating a virtual node to represent the application
+(other local nodes are possible, but at least one is necessary
+for the application to announce itself and provide SNIP info),
+in this case with memory to allow another node to change settings
+(could also be used to for a second virtual node such as to
+represent/emulate a non-LCC train, but a separate
+virtual node from the Configuration Tool is recommended in
+that case).
+
+based on example_node_implementation from python-openlcb examples.
+
+Usage:
+python3 example_node_memory_implementation.py [host|host:port]
+
+Options:
+host|host:port (optional) Set the address (or using a colon,
+ the address and port). Defaults to a hard-coded test
+ address and port.
+'''
+import os
+import socket
+import struct
+
+# region same code as other examples
+from examples_settings import Settings
+from openlcb.localnode import LocalNode # do 1st to fix path if no pip install
+settings = Settings()
+
+if __name__ == "__main__":
+ settings.load_cli_args(docstring=__doc__)
+# endregion same code as other examples
+
+from openlcb import emit_cast, get_config_dir, precise_sleep # noqa: E402
+from openlcb.tcplink.tcpsocket import TcpSocket # noqa: E402
+
+from openlcb.canbus.canphysicallayergridconnect import ( # noqa: E402
+ CanPhysicalLayerGridConnect,
+)
+from openlcb.canbus.canlink import CanLink # noqa: E402
+from openlcb.nodeid import NodeID # noqa: E402
+from openlcb.datagramservice import DatagramService # noqa: E402
+from openlcb.memoryservice import MemoryService # noqa: E402
+from openlcb.message import Message # noqa: E402
+from openlcb.mti import MTI # noqa: E402
+
+from openlcb.localnodeprocessor import LocalNodeProcessor # noqa: E402
+from openlcb.pip import PIP # noqa: E402
+from openlcb.snip import SNIP # noqa: E402
+from openlcb.node import Node # noqa: E402
+
+# specify connection information
+# region moved to settings
+# host = "192.168.16.212"
+# port = 12021
+# localNodeID = "05.01.01.01.03.01"
+# farNodeID = "09.00.99.03.00.35"
+# endregion moved to settings
+
+sock = TcpSocket()
+# s.settimeout(30)
+try:
+ sock.connect(settings['host'], settings['port'])
+except socket.gaierror:
+ print("Failure accessing {}:{}"
+ .format(settings.get('host'), settings.get('port')))
+ raise
+
+print("RR, SR are raw socket interface receive and send;"
+ " RL, SL are link interface; RM, SM are message interface")
+
+
+# def sendToSocket(frame: CanFrame):
+# string = frame.encodeAsString()
+# print(" SR: {}".format(string.strip()))
+# sock.sendString(string)
+# physicalLayer.onFrameSent(frame)
+
+
+def printFrame(frame):
+ print(" RL: {}".format(frame))
+
+
+physicalLayer = CanPhysicalLayerGridConnect()
+physicalLayer.registerFrameReceivedListener(printFrame)
+
+
+def printMessage(message):
+ print("RM: {} from {}".format(message, message.source))
+
+
+localNodeID = NodeID(settings['localNodeID'])
+print()
+print(f"[example_node_memory_implementation] localNodeID: {localNodeID}")
+canLink = CanLink(physicalLayer, localNodeID)
+canLink.registerMessageReceivedListener(printMessage)
+
+datagramService = DatagramService(canLink)
+canLink.registerMessageReceivedListener(datagramService.process)
+
+spaces = { # big endian (most significant byte sent first) as per openlcb
+ # 0: bytearray([
+ # 0x01, 0x00, # 0x1000 = 4096 (unsigned int 16)
+ # ])
+ 0: bytearray(struct.pack(">H", 12021)),
+}
+# bytearray allows in-place append (from pack bytes does not)
+# H: short (capitalized means unsigned)
+# >: big endian (required for openlcb)
+# e: float16 (IEEE 754 binary16, 2-bytes)
+# For other symbols see Python documentation or SUBTYPE_FORMATS in cdivar.py.
+
+spaces[0] += struct.pack(">e", 0.5) # save at address 3 (size 2)
+# NOTE: 0.5 can be stored precisely, as b'\x008'
+# but not all numbers can be represented by IEEE float.
+# For example, 2.4 is stored as b'\xcd@' which is ~2.400390625
+
+# Additional pack examples:
+neg2_float_ba = bytearray(b'\xc0\x00')
+neg2_float_b = struct.pack(">e", -2)
+assert bytes(neg2_float_ba) == bytes(neg2_float_b), \
+ f"expected b'\xc0\x00', b'\xc0\x00', got {neg2_float_ba}, {neg2_float_b}"
+
+cdi = """
+
+
+ python-openlcb example authors
+ example_node_memory_implementation
+ 1.0
+ 1.0
+
+
+
+
+ Port
+ Network port of remote hub (2-byte unsigned short)
+ 12021
+
+
+ Timeout
+ Network timeout (2-byte binary16 value).
+ 0.5
+
+
+
+""" # noqa: E501
+
+
+def handleDatagram(memo):
+ """create a call-back to print datagram contents when received
+
+ Args:
+ memo (DatagramReadMemo): The datagram received
+
+ Returns:
+ bool: Always False (True would mean we sent a reply to the datagram,
+ but let the MemoryService do that).
+ """
+ print(f"Datagram receive call back: {emit_cast(memo)}")
+ return False
+
+
+datagramService.registerDatagramReceivedListener(handleDatagram)
+
+memoryService = MemoryService(datagramService)
+
+
+# callbacks to get results of memory read
+
+def memoryReadSuccess(memo):
+ print("successful memory read: {}".format(memo.data))
+
+
+def memoryReadFail(memo):
+ print("memory read failed: {}".format(memo.data))
+
+
+# create a node and connect it update
+# This is a very minimal node, which just takes part in the low-level common
+# protocols
+localNode = LocalNode(
+ NodeID(settings['localNodeID']),
+ SNIP("python-openlcb example authors",
+ "example_node_memory_implementation",
+ "1.0", "1.0", "Custom Name Here", "Custom Description Here"),
+ set([
+ PIP.SIMPLE_NODE_IDENTIFICATION_PROTOCOL,
+ PIP.DATAGRAM_PROTOCOL,
+ PIP.CONFIGURATION_DESCRIPTION_INFORMATION,
+ PIP.ADCDI_PROTOCOL,
+ PIP.MEMORY_CONFIGURATION_PROTOCOL,
+ ]),
+ canLink
+)
+my_conf_dir = os.path.join(get_config_dir("python-openlcb"))
+backup_name = "example_node_memory_implementation.cdi.xml"
+backup_path = os.path.join(my_conf_dir, backup_name)
+
+localNode.loadCDIString(cdi, backup_path)
+
+# localNodeProcessor = LocalNodeProcessor(canLink, localNode)
+# canLink.registerMessageReceivedListener(localNodeProcessor.process)
+localNodeProcessor = localNode.localNodeProcessor
+
+
+def displayOtherNodeIds(message) :
+ """Listener to identify connected nodes
+
+ Args:
+ message (Message): A response from the network
+ """
+ print("[displayOtherNodeIds] type(message): {}"
+ "".format(type(message).__name__))
+ if message.mti == MTI.Verified_NodeID :
+ print("Detected farNodeID is {}".format(message.source))
+
+
+canLink.registerMessageReceivedListener(displayOtherNodeIds)
+
+
+#######################
+
+# have the socket layer report up to bring the link layer up and get an alias
+
+print(" SL : link up...")
+physicalLayer.physicalLayerUp()
+print(" SL : link up...waiting...")
+while canLink.pollState() != CanLink.State.Permitted:
+ physicalLayer.receiveAll(sock, verbose=settings['trace'])
+ physicalLayer.sendAll(sock, verbose=True)
+ precise_sleep(.02)
+print(" SL : link up")
+# request that nodes identify themselves so that we can print their node IDs
+message = Message(MTI.Verify_NodeID_Number_Global,
+ NodeID(settings['localNodeID']), None)
+canLink.sendMessage(message)
+
+# process resulting activity
+while True:
+ count = 0
+ count += physicalLayer.sendAll(sock, verbose=True)
+ count += physicalLayer.receiveAll(sock, verbose=settings['trace'])
+ if count < 1:
+ precise_sleep(.01)
+ # else skip sleep to avoid latency (port already delayed)
+
+physicalLayer.physicalLayerDown()
diff --git a/examples/examples_gui.py b/examples/examples_gui.py
index fab718e..a38338e 100644
--- a/examples/examples_gui.py
+++ b/examples/examples_gui.py
@@ -632,7 +632,6 @@ def downloadCDI(self, farNodeID: str):
self.setStatus("Downloading CDI...")
assert self.cdi_form is not None
assert self.network is not None
- self.cdi_form.onStartDownload()
try:
self.network.download(farNodeID, MemorySpace.CDI,
self.cdi_form)
diff --git a/examples/tkexamples/cdiform.py b/examples/tkexamples/cdiform.py
index b6702b5..09fce4f 100644
--- a/examples/tkexamples/cdiform.py
+++ b/examples/tkexamples/cdiform.py
@@ -221,13 +221,14 @@ def getStatus(self):
def onStartDownload(self):
"""Initialize variables used by element handler(s)."""
- self.onStart()
- self._resetTree()
+ XMLDataProcessor.onStartDownload(self)
+ # TODO: clear tree?
def onStatusMemo(self, cm: CDIMemo) -> bool:
"""Handler for incoming CDI tag
- Use this for callback in downloadCDI, which sets parser
- (_dataProcessor)'s _onElement.
+ Use this for callback in downloadCDI
+ (onStatusMemo replaces _dataProcessor's _onElement
+ formerly set by downloadCDI).
Args:
cm (CDIMemo): Document parsing state info
diff --git a/openlcb/__init__.py b/openlcb/__init__.py
index 0eebe1b..3e7a8ac 100644
--- a/openlcb/__init__.py
+++ b/openlcb/__init__.py
@@ -1,4 +1,6 @@
from enum import Enum
+import os
+import platform
import re
import time
@@ -23,11 +25,26 @@
ORD_z = 0x7A
-def only_hex_pairs(value: str) -> Union[re.Match[bytes], re.Match[str], None]:
+def get_config_dir(unique_software_name: str):
+ """Get a configuration directory for any program
+ (In the parent directory recommended by the specific platform).
+ """
+ CONFIGS = os.path.expanduser("~/.config")
+ if platform.system() == "Darwin":
+ CONFIGS = os.path.expanduser("~/Library/Application Support")
+ elif platform.system() == "Windows":
+ CONFIGS = os.environ['APPDATA']
+ return os.path.join(CONFIGS, unique_software_name)
+
+
+def only_hex_pairs(value: str):
+ # type: (str) -> Union[re.Match[bytes], re.Match[str], None]
"""Check if string contains only machine-readable hex pairs.
See openlcb.conventions submodule for LCC ID dot notation
functions (less restrictive).
"""
+ # ^ PEP8 (instead of Python) type hint is used to avoid
+ # "TypeError: 'type' object is not subscriptable"
if isinstance(value, (bytearray, bytes)):
return hex_pairs_brc.fullmatch(value)
assert isinstance(value, str)
@@ -36,6 +53,8 @@ def only_hex_pairs(value: str) -> Union[re.Match[bytes], re.Match[str], None]:
def emit_cast(value) -> str:
"""Get type and value, such as for debug output."""
+ if value is None:
+ return "None"
repr_str = repr(value)
if isinstance(value, Enum):
repr_str = "{}".format(value.value)
@@ -131,3 +150,26 @@ def from_hex_bytes(b: bytearray, start: int, stop: int,
def from_all_hex_bytes(b: bytearray) -> bytearray:
return from_hex_bytes(b, 0, len(b))
+
+
+def hr_repr(value, always_quote: bool = False) -> str:
+ """Represent value with double quotes
+ if str, otherwise as an unquoted string.
+ (Human-readable repr).
+ """
+ # repr_value = repr(value)
+ # repr_value = repr_value.replace("\\\\", "\\")
+ # if repr_value.startswith("'") and repr_value.endswith("'"):
+ # return '"' + repr_value[1:-1].replace('"', '\\"') + '"'
+ repr_value = str(value)
+ if always_quote or isinstance(value, str):
+ return '"' + repr_value.replace('"', '\\"') + '"'
+ return repr(value)
+
+
+def d_quote(value) -> str:
+ """Represent any type of value in double-quotes
+ (with any already present escaped) such as for emitting XML
+ attribute debug messages or any other technical/literary use.
+ """
+ return hr_repr(value, always_quote=True)
diff --git a/openlcb/cdimemo.py b/openlcb/cdimemo.py
index 78ee35a..84a37c4 100644
--- a/openlcb/cdimemo.py
+++ b/openlcb/cdimemo.py
@@ -1,13 +1,18 @@
from collections import OrderedDict
+import copy
import json
import math
import xml.etree.ElementTree
# import xml.etree.ElementTree as ET
-from typing import List, Optional, Union
+from typing import Dict, List, Optional, Union
+from logging import getLogger
-from openlcb.cdivar import FLOAT_MAXIMUMS, NUM_TYPES, CDIVar
+from openlcb.cdivar import CLASSNAME_TYPES, FLOAT_MAXIMUMS, NUM_TYPES, CDIVar
from openlcb.message import Message
+from openlcb.dataprocessormemo import DataProcessorMemo
+
+logger = getLogger(__name__)
def element_ordered(el: xml.etree.ElementTree.Element):
@@ -17,7 +22,7 @@ def element_ordered(el: xml.etree.ElementTree.Element):
return od
-class CDIMemo:
+class CDIMemo(DataProcessorMemo):
"""Store parsing state info as a tree (This is a tree node)
Attributes:
@@ -40,28 +45,34 @@ class CDIMemo:
error (str): Message of failure (requires 'done' if stopped).
iid (str): Treeview branch id (no parent when top of Treeview)
name (str): Name (determined by `name` child element content).
+ space (int|None): The memory space address (May be one in
+ MemorySpace values, or not if vendor-specific such as
+ defined in CDI etc. See replicatedTree in XMLDataProcessor).
stray (bool): The end tag is misplaced (doesn't match a start
tag) due to bad xml or incorrect parsing.
+ tail (str|None): Content following the end tag (not used in
+ OpenLCB CDI/FDI standards).
"""
def __init__(self, tag: Union[str, None] = None,
element: Union[xml.etree.ElementTree.Element, None] = None,
status: Union[str, None] = None,
- parent: Optional['CDIMemo'] = None):
+ parent: Optional['CDIMemo'] = None,
+ document: Optional['XMLDocumentProcessor'] = None):
+ DataProcessorMemo.__init__(self)
self.tag = tag # type: str|None
# self.name = None # type: str|None
self.element = element # type: xml.etree.ElementTree.Element|None
- self.status = status # type: str|None
- self.error = None # type: str|None
- self.done = False # type: bool
- self.end = False # type: bool
self.parent = parent # type: CDIMemo|None
self.stray = False # type: bool
self.content = None # type: str|None
- self.message: Union[Message, None] = None # type: Message|None
+ self.tail = None # type: str|None
+ # TODO: Set tail (unused in OpenLCB CDI/FDI standards, but allowed in XML)
self.iid = None # type: str|None
self.address = None # type: int|None
+ self.space = None # type: int|None
self.cdivar = None # type: CDIVar|None
self.children = [] # type: List[CDIMemo]
+ self.document: Union[Optional['XMLDocumentProcessor'], None] = document
def getTag(self):
if self.element is None:
@@ -96,11 +107,32 @@ def getChildContent(self, tag) -> Union[str, None]:
return None
def copy(self):
+ return self.__copy__()
+
+ def __copy__(self):
cm = CDIMemo()
for k, v in self.__dict__.items():
setattr(cm, k, v)
return cm
+ def __deepcopy__(self, memo: dict):
+ """Allow deepcopy on this class.
+ Place id of new object in memo dict
+ (prevents infinite recursion).
+ See .
+ """
+ cm = type(self)()
+ memo[id(self)] = cm
+ for k, v in self.__dict__.items():
+ if k == 'parent':
+ # prevent invalid container
+ continue
+ if k == "document":
+ # prevent un-pickle-able object (& invalid container)
+ continue
+ setattr(cm, k, copy.deepcopy(v, memo))
+ return cm
+
def getBranch(self, default=None) -> Union[str, None]:
"""Get tree branch widget iid if any."""
if self.iid is None:
@@ -139,6 +171,8 @@ def to_dict(cm):
# continue
if k == 'parent':
continue
+ if k == 'document':
+ continue
if isinstance(v, xml.etree.ElementTree.Element):
d[k] = element_ordered(v)
continue
@@ -149,20 +183,48 @@ def __str__(self):
return json.dumps(CDIMemo.to_dict(self), default=CDIMemo.to_dict)
def toCDIVar(self):
+ # type: () -> CDIVar
"""Create a CDIVar from descriptors (child elements of self).
See LCC "Configuration Description Information" Standard.
+
+ NOTE: The `address` is only correct if this CDIMemo has been
+ replicated (such as in replicatedTree or self.replicated_root).
"""
- result = CDIVar(self.tag)
+ # result = CDIVar(self.tag)
assert (self.tag is not None) and (self.tag.strip())
- result.className = self.tag.lower()
+ className = self.tag.lower()
+ result_floatFormat = None
if self.element:
- result.floatFormat = self.element.attrib.get('floatFormat')
+ result_floatFormat = self.element.attrib.get('floatFormat')
this_t = NUM_TYPES.get(self.tag) if self.tag else None
+ result_min = None
+ result_max = None
+ result_default = None
+ result_size = self.getSize()
if this_t is not None:
- result.min = self.getChildContentN("min", result.className)
- result.max = self.getChildContentN("max", result.className)
- result.default = self.getChildContentN("default", result.className)
- result.size = self.getSize()
+ result_min = self.getChildContentN("min", className)
+ result_max = self.getChildContentN("max", className)
+ default_n = self.getChildContentN("default", className)
+ if default_n is not None:
+ default_var = CDIVar(className, _size=result_size)
+ if isinstance(default_n, int):
+ assert self.tag == "int"
+ default_var.setInt(default_n)
+ else:
+ assert self.tag == "float"
+ default_var.setFloat(default_n)
+ assert default_var.data is not None
+ result_default = bytearray(default_var.data)
+ # Size must be gotten ahead of time since CDIVar constructor
+ # enforces size:
+ result = CDIVar(self.tag, _min=result_min, _max=result_max,
+ _size=result_size, _default=result_default)
+ result.address = self.address # only set in replicatedTree()
+ result.space = self.space
+ result.floatFormat = result_floatFormat
+ result.name = self.getChildContent("name")
+ if not result.name and (self.tag in CLASSNAME_TYPES):
+ raise NotImplementedError(f"Can't get name for {self}")
if result.className == "int":
if result.min is None:
@@ -171,10 +233,12 @@ def toCDIVar(self):
result.signed = True
# if self.size is not None:
if result.size not in [1, 2, 4, 8]:
+ children_msg = json.dumps(self.children, sort_keys=True,
+ indent=2,
+ default=CDIMemo.to_dict)
raise AttributeError(
f"expected 1,2,4,8 for int size, got {result.size}"
- f" in children={json.dumps(self.children, sort_keys=True, indent=2,
- default=CDIMemo.to_dict)}")
+ f" in children={children_msg}")
if result.max is None:
if result.signed:
result.max = math.pow(2, result.size * 8 - 1) - 1
@@ -194,9 +258,44 @@ def toCDIVar(self):
return result
def getSize(self):
+ if self.tag == "eventid":
+ return 8
if self.element is None:
return None
size = self.element.attrib.get('size')
if size is None:
return None
return int(size)
+
+ def addChildren(self) -> None:
+ """Recursively build the full CDIMemo tree from self.element.
+
+ Populates ``self.children`` with proper CDIMemo instances
+ (one per direct child element). Each child memo also gets
+ its own children built recursively.
+
+ Preserves original ``.content`` from the parsed tree.
+ """
+ if self.element is None:
+ self.children = []
+ return
+
+ self.children = []
+ if self.element.text:
+ self.content = self.element.text
+ elif self.element.tag.lower() in ("name", "description"):
+ logger.warning(
+ f"{self.element.tag} has no content.")
+
+ if self.element.tail:
+ self.tail = self.element.tail
+
+ for child_elem in list(self.element): # list() fixes concurrency issue
+ child_memo = CDIMemo(
+ tag=child_elem.tag,
+ element=child_elem,
+ parent=self,
+ document=self.document
+ )
+ child_memo.addChildren() # recursive
+ self.children.append(child_memo)
diff --git a/openlcb/cdivar.py b/openlcb/cdivar.py
index 8025905..38dd44e 100644
--- a/openlcb/cdivar.py
+++ b/openlcb/cdivar.py
@@ -1,19 +1,26 @@
+import base64
+from collections import OrderedDict
+import copy
import struct
-from openlcb import emit_cast
-from typing import List, Type, Union
+from logging import getLogger
+from typing import Any, List, Type, Union
+from openlcb import emit_cast
from openlcb.eventid import EventID
from openlcb.openlcbaction import OpenLCBAction
+logger = getLogger(__name__)
NUM_TYPES = {'int': int, 'float': float} # type: dict[str, Type]
-# Assumes "IEEE" in LCC CDI Standard means IEEE 754-2008:
+# Assumes "IEEE" in OpenLCB CDI Standard means IEEE 754-2008:
FLOAT_MAXIMUMS = {16: 65504.0, 32: 3.40e38, 64: 1.80e308} # type: dict[int, float] # noqa: E501
CLASSNAME_TYPES = {'int': int, 'float': float, 'string': str,
'blob': bytearray, 'eventid': EventID,
'action': OpenLCBAction}
+SIZED_CONSTRUCTION_TYPES = copy.deepcopy(CLASSNAME_TYPES)
+SIZED_CONSTRUCTION_TYPES['eventid'] = bytearray
SUBTYPE_FORMATS = {
'int8': "b", 'uint8': "B",
'int16': ">h", 'uint16': ">H",
@@ -26,6 +33,8 @@
STANDARD_SIZES = {
'int': (1, 2, 4, 8),
'float': (2, 4, 8),
+ 'eventid': (8,),
+ 'action': (1, 2, 4, 8),
}
@@ -45,10 +54,12 @@ class CDIVar:
(for className == "float").
signed (bool): Whether the value is signed (False unless min is
negative). Defaults to True.
- See LCC "Configuration Description Information" Standard.
+ See OpenLCB "Configuration Description Information" Standard.
_data (bytes): The value read from the device or ready to
write. Only None if not read yet, otherwise length
must be .size.
+ element (xml.etree.Element): An associated element in an XML
+ tree.
"""
TYPED_KEYS = ['min', 'max', 'default']
@@ -59,6 +70,7 @@ def __init__(self, className, _min=None, _max=None,
assert className, f"Expected {CLASSNAME_TYPES.keys()} got {className}"
assert className in CLASSNAME_TYPES, \
f"Expected {list(CLASSNAME_TYPES.keys())} got {className}"
+ self.name = None # type: str|None
self.className = className # type: str
self.data = None # type: bytes|None
self.min = _min # type: int|float|None
@@ -68,12 +80,48 @@ def __init__(self, className, _min=None, _max=None,
self.max = _max # type: int|float|None
self.default = _default # type: bytearray|None
self.size = _size # type: int|None
+ self.branch_size = None # type: int|None # size including children
if self.size is None:
if self.default is not None:
self.size = len(self.default)
if self.className in ("int", "float"):
self.assertNumberFormat()
+ elif self.className == "eventid":
+ if (_size is not None) and (_size != 8):
+ logger.error(
+ f'Specified eventid size="{_size}" but 8 is required.')
+ self.size = 8
+ sizes = STANDARD_SIZES.get(self.className)
+ if sizes is not None:
+ assert self.size in sizes, \
+ (f"Expected size in {sizes}"
+ f" for {self.className} but got {self.size}")
self.floatFormat = None # type: str|None
+ self.address = None # type: int|None
+ self.element = None # type: Any|None
+ self.space = None # type: int|None
+
+ def setData(self, data: Union[bytes, bytearray]):
+ assert isinstance(data, (bytes, bytearray))
+ if isinstance(data, bytes):
+ data = bytearray(data)
+ if self.className == "eventid":
+ assert len(data) == 8
+ elif self.className == "blob":
+ # FIXME: enforce blob
+ pass
+ elif self.className == "string":
+ assert self.size
+ assert len(data) <= self.size
+ elif self.className in SIZED_CONSTRUCTION_TYPES:
+ assert self.size
+ assert len(data) == self.size
+ else:
+ raise NotImplementedError(f"Type {self.className} not implemented")
+ self.data = data
+
+ def getData(self):
+ return self.data
def isNumber(self):
return self.className in ("int", "float")
@@ -83,9 +131,11 @@ def standardSizes(self) -> Union[List[int], None]:
def assertNumberFormat(self, assertWhat=""):
if self.className == "int":
- assert self.size in (1, 2, 4, 8)
+ assert self.size in (1, 2, 4, 8), \
+ f"Expected size (1, 2, 4, 8) for int, got {self.size}"
elif self.className == "float":
- assert self.size in (2, 4, 8)
+ assert self.size in (2, 4, 8), \
+ f"Expected size (2, 4, 8) for float, got {self.size}"
else:
if not assertWhat:
assertWhat = f"Expected float/int size {STANDARD_SIZES}"
@@ -122,11 +172,32 @@ def intToData(self, value: int) -> bytes:
assert isinstance(value, int)
return struct.pack(self.packFormat(), value)
+ def getSerializable(self):
+ """Get a value in the corresponding Python type"""
+ if self.className == "int":
+ return self.getInt()
+ elif self.className == "float":
+ return self.getFloat()
+ elif self.className == "string":
+ return self.getString()
+ assert self.className in ("blob", "eventid", "action")
+ assert self.data is not None, "CDIVar data not initialized"
+ return base64.b64encode(self.data)
+
+ def getDict(self, add_name=True):
+ result = OrderedDict()
+ if add_name and self.name:
+ result['name'] = self.name
+ result['className'] = self.className
+ result['value'] = self.getSerializable()
+ return result
+
def setInt(self, value: int):
self.data = self.intToData(value)
def floatToData(self, value: float) -> bytes:
- assert self.className == "float"
+ assert self.className == "float", \
+ f"floatToData attempted on non-float: {self.className}"
assert isinstance(value, float)
return struct.pack(self.packFormat(), value)
@@ -139,8 +210,13 @@ def stringToData(self, value: str) -> bytes:
return value.encode("utf-8")
def setString(self, value: str):
- self.data = self.stringToData(value)
- self.size = len(self.data)
+ # self.data = self.stringToData(value)
+ # self.size = len(self.data)
+ # assert self.className == "string"
+ encoded = value.encode("utf-8")
+ assert self.size is not None
+ assert len(encoded) + 1 <= self.size # size is max *only* if "string"
+ self.data = encoded + b"\x00" # null-terminated for OpenLCB network
def dataToInt(self, data) -> Union[int, None]:
assert self.className == "int"
@@ -173,4 +249,17 @@ def dataToString(self, data) -> Union[str, None]:
return data.decode("utf-8")
def getString(self) -> Union[str, None]:
- return self.dataToString(self.data)
+ # return self.dataToString(self.data)
+ if self.data is None or len(self.data) == 0:
+ return None
+ # Return content up to (but not including) first null
+ null_pos = self.data.find(b"\x00")
+ if null_pos == -1:
+ logger.error(f"No null terminator in {repr(self.data)}")
+ content = self.data
+ else:
+ content = self.data[:null_pos]
+ # try:
+ return content.decode("utf-8")
+ # except UnicodeDecodeError:
+ # return None # or raise
diff --git a/openlcb/conventions.py b/openlcb/conventions.py
index bd2af5c..842cf97 100644
--- a/openlcb/conventions.py
+++ b/openlcb/conventions.py
@@ -248,6 +248,7 @@ def generate_node_id_str(id_range_prefix: str, increment: bool = False) -> str:
lastParts = [f"{p:02X}" for p in generate_last_three_octets(increment=increment)] # noqa: E501
assert len(lastParts) == 3
+ id_range_prefix = id_range_prefix.rstrip(".")
prefixParts = id_range_prefix.split(".")
if len(prefixParts) < 3:
raise ValueError(
diff --git a/openlcb/convert.py b/openlcb/convert.py
new file mode 100644
index 0000000..5875477
--- /dev/null
+++ b/openlcb/convert.py
@@ -0,0 +1,193 @@
+'''
+based on part of MemoryService.swift
+
+Created by Bob Jacobsen on 6/1/22.
+
+These parts moved to a separate class so callers of static methods don't
+depend on MemoryService(DatagramService).
+
+'''
+
+from logging import getLogger
+from typing import (
+ List, # in case list doesn't support `[` in this Python version
+ Union, # in case `|` doesn't support 'type' in this Python version
+)
+
+logger = getLogger(__name__)
+
+
+class Convert:
+
+ @staticmethod
+ def deserializeMC2ndByte(datagramByte1):
+ """Decode byte[1] (2nd) of Memory Configuration Datagram"""
+ has_byte6 = False
+ if datagramByte1 & 0x03 == 0:
+ has_byte6 = True
+ return has_byte6, datagramByte1 & 0xFC
+ # ^ 0xFC = 11111100
+
+ # formerly spaceDecode, but it serializes a space for datagram byte2
+ @staticmethod
+ def serializeSpace(space):
+ """Convert from a space number to either
+ False and control number or True and standard memory space
+ for use in a Datagram.
+
+ Args:
+ space (int): Sequential memory space identifier, where values:
+ - 0xFF to 0xFD are special spaces, and only the least significant
+ 2 bits will be used in a datagram.
+ - 0x00 to 0xFC represent standard memory spaces directly.
+
+ Returns:
+ tuple(bool, byte): (is custom space, control | space)
+ - (False, control number 1 to 3 inclusive) :
+ spaces 0xFF - 0xFD (Except bits beyond 0x00000011
+ differ for each datagram type. See 4.2 Address
+ Space Selection in OpenLCB Memory Configuration
+ Standard)
+ - or (True, space number) : spaces 0 - 0xFC
+ (NOTE: type of space may affect type of output)
+ """
+ # TODO: Maybe check type of space & raise TypeError if not
+ # something valid, whether byte, int, or what is ok [add
+ # more _description_ to space in docstring].
+ if space >= 0xFD:
+ return (False, space & 0x03)
+ return (True, space)
+
+ @staticmethod
+ def arrayToInt(data: Union[bytes, bytearray, List[int]]) -> int:
+ """Convert an array in MSB-first order to an integer
+
+ Args:
+ data (Union[bytes,bytearray,list[int]]): MSB-first order
+ encoded 32-bit int
+
+ Returns:
+ int: The converted data as a number.
+ """
+ result = 0
+ for index in range(0, len(data)):
+ result = result << 8
+ result = result | data[index]
+ return result
+
+ @staticmethod
+ def arrayToUInt64(data):
+ """Parse a MSB-first order 64-bit integer
+ (Python auto-sizes int, so this is same as arrayToInt).
+ """
+ return Convert.arrayToInt(data)
+
+ @staticmethod
+ def arrayToString(data, length):
+ """Decode utf-8 bytes to string
+ up to the 1st zero byte or given length,
+ whichever is fewer characters.
+
+ Args:
+ data (Union[bytearray, bytes]): A string encoded as bytes.
+ length (int): The used length the data.
+
+ Returns:
+ str: Data decoded as text.
+ """
+ if not isinstance(data, bytearray):
+ raise TypeError("Expected bytearray (formerly list[int]), got {}"
+ .format(type(data).__name__))
+ zeroIndex = len(data)
+ try:
+ temp = data.index(0)
+ zeroIndex = temp
+ except KeyboardInterrupt:
+ raise
+ except:
+ pass
+
+ byteCount = min(zeroIndex, length)
+
+ if byteCount == 0:
+ return ""
+
+ result = data[:byteCount].decode('utf-8')
+ return result
+
+ @staticmethod
+ def intToArray(value, length):
+ """Convert an integer into an array of given length
+
+ Args:
+ value (int): any value
+ length (int): Byte count (1, 2, 4, or 8).
+
+ Returns:
+ bytearray: The value encoded in big-endian format.
+ """
+ if value >= (1 << (length * 8)): # TODO: ? also exclude value < 0 ?
+ raise ValueError("Value {} cannot fit in {} bytes."
+ .format(value, length))
+ if length == 1:
+ return bytearray([
+ (value & 0xff)
+ ])
+ if length == 2:
+ return bytearray([
+ ((value >> 8) & 0xff), (value & 0xff)
+ ])
+ if length == 4:
+ return bytearray([
+ ((value >> 24) & 0xff), ((value >> 16) & 0xff),
+ ((value >> 8) & 0xff), (value & 0xff)
+ ])
+ if length == 8:
+ return bytearray([
+ ((value >> 56) & 0xff), ((value >> 48) & 0xff),
+ ((value >> 40) & 0xff), ((value >> 32) & 0xff),
+ ((value >> 24) & 0xff), ((value >> 16) & 0xff),
+ ((value >> 8) & 0xff), (value & 0xff)
+ ])
+ logger.error("integer length {} is not implemented.".format(length))
+ return bytearray()
+
+ @staticmethod
+ def uInt64ToArray(value, length):
+ '''Convert a 64-bit integer into an array of given length
+ (Python auto-sizes int, so this is same as intToArray)
+ '''
+ return Convert.intToArray(value, length)
+
+ @staticmethod
+ def stringToArray(value, length):
+ '''Converts a string to an array of given length
+ padding with 0 bytes as needed
+ '''
+ strToUInt8 = value.encode('utf-8')
+ byteCount = min(length, len(strToUInt8))
+ # convert to bytearray since bytes is immutable:
+ contentPart = bytearray(strToUInt8[:byteCount])
+ if len(contentPart) >= length:
+ if len(contentPart) > length:
+ logger.warning(
+ "MemoryService stringToArray: len(value)=={}"
+ " exceeds length {}".format(len(value), length))
+ # TODO: Truncate (or is any length ok for the caller)?
+ return contentPart
+ # list[int] is compatible bytearray extend but not `+` so cast
+ # to bytearray after getting list[int] of remaining length:
+ padding = bytearray([0] * (length-len(contentPart)))
+ return contentPart + padding
+
+ @staticmethod
+ def getBeforeNull(data: Union[bytes, bytearray], start):
+ null_idx = -1
+ for i in range(start, len(data)):
+ assert isinstance(data[i], int)
+ if data[i] == 0:
+ null_idx = i
+ break
+ if null_idx > -1:
+ return data[start:null_idx]
+ return data[start:]
diff --git a/openlcb/datagramservice.py b/openlcb/datagramservice.py
index dd70f4f..ff412f4 100644
--- a/openlcb/datagramservice.py
+++ b/openlcb/datagramservice.py
@@ -246,6 +246,13 @@ def handleDatagramReceivedOK(self, message: Message):
# match to the memo and remove from queue
memo = self.matchToWriteMemo(message) # type: DatagramWriteMemo|None
+ # check for whether a match was found, indicating this was for us
+ if memo is None:
+ logger.debug(
+ f"Unrelated OK reply discarded: from"
+ f" {message.source} to {message.destination}")
+ return
+
# check of tracking logic
if self.currentOutstandingMemo != memo:
logger.error(
@@ -264,6 +271,13 @@ def handleDatagramRejected(self, message: Message):
# match to the memo and remove from queue
memo = self.matchToWriteMemo(message)
+ # check for whether a match was found, indicating this was for us
+ if memo is None:
+ logger.debug(
+ f"Unrelated Rejected reply discarded: from"
+ f" {message.source} to {message.destination}")
+ return
+
# check of tracking logic
if self.currentOutstandingMemo != memo:
logger.error(
diff --git a/openlcb/dataprocessor.py b/openlcb/dataprocessor.py
index 75a8f96..f3f7109 100644
--- a/openlcb/dataprocessor.py
+++ b/openlcb/dataprocessor.py
@@ -10,6 +10,23 @@ class DataFormat(Enum):
class DataProcessor:
"""Collect & process consecutive data from each incoming MemoryReadMemo.
Superclass for data listeners.
+
+ Attributes:
+ enable_cache (bool): Defaults to False (May differ in subclass).
"""
+ DEFAULT_EXT = ".bin" # override in subclass
+
def __init__(self):
- pass
+ self._is_from_cache = False # type: bool
+ self._path = None # type: str|None
+ self.enable_cache = False # type: bool
+ # Members used to construct space memo such as CDIMemo:
+ self.progress_ratio = None # type: float|None
+ self.progress_count = None # type: int|None
+ self.expected_size = None # type: int|None
+
+ def getPath(self):
+ return self._path
+
+ def isFromCache(self):
+ return self._is_from_cache
diff --git a/openlcb/dataprocessormemo.py b/openlcb/dataprocessormemo.py
new file mode 100644
index 0000000..32b2869
--- /dev/null
+++ b/openlcb/dataprocessormemo.py
@@ -0,0 +1,32 @@
+from typing import Union
+
+from openlcb.message import Message
+
+
+class DataProcessorMemo:
+ """Store parsing state info.
+ This superclass can be used for progress notification.
+
+ Attributes:
+ done (bool): If True, download such as downloadCDI is finished.
+ Though document itself may be incomplete if 'error' is also
+ set, stop tracking status of download regardless.
+ end (bool): False to start a deeper scope, or True for end tag,
+ which exits current scope (last created Treeview branch in
+ this case, or top if getBranch() would be None).
+ error (str): Message of failure (requires 'done' if stopped).
+ message (Message): Associated network/internal message.
+ name (str): Name (determined by `name` child element content).
+ status (str): Status message.
+ """
+ def __init__(self, status: Union[str, None] = None):
+ self.done = False # type: bool
+ self.end = False # type: bool
+ self.error = None # type: str|None
+ self.message: Union[Message, None] = None # type: Message|None
+ self.status = status # type: str|None
+ # region set by DataProcessor such as XMLDataProcessor
+ self.progress_ratio = None # type: float|None
+ self.progress_count = None # type: int|None
+ self.expected_size = None # type: int|None
+ # end region set by DataProcessor such as XMLDataProcessor
diff --git a/openlcb/localnode.py b/openlcb/localnode.py
new file mode 100644
index 0000000..8053085
--- /dev/null
+++ b/openlcb/localnode.py
@@ -0,0 +1,227 @@
+import os
+
+from logging import getLogger
+from typing import Union
+from openlcb import emit_cast
+from openlcb.node import PIP, SNIP, Node
+
+from openlcb.localnodeprocessor import LocalNodeProcessor
+from openlcb.nodeid import (
+ NodeID,
+)
+from openlcb.xmldataprocessor import (
+ CanLink,
+ CDIMemo,
+ CDIVar,
+ # CLASSNAME_TYPES,
+ d_quote,
+ MemoryReadMemo,
+ MemorySpace,
+ XMLDataProcessor,
+)
+
+logger = getLogger(__name__)
+
+
+class LocalNode(Node):
+ """A Node with its own virtual memory
+ (emulate memory spaces such as for creating a virtual
+ signal node with settings)"""
+ def __init__(self, id: NodeID, snip: SNIP, pipSet: set,
+ linkLayer: CanLink):
+ Node.__init__(self, id, snip, pipSet)
+ self.cdi = None # type: XMLDataProcessor|None
+ self._replicated_cdi_tree = None # type: CDIMemo|None
+ if PIP.CONFIGURATION_DESCRIPTION_INFORMATION in pipSet:
+ self.cdi = XMLDataProcessor(linkLayer, MemorySpace.CDI)
+ else:
+ logger.warning(
+ "PIP.CONFIGURATION_DESCRIPTION_INFORMATION is not in pipSet"
+ f" for new LocalNode {self.cdi}, so XMLDataProcessor"
+ " will not be initialized (functioning as Node, unless"
+ " remote user knows addresses apart from CDI)")
+ self.spaces = {} # type: dict[int, bytearray]
+ self.localNodeProcessor = LocalNodeProcessor(linkLayer, self)
+ linkLayer.registerMessageReceivedListener(
+ self.localNodeProcessor.process)
+
+ def loadCDIFile(self, path, memo=None):
+ """Load a CDI file to generate virtual memory spaces
+ (to create a virtual node, not representing a remote one)
+
+ Args:
+ path (str): Location of original file, also used
+ to generate cache dir (parent of path).
+ memo (MemoryReadMemo): Typically left blank,
+ This would provide a success or fail message,
+ but this method can be called asynchronously
+ since LocalNode assumes local data is loaded,
+ not network data.
+ """
+ assert isinstance(path, str)
+ if not os.path.isfile(path):
+ raise FileNotFoundError(path)
+
+ # self.cdi.load(self.id, path, MemorySpace.CDI, memo)
+ xml_data = None
+ with open(path, "wb") as stream:
+ xml_data = stream.read()
+ return self.loadCDIString(xml_data, path, memo=memo)
+
+ def loadCDIString(self, xml_data, path, memo=None):
+ """Load raw XML data from a string.
+ Args:
+ xml_data (Union[bytes, bytearray, str]): Raw XML
+ path (str): Location of original file, for
+ reference and use as cache dir (parent of path).
+ memo (Optional[MemoryReadMemo]): Typically left blank,
+ This would provide a success or fail message,
+ but this method can be called asynchronously
+ since LocalNode assumes local data is loaded,
+ not network data.
+ """
+ self.cdiBackupDir = os.path.dirname(path)
+ assert self.cdi is not None, \
+ ("PIP.CONFIGURATION_DESCRIPTION_INFORMATION is not in pipSet"
+ f" for LocalNode {self.id}")
+ if memo is None:
+ memo = MemoryReadMemo(
+ self.id, 0, MemorySpace.CDI.value, 0,
+ self.onCDILoadFailed, self.onCDILoaded)
+ self.cdi.load(self.id, path, MemorySpace.CDI, memo, data=xml_data)
+ # with open(path, "r") as stream:
+ # data = stream.read()
+ # self.tree = etree.fromstring(data)
+ self.reserveSpaces()
+
+ def setMemory(self, memo: CDIMemo, var: CDIVar):
+ """Set a memory address at memo to the value in var"""
+ assert memo.space is not None
+ size = memo.getSize()
+ assert size is not None
+ assert size > 0, f"size={repr(size)}"
+ assert memo.address is not None
+ # if var is None:
+ # var = memo.toCDIVar()
+ assert var is not None
+ assert var.data is not None
+ assert len(var.data) == memo.getSize()
+ self.setMemoryAt(memo.space, memo.address, var.data)
+
+ def setMemoryAt(self, space, address, data):
+ """Set address in virtual memory space to data"""
+ assert isinstance(data, (bytearray, bytes))
+ if isinstance(space, MemorySpace):
+ space = space.value
+ assert isinstance(space, int)
+ assert isinstance(address, int)
+ assert address >= 0
+ size = len(data)
+ end = address + size
+
+ if space not in self.spaces:
+ self.spaces[space] = bytearray()
+ else:
+ assert isinstance(self.spaces[space], bytearray)
+
+ newRegionLen = end - len(self.spaces[space])
+ if newRegionLen > 0:
+ logger.warning(
+ f"Extending LocalNode data from {len(self.spaces[space])}"
+ f" byte(s) to {end} byte(s).")
+ self.spaces[space] += b'\0' * newRegionLen
+ assert end - address == len(data)
+ self.spaces[space][address:end] = data
+ print(f"Set LocalNode {self.id} space {space}"
+ f" address {space} (length {len(data)}).")
+
+ def reserveSpaces(self, parent: Union[CDIMemo, None] = None):
+
+ assert self.cdi is not None, \
+ ("PIP.CONFIGURATION_DESCRIPTION_INFORMATION is not in pipSet"
+ f" for LocalNode {self.id}")
+
+ if parent is None:
+ parent = self.cdi.getRootMemo()
+ assert parent is not None
+ assert parent.tag == "cdi", f"Expected cdi, got {parent.tag}"
+ assert parent.element is not None
+ if (('replicated' in parent.element.attrib)
+ and (parent.element.attrib['replicated'] == "true")):
+ # Caller already used replicatedTree
+ return self._reserveSpaces(parent=parent)
+ replicated_root_memo, replicated_root = self.cdi.replicatedTree()
+ self.cdi.replicated_root_memo = replicated_root_memo
+ self.cdi.replicated_root = replicated_root
+ # ^ self.cdi.replicated_root_memo can also be set via
+ # self.cdi.extractCDIVarMemos.
+ return self._reserveSpaces(
+ parent=self.cdi.replicated_root_memo,
+ )
+
+ def _reserveSpaces(self, parent: Union[CDIMemo, None] = None, level=0):
+ assert parent is not None
+ assert parent.tag is not None
+ tag = parent.tag.lower()
+ if tag == "cdi":
+ assert parent.element is not None
+ assert parent.element.attrib['replicated'] == "true", \
+ "replicated_root_memo accounting for replication must be used."
+ if tag in ("int", "float"): # CLASSNAME_TYPES:
+ # cast_fn = int if tag == "int" else float
+ var = parent.toCDIVar()
+ # _min = parent.getChildContentN("min", tag)
+ # _max = parent.getChildContentN("max", tag)
+ value = parent.getChildContentN("default", tag)
+ # size = parent.getSize()
+ # assert size is not None
+ # var = CDIVar(parent.tag, _min=_min, _max=_max, _size=size)
+ if value is not None:
+ if tag == "float":
+ var.setFloat(value)
+ elif tag == "int":
+ assert isinstance(value, int), \
+ f"tried to use {emit_cast(value)} for int tag"
+ var.setInt(value)
+ assert var.data is not None
+ # var.default = bytearray(copy.deepcopy(var.data))
+ assert parent.space is not None, \
+ f"No space defined in CDI for a(n) {tag}"
+ self.setMemory(parent, var)
+ return
+ for child in parent.children:
+ self._reserveSpaces(child, level=level+1)
+ if level == 0:
+ if not self.cdiBackupDir:
+ logger.warning(
+ f"Not backing up virtual node {self.id} memory since"
+ " no cdiBackupDir is set for the LocalNode instance.")
+ return
+ if not os.path.isdir(self.cdiBackupDir):
+ logger.warning(
+ f"Creating cdiBackupDir {self.cdiBackupDir}")
+ os.makedirs(self.cdiBackupDir)
+ for space, data in self.spaces.items():
+ name = f"{self.id}.lcc-link-virtual-node.space={space}.xml"
+ path = os.path.join(self.cdiBackupDir, name)
+ with open(path, "wb") as stream:
+ stream.write(data)
+ print(f"Wrote {d_quote(path)}")
+
+ def onCDILoaded(self, memo: MemoryReadMemo):
+ """Default handler, typically enough since CDI is local
+ in the case of LocalNode"""
+ assert self.cdi is not None, \
+ ("PIP.CONFIGURATION_DESCRIPTION_INFORMATION is not in pipSet"
+ f" for LocalNode {self.id}")
+ print(f"LocalNode onFileLoaded {self.cdi.getPath()}: {memo}")
+
+ def onCDILoadFailed(self, memo: MemoryReadMemo):
+ """Default handler for file load failed.
+ Shouldn't happen unless application has provided malformed XML,
+ since CDI is local in the case of LocalNode
+ """
+ assert self.cdi is not None, \
+ ("PIP.CONFIGURATION_DESCRIPTION_INFORMATION is not in pipSet"
+ f" for LocalNode {self.id}")
+ print(f"LocalNode onCDILoadFailed {self.cdi.getPath()}: {memo}")
diff --git a/openlcb/memoryservice.py b/openlcb/memoryservice.py
index f683d18..997706b 100644
--- a/openlcb/memoryservice.py
+++ b/openlcb/memoryservice.py
@@ -25,35 +25,81 @@
from logging import getLogger
from typing import (
Callable,
- List, # in case list doesn't support `[` in this Python version
+ List,
+ Optional, # in case list doesn't support `[` in this Python version
Union, # in case `|` doesn't support 'type' in this Python version
)
+from openlcb import (
+ emit_cast,
+)
from openlcb.datagramservice import (
# DatagramReadMemo,
DatagramReadMemo,
DatagramWriteMemo,
DatagramService,
)
+from openlcb.convert import Convert
+from openlcb.nodeid import NodeID
logger = getLogger(__name__)
+MODE_BYTES = {
+ 'Read_Command': {0x40, 0x41, 0x42, 0x43},
+ 'Read_Reply': {0x50, 0x51, 0x52, 0x53},
+ 'Read_Stream_Command': {0x60, 0x61, 0x62, 0x63},
+ 'Read_Stream_Reply': {0x70, 0x71, 0x72, 0x73},
+ 'Write_Command': {0x00, 0x01, 0x02, 0x03},
+ 'Write_Reply': {0x10, 0x11, 0x12, 0x13},
+ 'Write_Under_Mask_Command': {0x08, 0x09, 0x0A, 0x0B},
+ 'Write_Stream_Command': {0x20, 0x21, 0x22, 0x23},
+ 'Write_Stream_Reply': {0x30, 0x31, 0x32, 0x33},
+}
+
+MODE_ERROR_BYTES = {
+ 'Read_Reply': {0x58, 0x59, 0x5A, 0x5B},
+ 'Read_Stream_Reply': {0x78, 0x79, 0x7A, 0x7B},
+ 'Write_Reply': {0x18, 0x19, 0x1A, 0x1B},
+ 'Write_Stream_Reply': {0x38, 0x39, 0x3A, 0x3B},
+}
+
+
class MemorySpace(Enum):
"""The memory space to read.
In practice, XMLDataProcessor (or a non-XML parser if necessary)
uses this to track what data type and format is to be assumed in a
received Message. It is assumed to have the same space as the
request (MemoryReadMemo).
+ - A datagram's `space` attribute's type should be `int` not
+ MemorySpace, because CDI specifies variables' space arbitrarily.
Attributes:
Uninitialized: No data (memory read request response) is expected.
CDI: The data expected from the memory read is CDI XML.
FDI: The data expected from the memory read is FDI XML.
+ All: All memory of the device, where all is defined by its designer
+ (See OpenLCB Memory Configuration Standard 4.2).
+ Configuration: A writeable basic configuration space, with
+ the structure of the 32-bit space defined by the designer
+ (See OpenLCB Memory Configuration Standard 4.2).
"""
Uninitialized = -1
CDI = 0xFF # decodes to 0x03
FDI = 0xFA
+ All = 0xFE
+ Configuration = 0xFD
+
+ @classmethod
+ def fromNumber(cls, num: int):
+ """Return the MemorySpace member with the given numeric value,
+ or None if no match is found.
+ """
+ assert isinstance(num, int)
+ for member in cls:
+ if member.value == num:
+ return member
+ return None
class MemoryReadMemo:
@@ -82,8 +128,12 @@ class MemoryReadMemo:
Attributes:
data(bytearray): The data that was read.
"""
- def __init__(self, nodeID, size, space, address, rejectedReply, dataReply):
+ def __init__(self, nodeID: NodeID, size: int, space: int, address: int,
+ rejectedReply: Callable[['MemoryReadMemo'], None],
+ dataReply: Callable[['MemoryReadMemo'], None]):
# For args see class docstring.
+ self.error = None # type: str|None
+ self.errorCode = None # type: int|None
self.nodeID = nodeID
self.size = size
self.space = space
@@ -93,6 +143,7 @@ def __init__(self, nodeID, size, space, address, rejectedReply, dataReply):
# for convenience, data can be added or updated after creation of the
# memo
self.data = bytearray()
+ assertMemoOK(self)
class MemoryWriteMemo:
@@ -115,9 +166,13 @@ class MemoryWriteMemo:
memory address.
"""
- def __init__(self, nodeID, okReply, rejectedReply, size, space, address,
- data):
+ def __init__(self, nodeID: NodeID,
+ okReply: Callable[['MemoryWriteMemo'], None],
+ rejectedReply: Callable[['MemoryWriteMemo'], None],
+ size: int, space: int, address: int, data: bytearray):
# For args see class docstring.
+ self.error = None # type: str|None
+ self.errorCode = None # type: int|None
self.nodeID = nodeID
self.okReply = okReply
self.rejectedReply = rejectedReply
@@ -125,6 +180,86 @@ def __init__(self, nodeID, okReply, rejectedReply, size, space, address,
self.space = space
self.address = address
self.data = data
+ assertMemoOK(self)
+
+
+def assertMemoOK(memo: Union[MemoryReadMemo, MemoryWriteMemo]):
+ assert isinstance(memo.space, int), \
+ f"Expected int or MemorySpace.value, got space={emit_cast(memo.space)}"
+ assert isinstance(memo.size, int), \
+ f"Expected int, got size={emit_cast(memo.size)}"
+ # TODO: > 64 is only ok for a length request (?)
+ # assert memo.size <= 64, \
+ # f"Expected <= 64, got size={memo.size}"
+ assert isinstance(memo.address, int), \
+ f"Expected int, got address={emit_cast(memo.address)}"
+ assert len(memo.data) <= 64
+ assert isinstance(memo.data, (bytes, bytearray)), \
+ f"Expected bytearray, got data={emit_cast(memo.data)}"
+
+
+def parseReplyDatagram(memo: Union[MemoryReadMemo, MemoryWriteMemo],
+ dmemo: Union[DatagramReadMemo, DatagramWriteMemo]):
+ """Parse dmemo and set errorCode and/or error attributes of memo"""
+ if not dmemo.data or dmemo.data[0] != 0x20:
+ logger.warning(
+ "Datagram type is not memory configuration (0x20)"
+ f" it is {hex(dmemo.data[0])}")
+ return
+ if len(dmemo.data) < 2:
+ logger.warning(
+ "Datagram is truncated to 1 byte:"
+ f" it is {hex(dmemo.data[0])}")
+ return
+ (hasByte6, _) = Convert.deserializeMC2ndByte(dmemo.data[1])
+ offset = 6
+ error = None
+ if hasByte6:
+ offset = 7
+ memo.error = None
+ memo.errorCode = None
+ if (dmemo.data[1] & 0x08 == 0):
+ # ok reply
+ return
+ else:
+ pass
+ # 0x08 (0b00001000) is error bit
+ # mode = None
+ # for k, values in MODE_ERROR_BYTES.items():
+ # if dmemo.data[1] in values:
+ # mode = k
+ # break
+ # if mode is not None:
+ # error = f"No {mode} error code."
+ # else:
+ # error = f"No error code for unknown mode {hex(dmemo.data[1])}."
+ code_idx = offset
+
+ if len(dmemo.data) < code_idx + 2:
+ memo.error = error
+ if len(dmemo.data) == code_idx + 1:
+ memo.error = (
+ f"malformed error code {hex(dmemo.data[code_idx])}"
+ " (expected 2 bytes)")
+ memo.errorCode = dmemo.data[code_idx]
+ return
+ error = None
+ # Decode big-endian number:
+ memo.errorCode = (dmemo.data[code_idx] << 8) + dmemo.data[code_idx+1]
+ message_idx = code_idx + 2
+ if len(dmemo.data) > message_idx:
+ error_bytes = Convert.getBeforeNull(dmemo.data, message_idx)
+ error = error_bytes.decode("utf-8")
+ if len(error) == 1:
+ error += f" ({hex(dmemo.data[message_idx])})"
+ elif len(error) == 0 and (len(dmemo.data) - message_idx > 0):
+ error += f" ({list(dmemo.data[message_idx:])})"
+ else:
+ error = f"(2nd byte = {hex(dmemo.data[1])})"
+ error += f" (hasByte6={hasByte6})"
+ if hasByte6:
+ error += f" (space={hex(dmemo.data[6])})"
+ memo.error = error
class MemoryService:
@@ -146,31 +281,8 @@ def __init__(self, service: DatagramService):
self.datagramReceivedListener
)
- def spaceDecode(self, space):
- """Convert from a space number to either
- False and command byte or True and standard memory space
-
- Args:
- space (int): Encoded memory space identifier, where values:
- - 0xFF to 0xFD are special spaces, and only the least significant
- 2 bits are relevant.
- - 0x00 to 0xFC represent standard memory spaces directly.
-
- Returns:
- tuple(bool, byte): (False, 1-3 for in command byte) :
- spaces 0xFF - 0xFD
- or (True, space number) : spaces 0 - 0xFC
- (NOTE: type of space may affect type of output)
- """
- # TODO: Maybe check type of space & raise TypeError if not
- # something valid, whether byte, int, or what is ok [add
- # more _description_ to space in docstring].
- if space >= 0xFD:
- return (False, space & 0x03)
- return (True, space)
-
- def requestMemoryRead(self, memo):
- # type: (MemoryReadMemo) -> None
+ def requestMemoryRead(self, memo, stream: bool = False):
+ # type: (MemoryReadMemo, Optional[bool]) -> None
'''Request a read operation start.
- If okReply in the memo is triggered, it will be followed by a
@@ -181,23 +293,31 @@ def requestMemoryRead(self, memo):
Args:
memo (MemoryReadMemo): Request to enqueue.
'''
+ assert isinstance(stream, bool)
# preserve the request
self.readMemos.append(memo)
if len(self.readMemos) == 1:
- self.requestMemoryReadNext(memo)
+ self.requestMemoryReadNext(memo, stream=stream)
- def requestMemoryReadNext(self, memo):
- # type: (MemoryReadMemo) -> None
+ def requestMemoryReadNext(self, memo, stream: bool = False):
+ # type: (MemoryReadMemo, Optional[bool]) -> None
"""send the read request
Args:
memo (MemoryReadMemo): Request to send.
"""
- byte6 = False
+ assert isinstance(stream, bool)
+ hasByte6 = False # if custom space is defined in byte 6
flag = 0
- (byte6, flag) = self.spaceDecode(memo.space)
- spaceFlag = 0x40 if byte6 else (flag | 0x40)
+ (hasByte6, flag) = Convert.serializeSpace(memo.space)
+ if stream:
+ # Encoding: 0x60=custom, 0x61=0xFD, 0x62=0xFE, 0x63=0xFF
+ spaceFlag = 0x60 if hasByte6 else (flag | 0x60)
+ else:
+ # Encoding: 0x40=custom, 0x41=0xFD, 0x42=0xFE, 0x43=0xFF
+ spaceFlag = 0x40 if hasByte6 else (flag | 0x40) # | 0b11111100
+ # ^ In else case, flag is 1-3, so re-add 0xFC (0b11111100)
addr2 = ((memo.address >> 24) & 0xFF)
addr3 = ((memo.address >> 16) & 0xFF)
addr4 = ((memo.address >> 8) & 0xFF)
@@ -206,7 +326,8 @@ def requestMemoryReadNext(self, memo):
DatagramService.ProtocolID.MemoryOperation.value, spaceFlag,
addr2, addr3, addr4, addr5])
# NOTE: list[int] is ok for bytearray extend (`+` requires cast)
- if byte6:
+ if hasByte6:
+ assert memo.space <= 0xFF, f"Space {memo.space} out of byte range"
data.extend([(memo.space & 0xFF)])
data.extend([memo.size])
logger.debug(
@@ -263,6 +384,7 @@ def datagramReceivedListener(self, dmemo: DatagramReadMemo) -> bool:
if len(self.readMemos) > 0:
self.requestMemoryReadNext(self.readMemos[0])
+ parseReplyDatagram(tMemoryMemo, dmemo)
# fill data for call-back to requestor
if len(dmemo.data) > offset:
tMemoryMemo.data = dmemo.data[offset:]
@@ -286,6 +408,7 @@ def datagramReceivedListener(self, dmemo: DatagramReadMemo) -> bool:
if self.writeMemos[index].nodeID == dmemo.srcID:
writeMemo = self.writeMemos[index] # type: MemoryWriteMemo
del self.writeMemos[index]
+ parseReplyDatagram(writeMemo, dmemo)
if dmemo.data[1] & 0x08 == 0 :
writeMemo.okReply(writeMemo)
else:
@@ -314,19 +437,26 @@ def datagramReceivedListener(self, dmemo: DatagramReadMemo) -> bool:
return True
- def requestMemoryWrite(self, memo: MemoryWriteMemo):
+ def requestMemoryWrite(self, memo: MemoryWriteMemo, stream: bool = False):
+ # type: (MemoryWriteMemo, Optional[bool]) -> None
"""Request memory write.
Args:
memo (MemoryWriteMemo): information to send
"""
+ assert isinstance(stream, bool)
# preserve the request
self.writeMemos.append(memo)
# create & send a write datagram
- byte6 = False
+ hasByte6 = False # if custom space is defined in byte 6
flag = 0
- (byte6, flag) = self.spaceDecode(memo.space)
- spaceFlag = 0x00 if byte6 else (flag | 0x00)
+ (hasByte6, flag) = Convert.serializeSpace(memo.space)
+ if stream:
+ # Encoding: 0x20=custom, 0x21=0xFD, 0x22=0xFE, 0x23=0xFF
+ spaceFlag = 0x20 if hasByte6 else (flag | 0x20)
+ else:
+ # Encoding: 0x00=custom, 0x01=0xFD, 0x02=0xFE, 0x03=0xFF
+ spaceFlag = 0x00 if hasByte6 else (flag | 0x00)
addr2 = ((memo.address >> 24) & 0xFF)
addr3 = ((memo.address >> 16) & 0xFF)
addr4 = ((memo.address >> 8) & 0xFF)
@@ -335,19 +465,21 @@ def requestMemoryWrite(self, memo: MemoryWriteMemo):
DatagramService.ProtocolID.MemoryOperation.value, spaceFlag,
addr2, addr3, addr4, addr5
])
- if byte6:
+ if hasByte6:
+ assert memo.space <= 0xFF, f"Space {memo.space} out of byte range"
data.extend([(memo.space & 0xFF)])
data.extend(memo.data)
dgWriteMemo = DatagramWriteMemo(memo.nodeID, data)
self.service.sendDatagram(dgWriteMemo)
- def requestSpaceLength(self, space, nodeID, callback):
+ def requestSpaceLength(self, space: int, nodeID: NodeID,
+ callback: Callable[[int], None]):
'''Request the length of a specific memory space from a remote node.
Args:
space (int): Encoded memory space identifier. This can be a
value within a specific range, as defined in the
- `spaceDecode` method.
+ `serializeSpace` method.
nodeID (NodeID): ID of remote node from which the memory
space length is requested.
callback (Callable): Callback function that will receive the
@@ -372,123 +504,3 @@ def requestSpaceLength(self, space, nodeID, callback):
])
)
self.service.sendDatagram(dgReqMemo)
-
- def arrayToInt(self, data: Union[bytes, bytearray, List[int]]) -> int:
- """Convert an array in MSB-first order to an integer
-
- Args:
- data (Union[bytes,bytearray,list[int]]): MSB-first order
- encoded 32-bit int
-
- Returns:
- int: The converted data as a number.
- """
- result = 0
- for index in range(0, len(data)):
- result = result << 8
- result = result | data[index]
- return result
-
- def arrayToUInt64(self, data):
- """Parse a MSB-first order 64-bit integer
- (Python auto-sizes int, so this is same as arrayToInt).
- """
- return self.arrayToInt(data)
-
- @staticmethod
- def arrayToString(data, length):
- """Decode utf-8 bytes to string
- up to the 1st zero byte or given length,
- whichever is fewer characters.
-
- Args:
- data (Union[bytearray, bytes]): A string encoded as bytes.
- length (int): The used length the data.
-
- Returns:
- str: Data decoded as text.
- """
- if not isinstance(data, bytearray):
- raise TypeError("Expected bytearray (formerly list[int]), got {}"
- .format(type(data).__name__))
- zeroIndex = len(data)
- try:
- temp = data.index(0)
- zeroIndex = temp
- except KeyboardInterrupt:
- raise
- except:
- pass
-
- byteCount = min(zeroIndex, length)
-
- if byteCount == 0:
- return ""
-
- result = data[:byteCount].decode('utf-8')
- return result
-
- @staticmethod
- def intToArray(value, length):
- """Convert an integer into an array of given length
-
- Args:
- value (int): any value
- length (int): Byte count (1, 2, 4, or 8).
-
- Returns:
- bytearray: The value encoded in big-endian format.
- """
- if value >= (1 << (length * 8)): # TODO: ? also exclude value < 0 ?
- raise ValueError("Value {} cannot fit in {} bytes."
- .format(value, length))
- if length == 1:
- return bytearray([
- (value & 0xff)
- ])
- if length == 2:
- return bytearray([
- ((value >> 8) & 0xff), (value & 0xff)
- ])
- if length == 4:
- return bytearray([
- ((value >> 24) & 0xff), ((value >> 16) & 0xff),
- ((value >> 8) & 0xff), (value & 0xff)
- ])
- if length == 8:
- return bytearray([
- ((value >> 56) & 0xff), ((value >> 48) & 0xff),
- ((value >> 40) & 0xff), ((value >> 32) & 0xff),
- ((value >> 24) & 0xff), ((value >> 16) & 0xff),
- ((value >> 8) & 0xff), (value & 0xff)
- ])
- logger.error("integer length {} is not implemented.".format(length))
- return bytearray()
-
- @staticmethod
- def uInt64ToArray(value, length):
- '''Convert a 64-bit integer into an array of given length
- (Python auto-sizes int, so this is same as intToArray)
- '''
- return MemoryService.intToArray(value, length)
-
- @staticmethod
- def stringToArray(value, length):
- '''Converts a string to an array of given length
- padding with 0 bytes as needed
- '''
- strToUInt8 = value.encode('utf-8')
- byteCount = min(length, len(strToUInt8))
- # convert to bytearray since bytes is immutable:
- contentPart = bytearray(strToUInt8[:byteCount])
- if len(contentPart) >= length:
- if len(contentPart) > length:
- logger.warning(
- "MemoryService stringToArray: len(value)=={}"
- " exceeds length {}".format(len(value), length))
- # TODO: Truncate (or is any length ok for the caller)?
- return contentPart
- # list[int] is compatible bytearray extend but not `+` so cast
- # to bytearray after getting list[int] of remaining length:
- padding = bytearray([0] * (length-len(contentPart)))
- return contentPart + padding
diff --git a/openlcb/nodestore.py b/openlcb/nodestore.py
index cb423ba..498e8b5 100644
--- a/openlcb/nodestore.py
+++ b/openlcb/nodestore.py
@@ -38,6 +38,10 @@ def store(self, node: Node) :
self.nodes.sort(key=lambda x: x.snip.userProvidedNodeName,
reverse=True)
+ def clear(self):
+ self.byIdMap.clear()
+ self.nodes.clear()
+
def isPresent(self, nodeID: NodeID) -> bool:
return self.byIdMap.get(nodeID) is not None
diff --git a/openlcb/openlcbnetwork.py b/openlcb/openlcbnetwork.py
index 28bd45a..33f2806 100644
--- a/openlcb/openlcbnetwork.py
+++ b/openlcb/openlcbnetwork.py
@@ -28,6 +28,7 @@
from openlcb.cdimemo import CDIMemo
from openlcb.datagramservice import DatagramReadMemo, DatagramService
from openlcb.dataprocessor import DataFormat
+from openlcb.dataprocessormemo import DataProcessorMemo
from openlcb.memoryservice import MemoryReadMemo, MemoryService, MemorySpace
from openlcb.message import Message
from openlcb.xmldataprocessor import XMLDataProcessor
@@ -54,7 +55,7 @@ class OpenLCBNetwork:
is a MemorySpace)
"""
def __init__(self, localNodeID: Union[str, bytearray, int, NodeID]):
- self._onConnect: Union[Callable[[CDIMemo], None], None] = None
+ self._onConnect: Union[Callable[[DataProcessorMemo], None], None] = None
self._port: PortInterface = None
self.physicalLayer: CanPhysicalLayerGridConnect = None
self.canLink: CanLink = None
@@ -93,6 +94,10 @@ def __init__(self, localNodeID: Union[str, bytearray, int, NodeID]):
self._memoryService = MemoryService(self._datagramService)
self._dataProcessor: XMLDataProcessor = None
+ @property
+ def memoryService(self):
+ return self._memoryService
+
def setConnectHandler(self, handler: Callable[[CDIMemo], None]):
"""Deprecated in favor of a Message handler,
Since it is the socket loop's responsibility to call
@@ -152,6 +157,7 @@ def _startMemoryRead(self, farNodeID: Union[NodeID, int, str, bytearray]):
"""
# read 64 bytes from the CDI space starting at address zero
assert isinstance(self._dataProcessor.space, MemorySpace)
+ self._dataProcessor.onStartDownload()
memMemo = MemoryReadMemo(NodeID(farNodeID), 64,
self._dataProcessor.space.value,
0, # incremented on _memoryReadSuccess
@@ -189,7 +195,6 @@ def download(self, farNodeID: str, space: MemorySpace,
assert isinstance(space, MemorySpace)
self._dataProcessor = dataProcessor
self._dataProcessor._space = space
- self._dataProcessor._stringTerminated = False
self._startMemoryRead(farNodeID)
# ^ Following this, _memoryReadSuccess callback will
@@ -285,23 +290,28 @@ def _listen(self):
# manually.
# - Usually "socket connection broken" due to no more
# bytes to read, but ok if "\0" terminator was reached.
- if ((self._dataProcessor._data is not None)
- and (not self._dataProcessor._stringTerminated)):
- # This boolean is managed by the memoryReadSuccess
- # callback.
- cm = CDIMemo()
- cm.error = formatted_ex(ex)
- cm.done = True # stop progress in gui/other main thread
- if self._dataProcessor._onElement:
- self._dataProcessor._onElement(cm)
- raise # re-raise since incomplete (prevent done OK state)
+ if self._dataProcessor is not None:
+ if ((self._dataProcessor._data is not None)
+ and (not self._dataProcessor._stringTerminated)):
+ # This boolean is managed by the memoryReadSuccess
+ # callback.
+ cm = DataProcessorMemo()
+ cm.error = formatted_ex(ex)
+ cm.done = True # stop progress in gui/other main thread
+ if self._dataProcessor.onStatusMemo:
+ self._dataProcessor.onStatusMemo(cm)
+ raise # re-raise since incomplete (prevent done OK state)
+ else:
+ logger.warning(
+ "Listen loop ended, but _dataProcessor not set"
+ " (DataProcessorMemo will not be used to notify caller).")
finally:
self.physicalLayer.physicalLayerDown() # Link_Layer_Down, setState
self._listenThread: Union[threading.Thread, None] = None
# If we got here, the RuntimeError was ok since the
# null terminator '\0' was reached (otherwise re-raise occurs above)
- cm = CDIMemo()
+ cm = DataProcessorMemo()
cm.error = ("Listen loop stopped (caught_ex={})."
.format(formatted_ex(caught_ex)))
cm.done = True
@@ -332,7 +342,7 @@ def _handleMessage(self, message: Message):
logger.debug("[_handleMessage] message.mti={}".format(message.mti))
if message.mti == MTI.Link_Layer_Down:
if self._onConnect:
- cm = CDIMemo()
+ cm = DataProcessorMemo()
cm.done = True
cm.error = "Disconnected"
cm.message = message
@@ -341,7 +351,7 @@ def _handleMessage(self, message: Message):
return True
elif message.mti == MTI.Link_Layer_Up:
if self._onConnect:
- cm = CDIMemo()
+ cm = DataProcessorMemo()
cm.done = True # 'done' without error indicates connected.
cm.message = message
self._onConnect(cm)
@@ -359,7 +369,7 @@ def _fireStatus(self, status,
else:
logger.warning("No callback, but set status: {}".format(status))
- def _memoryReadSuccess(self, memo: MemoryReadMemo):
+ def _memoryReadSuccess(self, memo: MemoryReadMemo, force_end=False):
"""Handle a successful read
Invoked when the memory read successfully returns,
this queues a new read until the entire CDI has been
@@ -369,7 +379,8 @@ def _memoryReadSuccess(self, memo: MemoryReadMemo):
memo (MemoryReadMemo): Successful MemoryReadMemo
"""
# print("successful memory read: {}".format(memo.data))
- if len(memo.data) == 64 and 0 not in memo.data: # *not* last chunk
+ if (not force_end) and (len(memo.data) == 64 and 0 not in memo.data):
+ # *not* last chunk
self._dataProcessor._stringTerminated = False
if self._dataProcessor.format != DataFormat.EOF:
# save content
@@ -401,7 +412,7 @@ def _memoryReadFail(self, memo: MemoryReadMemo):
if len(self._dataProcessor._tag_stack):
cm = self._dataProcessor._tag_stack[-1]
else:
- cm = CDIMemo()
+ cm = DataProcessorMemo()
cm.error = error
cm.done = True # stop progress in gui/other main thread
self._dataProcessor._onElement(cm)
diff --git a/openlcb/platformextras.py b/openlcb/platformextras.py
index 8cb6b61..142e881 100644
--- a/openlcb/platformextras.py
+++ b/openlcb/platformextras.py
@@ -53,6 +53,7 @@ def clean_file_name_char(c: str, placeholder: Union[str, None] = None) -> str:
def clean_file_name(name: str, placeholder: Union[str, None] = None) -> str:
+ assert name is not None
assert isinstance(name, str)
if (os.path.sep in name) or ("/" in name):
# or "/" since Python uses that even on Windows
diff --git a/openlcb/tcplink/mdnsconventions.py b/openlcb/tcplink/mdnsconventions.py
index 6cf4380..ff70caf 100644
--- a/openlcb/tcplink/mdnsconventions.py
+++ b/openlcb/tcplink/mdnsconventions.py
@@ -1,4 +1,5 @@
from logging import getLogger
+from typing import Union
from openlcb import only_hex_pairs
from openlcb.conventions import hex_to_dotted_lcc_id
@@ -7,7 +8,7 @@
logger = getLogger(__name__)
-def id_from_tcp_service_name(service_name):
+def id_from_tcp_service_name(service_name) -> Union[str, None]:
"""Scrape an MDNS TCP service name, assuming it uses conventions
(`"{org}_{model}_{id}._openlcb-can.{protocol}.{tld}".format(...)`
where:
diff --git a/openlcb/xmldataprocessor.py b/openlcb/xmldataprocessor.py
index 539308a..541a048 100644
--- a/openlcb/xmldataprocessor.py
+++ b/openlcb/xmldataprocessor.py
@@ -1,17 +1,18 @@
from collections import OrderedDict
+import copy
import os
import xml.sax # noqa: E402
import xml.sax.handler
import xml.etree.ElementTree as ET
from logging import getLogger
-from typing import Callable, List, Union
-# from xml.sax.xmlreader import AttributesImpl # for type hints, for autocomplete only in this case
+from typing import Callable, List, Tuple, Union
+# from xml.sax.xmlreader import AttributesImpl # for type hints, for autocomplete only in this case # noqa:E501
import xml.sax.xmlreader # for type hints, for autocomplete only in this case
-from openlcb import emit_cast
+from openlcb import d_quote, emit_cast
from openlcb.canbus.canlink import CanLink
-from openlcb.cdimemo import CDIMemo
+from openlcb.cdimemo import CDIMemo, DataProcessorMemo
from openlcb.dataprocessor import DataFormat, DataProcessor
from openlcb.nodeid import NodeID
from openlcb.platformextras import (
@@ -23,6 +24,10 @@
MemorySpace,
)
# from openlcb.remotenodeprocessor import RemoteNodeProcessor
+from openlcb.cdivar import (
+ CDIVar,
+ CLASSNAME_TYPES,
+)
if __name__ == "__main__":
@@ -59,13 +64,15 @@ def attrs_to_ordered(attrs: xml.sax.xmlreader.AttributesImpl):
return od
-def format_of_space(space):
+def format_of_space(space, unknown_raises=True):
assert isinstance(space, MemorySpace)
if space == MemorySpace.CDI:
return DataFormat.XML
elif space == MemorySpace.FDI:
return DataFormat.XML
- raise NotImplementedError(emit_cast(space))
+ if unknown_raises:
+ raise NotImplementedError(emit_cast(space))
+ return None
class XMLDataProcessor(xml.sax.handler.ContentHandler, DataProcessor):
@@ -81,6 +88,12 @@ class XMLDataProcessor(xml.sax.handler.ContentHandler, DataProcessor):
_openEl (SubElement): Tracks currently-open tag (no ``
yet) during parsing, or if no tags are open then equals
etree.
+ _ended_memo (CDIMemo|None): The memo most recently popped,
+ where "tail" (text after end tag) should be set during
+ "characters" when a new tag hasn't been started yet.
+ TODO: Put child element's tail in parent (Not part of
+ Standard as of 2026-05, but technically possible, such
+ as "World" in `Hello
World`).
_tag_stack (list[SubElement]): Tracks scope during parse since
self.etree doesn't have awareness of whether end tag is
finished (and therefore doesn't know which element is the
@@ -95,20 +108,28 @@ class XMLDataProcessor(xml.sax.handler.ContentHandler, DataProcessor):
_space (int): Space containing the CDI itself (not data
described by CDI).
_tmp_space (int|None): What space we are currently on
- (of data described by Element(s), not of XML data itself)
- _tmp_address (int|None): Where we are in the memory space
- (starting at origin, and calculated using offset and/or size
- of start tags).
+ (of data described by Element(s), not of XML data itself).
+ _tmp_address (int|None): For sanity check, not actual address
+ (no replication)! See replicatedTree docstring.
"""
XML_TOP_TAGS = ("cdi", "fdi")
+ DEFAULT_EXT = ".cdi.xml" # override in subclass
+ DEFAULT_CACHES_DIR = SysDirs.Cache
+ DEFAULT_CACHE_DIR = os.path.join(DEFAULT_CACHES_DIR, "python-openlcb")
def __init__(self, linkLayer: CanLink, space: MemorySpace):
self.canLink: CanLink = linkLayer
- caches_dir = SysDirs.Cache
+ # caches_dir = SysDirs.Cache
+ self.replicated_root = None # type: ET.Element|None
+ self.replicated_root_memo = None # type: CDIMemo|None
+ self._root_memos = None # type: list[CDIMemo]|None
+ self._root_memo = None # type: CDIMemo|None
self._space: Union[MemorySpace, None] = None
self._openEl: Union[ET.Element, None] = None
self._top_tag = "cdi" # cdi or fdi (detected in startElement)
- self._myCacheDir = os.path.join(caches_dir, "python-openlcb")
+ # self._myCacheDir = os.path.join(caches_dir, "python-openlcb")
+ self._ended_memo = None # type: CDIMemo|None
+ self._myCacheDir = XMLDataProcessor.DEFAULT_CACHE_DIR
self._tmp_space = None # type: int|None
self._tmp_address = None # type: int|None
assert isinstance(space, MemorySpace)
@@ -118,6 +139,7 @@ def __init__(self, linkLayer: CanLink, space: MemorySpace):
# prepare these for _callback_msg.
xml.sax.ContentHandler.__init__(self)
DataProcessor.__init__(self)
+ self.enable_cache = True
self._stringTerminated = None # type: Union[bool, None]
# ^ None means no read is occurring.
if self._format != DataFormat.XML:
@@ -136,6 +158,42 @@ def __init__(self, linkLayer: CanLink, space: MemorySpace):
# endregion ContentHandler
self.acdi = False
+ def getRootMemo(self) -> Union[CDIMemo, None]:
+ """Get the root memo object if any.
+ This should only be called after the entire file is parsed such
+ as when cm.done is True in onStatusMemo(cm) callback. Set
+ callback manually if necessary and if using realtime parsing
+ (_feed) mode.
+ """
+ if not self._root_memos:
+ return None
+ if len(self._root_memos) > 1:
+ summaries = []
+ cdi_roots = []
+ tag = None
+ for memo in self._root_memos:
+ tag = memo.getTag()
+ if tag is not None:
+ tag = tag.lower()
+ summaries.append(memo.getTag())
+ if tag in ("cdi", "fdi"):
+ cdi_roots.append(memo)
+ if len(cdi_roots) == 1:
+ return cdi_roots[0]
+ if tag not in ("cdi", "fdi"):
+ logger.warning(
+ f"Got more than one XML root: {summaries};"
+ " expected cdi/fdi")
+ else:
+ logger.warning(f"Got more than one XML root: {summaries}")
+ return self._root_memos[-1]
+ tag = self._root_memos[0].getTag()
+ if tag is not None:
+ tag = tag.lower()
+ if tag not in ("cdi", "fdi"):
+ logger.warning(f"Only XML root is {repr(tag)} not cdi/fdi")
+ return self._root_memos[0]
+
def setSpace(self, space: MemorySpace):
self._space = space
self._format = format_of_space(space)
@@ -153,12 +211,13 @@ def space(self) -> MemorySpace:
assert isinstance(self._space, MemorySpace)
return self._space
- def onStatusMemo(self, cm: CDIMemo) -> bool:
+ def onStatusMemo(self, cm: DataProcessorMemo) -> bool:
"""Handle memo with status that doesn't affect tag stack/scope.
(Implement in subclass)
Returns:
bool: True if handled.
"""
+ logger.warning("Default onStatusMemo ran.")
return False
def onPushScope(self, cm: CDIMemo) -> bool:
@@ -181,6 +240,15 @@ def onPopScope(self, cm: CDIMemo) -> bool:
"""
return False
+ def onStartDownload(self):
+ """Initialize variables used by element handler(s).
+ If subclass is a GUI, reimplement this to reset GUI,
+ but also call onStart or super().onStartDownload().
+ """
+ self._stringTerminated = False
+ self._resetTree()
+ self.onStart()
+
def onStart(self):
# self._cdi_offset = 0 # Instead see memo.address (which is
# incremented on _memoryReadSuccess or custom memory read
@@ -190,6 +258,9 @@ def onStart(self):
"A previous downloadCDI operation is in progress"
" or failed (Set _data to None first if failed)")
self._data = bytearray()
+ self.progress_count = 0
+ self._root_memos = [] # list of roots
+ self._root_memo = None
def onStop(self):
self._format = DataFormat.EOF # no data expected
@@ -204,11 +275,22 @@ def _fireStatus(self, status,
if callback is None:
callback = self.onStatusMemo
if callback:
- print("OpenLCBNetwork callback_msg({})".format(repr(status)))
+ logger.info("OpenLCBNetwork callback_msg({})".format(repr(status)))
callback(CDIMemo(status=status))
else:
logger.warning("No callback, but set status: {}".format(status))
+ def _fireStatusMemo(self, statusMemo,
+ callback: Union[Callable[[CDIMemo], bool], None] = None): # noqa: E501
+ """Fire status handlers with the given status."""
+ if callback is None:
+ callback = self.onStatusMemo
+ if callback:
+ logger.info(f"OpenLCBNetwork callback_msg({statusMemo})")
+ callback(statusMemo)
+ else:
+ logger.warning(f"No callback, but set status: {statusMemo}")
+
def _feedNext(self, memo: MemoryReadMemo):
"""Handle partial CDI XML (any packet except last)
The last packet is not yet reached, so don't parse (but
@@ -220,17 +302,109 @@ def _feedNext(self, memo: MemoryReadMemo):
"""
assert self._data is not None
self._data += memo.data
+ self.progress_count = len(self._data)
partial_str = memo.data.decode("utf-8")
if self._realtime:
self._parser.feed(partial_str) # may call startElement/endElement
+ cm = DataProcessorMemo()
+ cm.progress_count = self.progress_count
+ cm.expected_size = self.expected_size
+ self.onStatusMemo(cm)
+
+ def load(self, node_id: NodeID, path, space: Union[MemorySpace, int],
+ memo: Union[MemoryReadMemo, None] = None,
+ format: Union[DataFormat, None] = None,
+ data: Union[bytes, bytearray, str, None] = None):
+ """Load instead of downloading.
+ Args:
+ path (str): Location of original xml data (unused if
+ data is specified, but may be used for tracing;
+ Always sets self._path).
+ data (Optional[Union[bytes, bytearray, str]]): Actual XML data,
+ optional if path exists. If None,
+ path will be loaded, otherwise it will not,
+ and data will be used instead.
+ """
+ assert not self._data
+ self._is_from_cache = True
+ self.onStartDownload()
+ assert isinstance(space, (MemorySpace, int))
+ if isinstance(space, int):
+ try_space = MemorySpace.fromNumber(space)
+ if try_space is not None:
+ space = try_space
+ if isinstance(space, MemorySpace):
+ self.setSpace(space)
+ else:
+ if format is None:
+ raise ValueError(f"Using device-specific space: {space}"
+ " but format not specified")
+ else:
+ assert isinstance(format, DataFormat)
+ self._format = format
+ self._space = space # type:ignore # int if device-specific
+ logger.warning(f"Using device-specific space: {space}")
+ if data is None:
+ with open(path, "rb") as stream:
+ data = stream.read() # type:ignore
+ else:
+ if isinstance(data, str):
+ # Mimic network data by converting to bytes:
+ data = data.encode('utf-8')
+ self._path = path
+ if self._format is DataFormat.XML:
+ if memo is not None:
+ assert isinstance(memo, MemoryReadMemo)
+ else:
+ def memoryReadSuccess(memo: MemoryReadMemo):
+ # See further down
+ print("Fallback memoryReadSuccess ran.")
+ pass
+
+ def memoryReadFail(memo: MemoryReadMemo):
+ raise RuntimeError(
+ "Offline parse failure (should never happen)")
+
+ assert data is not None
+ # Based on _startMemoryRead in OpenLCBNetwork:
+ _space = self.getSpaceValue() # self._space is set above
+ assert _space is not None
+ memo = MemoryReadMemo(node_id, len(data),
+ _space, 0,
+ memoryReadFail, memoryReadSuccess)
+
+ assert data is not None
+ memo.data = data # type: ignore
+ self._data = bytearray() # Since _feedLast adds memo.data to it
+ memo.size = len(data)
+ # based on "else" (done) case in _memoryReadSuccess
+ # in OpenLCBNetwork:
+ self._stringTerminated = True
+ self._feedLast(memo, enable_cache=False)
+ self.onStop() # sets self._format to DataFormat.EOF
+ else:
+ logger.warning(f"Custom DataFormat {self._format}"
+ f" (space={space}): not parsed automatically.")
+
+ def getSpaceValue(self):
+ # type: () -> int|None
+ if self._space is None:
+ return None
+ if isinstance(self._space, MemorySpace):
+ return self._space.value
+ assert isinstance(self._space, int)
+ return self._space
- def _feedLast(self, memo: MemoryReadMemo):
+ def _feedLast(self, memo: MemoryReadMemo, enable_cache=None):
"""Handle end of CDI XML (last packet)
End of data, so parse (or feed if self._realtime)
Args:
memo (MemoryReadMemo): successful read memo containing data.
+ enable_cache (bool): Defaults to self.enable_cache.
"""
+ if enable_cache is None:
+ enable_cache = self.enable_cache
partial_str = memo.data.decode("utf-8")
# save content
assert self._data is not None
@@ -246,6 +420,13 @@ def _feedLast(self, memo: MemoryReadMemo):
if null_i > -1:
terminate_i = min(null_i, terminate_i)
partial_str = memo.data[:terminate_i].decode("utf-8")
+ assert self.progress_count is not None
+ self.progress_count += terminate_i
+ cm = DataProcessorMemo()
+ cm.done = True # 'done' and not 'error' means got all
+ cm.progress_count = self.progress_count
+ cm.expected_size = self.expected_size
+ self.onStatusMemo(cm)
else:
# *not* realtime (but got to end, so parse all at once)
cdiString = ""
@@ -254,6 +435,9 @@ def _feedLast(self, memo: MemoryReadMemo):
if null_i > -1:
terminate_i = min(null_i, terminate_i)
cdiString = self._data[:terminate_i].decode("utf-8")
+ assert self.progress_count is not None
+ self.progress_count += terminate_i
+
# print (cdiString)
# self.parse(cdiString) # no such method
# self._parser.parse(cdiString) # urllib.error.URLError
@@ -262,29 +446,61 @@ def _feedLast(self, memo: MemoryReadMemo):
# ?xml version="1.0" encoding="utf-8"?>
xml.sax.parseString(cdiString, self)
# self._fireStatus("Done loading CDI.")
- cm = CDIMemo()
+ cm = DataProcessorMemo()
cm.done = True # 'done' and not 'error' means got all
+ cm.progress_count = self.progress_count
self.onStatusMemo(cm)
if self._realtime:
self._parser.feed(partial_str) # may call startElement/endElement
# memo = MemoryReadMemo(memo)
- path = self.cache_cdi_path(memo.nodeID)
- with open(path, 'w') as stream:
- if cdiString is None:
- cdiString = self._data.rstrip(b'\0').decode("utf-8")
- stream.write(cdiString)
- print('Saved {}'.format(repr(path)))
+ if enable_cache:
+ path = self.cacheFilePath(memo.nodeID)
+ with open(path, 'w') as stream:
+ if cdiString is None:
+ cdiString = self._data.rstrip(b'\0').decode("utf-8")
+ stream.write(cdiString)
+ print('Saved {}'.format(repr(path)))
self._data = None # Ensure isn't reused for more than one doc
- def cache_cdi_path(self, item_id: Union[NodeID, str]):
- cdi_cache_dir = os.path.join(self._myCacheDir, "cdi")
+ def cacheFilePathCustom(self, item_id: Union[NodeID, str], **kwargs):
+ if 'my_cache_dir' not in kwargs:
+ kwargs['my_cache_dir'] = self._myCacheDir
+ type(self).cacheFilePath(item_id, **kwargs)
+
+ @classmethod
+ def cacheFileName(cls, item_id: Union[NodeID, str], ext=None):
+ if ext is None:
+ ext = cls.DEFAULT_EXT
+ item_id = str(item_id) # Convert NodeID or other
+ clean_name = clean_file_name(item_id.replace(":", "."))
+ clean_name += ext
+ return clean_name
+
+ @classmethod
+ def cacheFilePath(cls, item_id: Union[NodeID, str], my_cache_dir=None,
+ subfolder: Union[str, None] = "cdi", name=None,
+ ext=None):
+ if ext is None:
+ ext = cls.DEFAULT_EXT
+ if my_cache_dir is None:
+ my_cache_dir = cls.DEFAULT_CACHE_DIR
+ if subfolder:
+ cdi_cache_dir = os.path.join(my_cache_dir, subfolder)
+ else:
+ cdi_cache_dir = my_cache_dir
if not os.path.isdir(cdi_cache_dir):
os.makedirs(cdi_cache_dir)
# TODO: add hardware name and firmware version and from SNIP to
# name file to avoid cache file from a different
# device/version.
- item_id = str(item_id) # Convert NodeID or other
- clean_name = clean_file_name(item_id.replace(":", "."))
+ if not name:
+ clean_name = cls.cacheFileName(item_id, ext=ext)
+ else:
+ clean_name = clean_file_name(name)
+ if clean_name != name:
+ logger.warning(
+ "[cacheFilePath]"
+ f" changed name {repr(name)} to {repr(clean_name)}")
# ^ replace ":" to avoid converting that one to default "_"
# ^ will raise error if path instead of name
path = os.path.join(cdi_cache_dir, clean_name)
@@ -292,7 +508,7 @@ def cache_cdi_path(self, item_id: Union[NodeID, str]):
# just to be safe, even though clean_file_name
# should prevent. If this occurs, fix clean_file_name.
raise ValueError("Cannot specify absolute path.")
- return path + ".xml"
+ return path
def startElement(self, name: str,
attrs: xml.sax.xmlreader.AttributesImpl):
@@ -303,6 +519,23 @@ def startElement(self, name: str,
self._top_tag = name.lower()
elif name.lower() == "acdi":
self.acdi = True
+ content = self._flushCharBuffer() if self._chunks else None
+ if content is not None:
+ if self._ended_memo is not None:
+ self._ended_memo.tail = content
+ else:
+ if self._tag_stack:
+ # Text in parent before this
+ # (typically "\n", possibly indentation).
+ if self._tag_stack[-1].content is None:
+ self._tag_stack[-1].content = content
+ else:
+ self._tag_stack[-1].content += content
+ else:
+ logger.warning(
+ f"Stray characters before {repr(name)}: {repr(content)}")
+ if self._ended_memo is not None:
+ self._ended_memo = None
attrib = attrs_to_dict(attrs)
origin = attrib.get('origin')
offset = attrib.get('offset')
@@ -312,7 +545,7 @@ def startElement(self, name: str,
if offset is not None:
parts.append(f"offset={offset}")
logger.debug(*parts)
- if attrs is not None and attrs :
+ if (attrs is not None) and attrs.getNames():
logger.debug(tab, " Attributes: ", attrs.getNames())
# el = ET.Element(name, attrs)
@@ -323,7 +556,10 @@ def startElement(self, name: str,
parent_cm = None
if self._tag_stack:
parent_cm = self._tag_stack[-1]
- cm = CDIMemo(tag=name, element=el, parent=parent_cm)
+ cm = CDIMemo(tag=name, element=el, parent=parent_cm, document=self)
+ # cm.space = self._tmp_space Commented since not replicated!
+ # - address and space should be set by replicatedTree or the
+ # node processing the CDI, accounting for replication.
if name == "segment":
self._tmp_space = attrib.get('space')
self._tmp_address = int(attrib.get('origin', 0))
@@ -339,9 +575,14 @@ def startElement(self, name: str,
raise AttributeError(
f"Node specifies {name} offset before segment origin")
self._tmp_address += offset
- cm.address = self._tmp_address # May be None if after /segment
+ # NOTE: ^ Sanity check only! For real address see replicatedTree.
self.onPushScope(cm)
+ if len(self._tag_stack) < 1:
+ assert self._root_memos is not None, "onStart must run first"
+ self._root_memos.append(cm)
+ if cm.tag == "cdi":
+ self._root_memo = cm
# self._callback_msg(
# "loaded: {}{}".format(tab, ET.tostring(el, encoding="unicode")))
@@ -400,7 +641,7 @@ def endElement(self, name: str):
top_cm.tag = name
cm = top_cm
else:
- cm = CDIMemo(tag=name)
+ cm = CDIMemo(tag=name, document=self)
cm.stray = True
cm.end = True
@@ -434,6 +675,7 @@ def endElement(self, name: str):
cm.parent.children.append(cm)
_ = self.checkDone(cm)
cm.content = self._flushCharBuffer()
+ self._ended_memo = cm
self.onPopScope(cm)
def _flushCharBuffer(self):
@@ -460,3 +702,217 @@ def characters(self, content: str):
raise TypeError(
"Expected str, got {}".format(type(content).__name__))
self._chunks.append(content)
+
+ def replicatedTree(self) -> Tuple[CDIMemo, ET.Element]:
+ """Build an expanded XML tree with replication and addresses.
+
+ Starting from the root CDIMemo (via :meth:`getRootMemo`), this
+ method creates a new ElementTree. Replication is expanded,
+ addresses are calculated per the OpenLCB CDI standard, and
+ ``address`` attributes are added where required.
+
+ The ``replication`` attribute is removed from all copied group
+ elements in the new tree. The original tree is left unchanged.
+
+ Returns:
+ ET.Element: Root of the new replicated tree.
+ """
+ root_memo = self.getRootMemo()
+ assert root_memo is not None and root_memo.element is not None
+
+ new_root = ET.Element("cdi") # always new: children added from memos
+ new_root.attrib.update(root_memo.element.attrib)
+ new_root.attrib['replicated'] = "true"
+ if new_root.tag != "cdi":
+ logger.warning(
+ f"expected cdi got {new_root.tag} from {root_memo.tag}")
+
+ tmp_el = root_memo.element
+ root_memo.element = None # avoid deepcopy. Temporarily erase it.
+ new_root_memo = copy.deepcopy(root_memo) # copy to avoid affecting old
+ root_memo.element = tmp_el # restore the old copy.
+ new_root_memo.document = self
+ new_root_memo.element = new_root # use this not root_memo.element copy
+ size = self._replicated_tree_recursive(new_root_memo, new_root,
+ address=0)
+ if size < 1:
+ logger.warning(
+ f"No space used by CDI after replication (size={size})")
+ return new_root_memo, new_root
+
+ def _replicated_tree_recursive(
+ self, parent: CDIMemo, parent_el: ET.Element,
+ allow_non_standard=False,
+ address: int = 0,
+ space: Union[int, None] = None,
+ ) -> int:
+ """Recursive helper for :meth:`replicatedTree`.
+
+ Copies the element, handles replication, sets addresses, and
+ recurses into children. Removes ``replication`` attribute from
+ copied group elements.
+ """
+ assert address is not None
+ assert parent.element is not None
+ parent_tag = parent.getTag() or parent.element.tag
+ parent_tag_lower = parent_tag.lower()
+ if parent_el.text:
+ parent.content = parent_el.text
+ elif parent.content: # new_root
+ parent_el.text = parent.content
+ elif parent_tag_lower in ("name", "description"):
+ logger.warning(
+ f"replicated {parent_tag_lower} has no content.")
+ if parent_el.tail:
+ parent.tail = parent_el.tail
+ elif parent.tail: # new_root
+ parent_el.tail = parent.tail
+
+ # Recurse into children (replication handled at this level)
+ new_children = []
+ # new_child_elements = []
+ for child_memo in parent.children:
+ replication_str = parent.element.attrib.get('replication')
+ count = int(replication_str) if replication_str is not None else 1
+ child_tag = child_memo.getTag()
+ assert child_tag
+ c_tag_lower = child_tag.lower()
+ child_el = child_memo.element
+ assert child_el is not None
+ if c_tag_lower == "segment":
+ space_str = child_el.attrib.get('space')
+ assert space_str, "expected space in segment"
+ space = int(space_str)
+ origin = child_el.attrib.get('origin')
+ address = int(origin) if (origin is not None) else 0
+ if c_tag_lower == "group":
+ offset = child_el.attrib.get('offset')
+ if offset:
+ address += int(offset)
+ for idx in range(count):
+ # if count > 1:
+ copy_child_el = ET.Element(child_el.tag)
+ copy_child_el.attrib.update(child_el.attrib)
+ copy_child_el.text = child_el.text
+ if child_el.text is not None:
+ copy_child_el.text = child_el.text.strip()
+ copy_child_el.tail = child_el.tail
+ if child_el.tail is not None:
+ copy_child_el.tail = child_el.tail.strip()
+ copy_child_memo = copy.deepcopy(child_memo)
+ copy_child_memo.parent = parent
+ copy_child_memo.document = self
+ copy_child_memo.element = copy_child_el
+ # else:
+ # copy_child_el = child_el
+ # copy_child_memo = child_memo
+ # NOTE: ^ Why commented: We don't want to modify
+ # self.etree children (if we modify replicatedTree
+ # result such as self.replicated_root)!
+ # - Don't even chance it by keeping the memo
+ # (otherwise child_memo.element would be from tree).
+ # - Also, we always add child to parent_el below.
+
+ new_children.append(copy_child_memo)
+ # for child_el in new_parent:
+ # copy_parent_el.append(child_el)
+
+ # Remove replication from the replicated copy
+ if "replication" in copy_child_el.attrib:
+ del copy_child_el.attrib["replication"]
+
+ if c_tag_lower == "group" or c_tag_lower in CLASSNAME_TYPES:
+ copy_child_el.set('address', str(address))
+ copy_child_el.set('space', str(space))
+ if c_tag_lower in CLASSNAME_TYPES:
+ copy_child_memo.address = address
+ copy_child_memo.space = space
+
+ if replication_str is not None:
+ if c_tag_lower != "group":
+ el_error = \
+ f"unexpected replication for {c_tag_lower} tag"
+ if allow_non_standard:
+ logger.warning(el_error)
+ else:
+ raise SyntaxError(el_error)
+ copy_child_el.set('replication_index', str(idx))
+ # ^ optimized attrib[key] = value
+
+ parent_el.append(copy_child_el)
+ # parent: Use new_children below (can't change while iterating)
+
+ # Determine size for address advancement
+ if c_tag_lower == "eventid":
+ size = 8
+ elif "size" in copy_child_el.attrib:
+ size = int(copy_child_el.attrib["size"])
+ else:
+ size = 0
+
+ # Advance address *before* leaf variables
+ if size:
+ if c_tag_lower in CLASSNAME_TYPES:
+ assert size, f"expected size for {c_tag_lower}"
+ address += size
+ else:
+ el_error = (
+ f"size is not expected for {c_tag_lower}"
+ f" size={size}")
+ if allow_non_standard:
+ logger.warning(el_error)
+ address += size
+ else:
+ assert not size, el_error
+
+ address = self._replicated_tree_recursive(
+ copy_child_memo, copy_child_el, address=address,
+ space=space)
+ if c_tag_lower == "segment":
+ space = None # undefined after section
+
+ parent.children = new_children # Same references if no replication
+ return address
+
+ def extractCDIVarMemos(self, replicated_root=None, root_memo=None) -> List[CDIMemo]: # noqa: E501
+ # type: (ET.Element|None, CDIMemo|None) -> List[CDIMemo]
+ """Build a flat list of CDIMemo objects for all variables.
+
+ Uses the replicated tree (replication expanded, replication
+ attribute removed, addresses set). Returns original-style
+ memos (with .content) but with .element pointing into the
+ replicated tree so that modifications (setData etc.) affect
+ the saved XML.
+ """
+ # TODO: Implement ACDI vars if present (See OpenLCB
+ # "Configuration Description Information" Standard)
+ if not hasattr(self, "etree") or self.etree is None:
+ logger.error("processor has no etree")
+ return []
+ if replicated_root is not None:
+ if root_memo is not None: # reserved
+ assert isinstance(root_memo, CDIMemo)
+ assert isinstance(replicated_root, ET.Element)
+ root_memo = root_memo # reserved
+ root_el = replicated_root
+ else:
+ root_memo, root_el = self.replicatedTree()
+ self.replicated_root = root_el
+ self.replicated_root_memo = root_memo
+
+ assert isinstance(self.replicated_root_memo, CDIMemo)
+
+ cdivar_memos: List[CDIMemo] = []
+
+ def traverse(memo: CDIMemo) -> None:
+ tag = memo.getTag()
+ tag_lower = tag.lower() if tag else ""
+ if tag_lower in CLASSNAME_TYPES:
+ # Use the existing replicated memo (has correct .content)
+ cdivar_memos.append(memo)
+ for child in memo.children:
+ traverse(child)
+
+ traverse(self.replicated_root_memo)
+ assert root_el is self.replicated_root # concurrent modification check
+ return cdivar_memos
diff --git a/python-openlcb.code-workspace b/python-openlcb.code-workspace
index 160c158..bf30c55 100644
--- a/python-openlcb.code-workspace
+++ b/python-openlcb.code-workspace
@@ -41,6 +41,7 @@
"cdiform",
"cdimemo",
"cdivar",
+ "cdivars",
"columnspan",
"controlframe",
"datagram",
@@ -75,6 +76,7 @@
"offvalue",
"onvalue",
"openlcb",
+ "openlcbaction",
"openlcbnetwork",
"padx",
"pady",
@@ -107,6 +109,7 @@
"usbmodem",
"WASI",
"winnative",
+ "wraplength",
"xmldataprocessor",
"xscrollcommand",
"zeroconf"
diff --git a/tests/test_cdivar.py b/tests/test_cdivar.py
index 22af185..51682dc 100644
--- a/tests/test_cdivar.py
+++ b/tests/test_cdivar.py
@@ -18,12 +18,15 @@ def test_initialization_valid(self):
self.assertEqual(cdivar_float.max, 100.0)
self.assertEqual(cdivar_float.size, 4)
+ maxSize = 100
cdivar_string = CDIVar(className='string',
- _default=bytearray(b'Hello'))
+ _default=bytearray(b'Hello'),
+ _size=maxSize)
self.assertEqual(cdivar_string.className, 'string')
self.assertEqual(cdivar_string.default, bytearray(b'Hello'))
assert cdivar_string.default is not None
- self.assertEqual(cdivar_string.size, len(cdivar_string.default))
+ # self.assertEqual(cdivar_string.size, len(cdivar_string.default))
+ self.assertEqual(cdivar_string.size, maxSize)
def test_initialization_invalid_classname(self):
with self.assertRaises(AssertionError):
@@ -60,7 +63,7 @@ def test_set_get_float(self):
self.assertAlmostEqual(got, 3.14, places=6)
def test_set_get_string(self):
- cdivar_string = CDIVar(className='string')
+ cdivar_string = CDIVar(className='string', _size=100)
cdivar_string.setString("Hello")
self.assertEqual(cdivar_string.getString(), "Hello")
@@ -75,8 +78,8 @@ def test_invalid_set_float(self):
cdivar_float.setFloat("not a float") # type:ignore (assertRaises)
def test_invalid_set_string(self):
- cdivar_string = CDIVar(className='string')
- with self.assertRaises(AssertionError):
+ cdivar_string = CDIVar(className='string', _size=100)
+ with self.assertRaises(AttributeError): # number has no attribute 'encode'
cdivar_string.setString(12345) # type:ignore (assertRaises)
diff --git a/tests/test_cdivar_edge_cases.py b/tests/test_cdivar_edge_cases.py
new file mode 100644
index 0000000..7820c43
--- /dev/null
+++ b/tests/test_cdivar_edge_cases.py
@@ -0,0 +1,151 @@
+from typing import List, Tuple
+import unittest
+
+from openlcb.cdivar import CDIVar
+
+
+class TestCDIVarNumericConversions(unittest.TestCase):
+
+ def assertBytesEqual(self, expected_hex: List[int], actual: bytes,
+ msg: str = ""):
+ expected = bytes(expected_hex)
+ self.assertEqual(
+ expected,
+ actual,
+ (f"{msg}\n Expected: {expected.hex(' ').upper()}"
+ f"\n Got: {actual.hex(' ').upper()}")
+ )
+
+ # -------------------------------------------------------------------------
+ # Basic signed int conversions — edge cases (4 bytes)
+ # -------------------------------------------------------------------------
+ def test_setInt_getInt_4byte_edge_cases(self):
+ cases: List[Tuple[int, List[int]]] = [
+ (-1, [0xFF, 0xFF, 0xFF, 0xFF]),
+ (-2147483648, [0x80, 0x00, 0x00, 0x00]), # INT32_MIN
+ (2147483647, [0x7F, 0xFF, 0xFF, 0xFF]),
+ (0, [0x00, 0x00, 0x00, 0x00]),
+ (300, [0x00, 0x00, 0x01, 0x2C]),
+ (0x12345678, [0x12, 0x34, 0x56, 0x78]),
+ ]
+
+ for value, expected_bytes in cases:
+ with self.subTest(f"int {value} → bytes"):
+ var = CDIVar("int", _size=4, _min=-1) # signed
+ var.setInt(value)
+ assert var.data is not None
+ self.assertBytesEqual(expected_bytes, var.data)
+
+ restored = var.getInt()
+ self.assertEqual(value, restored)
+
+ # -------------------------------------------------------------------------
+ # Smaller sizes — sign extension behavior
+ # -------------------------------------------------------------------------
+ def test_small_int_sizes_sign_extension(self):
+ cases = [
+ # value, size, signed, bytes, expected getInt
+ (-100, 2, True, [0xFF, 0x9C], -100),
+ (0xABCD, 2, False, [0xAB, 0xCD], 0xABCD),
+ (-128, 4, True, [0xFF, 0xFF, 0xFF, 0x80], -128),
+ (0x5A, 1, False, [0x5A], 0x5A),
+ ]
+
+ for val, size, signed, exp_bytes, exp_restored in cases:
+ with self.subTest(f"{val} @ {size} bytes signed={signed}"):
+ var = CDIVar("int", _size=size, _min=-1 if signed else 0)
+ var.setInt(val)
+ assert var.data is not None
+ self.assertBytesEqual(exp_bytes, var.data)
+
+ restored = var.getInt()
+ self.assertEqual(exp_restored, restored)
+
+ # -------------------------------------------------------------------------
+ # Strict IEEE 754 binary16 (half-precision) bit-exact tests
+ # -------------------------------------------------------------------------
+ def test_float16_strict_bit_exact(self):
+ cases = [ # noqa: E501
+ # value expected [high, low] description
+ (0.0, [0x00, 0x00], "+0.0"),
+ (5.9604644775390625e-8, [0x00, 0x01], "smallest positive subnormal"), # noqa: E501
+ (-5.9604644775390625e-8, [0x80, 0x01], "smallest negative subnormal"), # noqa: E501
+ (6.103515625e-5, [0x04, 0x00], "smallest positive normal"), # noqa: E501
+ (-6.103515625e-5, [0x84, 0x00], "smallest negative normal"), # noqa: E501
+ (1.0, [0x3C, 0x00], "1.0 exact"),
+ (-1.0, [0xBC, 0x00], "-1.0"),
+ (0.5, [0x38, 0x00], "0.5"),
+ (-0.5, [0xB8, 0x00], "-0.5"),
+ (65504.0, [0x7B, 0xFF], "max finite"),
+ (-65504.0, [0xFB, 0xFF], "max negative finite"), # noqa: E501
+ (float("inf"), [0x7C, 0x00], "+Inf"),
+ (float("-inf"), [0xFC, 0x00], "-Inf"),
+ # (float("nan"), [0x7E, 0x00], "canonical quiet NaN"), # noqa: E501
+ # (65536.0, [0x7C, 0x00], "overflow → +Inf"), # noqa: E501
+ # (1.00048828125, [0x3C, 0x01], "ties-to-even example"), # noqa: E501
+ (float("nan"), [0x7E, 0x00], "canonical quiet NaN"), # noqa: E501
+ # 65536.0 removed — Python struct raises OverflowError (expected)
+ # (1.00048828125, [0x3C, 0x00], "ties-to-even rounds to even (down in this case)"), # noqa: E501
+ # ^ becomes 1.0 due to float16 precision, so commented
+ (1.0009765625, [0x3C, 0x01], "1 + 2⁻¹⁰ = exact in float16"), # noqa: E501
+ # (1.00048828125 + 1e-12, [0x3C, 0x01], "slightly above midpoint → rounds up"), # noqa: E501
+ # ^ AssertionError: 1.000488281251 != 1.0009765625 : Round-trip mismatch: 1.000488281251 → 1.0009765625 # noqa: E501
+ # due to float16 precision
+ ]
+
+ for val, expected, message in cases:
+ with self.subTest(f"float16 {val}"):
+ var = CDIVar("float", _size=2)
+ var.setFloat(val)
+ assert var.data is not None
+ self.assertBytesEqual(expected, var.data,
+ f"setFloat 16 ({val}) {message} failed") # noqa: E501
+
+ # round-trip check
+ restored = var.getFloat()
+ assert restored is not None
+ if val != val: # NaN
+ self.assertTrue(restored != restored)
+ elif abs(val) == float("inf"):
+ self.assertTrue(
+ abs(restored) == float("inf") and (restored > 0) == (val > 0), # noqa: E501
+ f"setFloat 16 {message} failed"
+ )
+ else:
+ # For representable values → should be bit-exact round-trip
+ self.assertEqual(
+ val, restored,
+ f"Round-trip mismatch: {val} → {restored}"
+ )
+
+ # -------------------------------------------------------------------------
+ # Basic null-terminated string behavior (modified methods)
+ # -------------------------------------------------------------------------
+ def test_string_null_terminated(self):
+ cases = [
+ ("hello", b"hello\x00"),
+ ("", b"\x00"),
+ ("café π", "café π".encode("utf-8") + b"\x00"),
+ ]
+
+ for s, expected_bytes in cases:
+ with self.subTest(f"setString({s!r})"):
+ var = CDIVar("string", _size=100)
+ var.setString(s)
+ self.assertEqual(expected_bytes, var.data)
+
+ restored = var.getString()
+ self.assertEqual(s, restored)
+
+ # Extra data after null is ignored
+ var = CDIVar("string")
+ var.data = b"test\x00junk"
+ self.assertEqual("test", var.getString())
+
+ # No null → whole content
+ var.data = b"no-null-here"
+ self.assertEqual("no-null-here", var.getString())
+
+
+if __name__ == "__main__":
+ unittest.main(verbosity=2)
\ No newline at end of file
diff --git a/tests/test_convert.py b/tests/test_convert.py
new file mode 100644
index 0000000..4271edd
--- /dev/null
+++ b/tests/test_convert.py
@@ -0,0 +1,104 @@
+
+import struct
+import unittest
+
+from openlcb.convert import Convert
+
+
+class TestConvertClass(unittest.TestCase):
+
+ def testReturnCyrillicStrings(self):
+ # See also testReturnCyrillicStrings in test_snip
+ # If you have characters specific to UTF-8 (either in code or comment)
+ # add the following as the 1st or 2nd line of the py file:
+ # -*- coding: utf-8 -*-
+ data = bytearray([0xd0, 0x94, 0xd0, 0xbc, 0xd0, 0xb8, 0xd1, 0x82, 0xd1, 0x80, 0xd0, 0xb8, 0xd0, 0xb9]) # Cyrillic spelling of the name Dmitry (7 characters becomes 14 bytes) # noqa: E501
+ self.assertEqual(Convert.arrayToString(data, len(data)), "Дмитрий") # Cyrillic spelling of the name Dmitry. This string should appear as 7 Cyrillic characters like Cyrillic-demo-Dmitry.png in doc (14 bytes in a hex editor), otherwise your editor does not support utf-8 and editing this file with it could break it. # noqa:E501
+ # TODO: Russian version is Дми́трий according to . See Cyrillic-demo-Dmitry-Russian.png in doc. # noqa:E501
+
+ def testArrayToString(self):
+ sut = Convert.arrayToString(bytearray([0x41, 0x42, 0x43, 0x44]), 4) # noqa:E501
+ self.assertEqual(sut, "ABCD")
+
+ sut = Convert.arrayToString(bytearray([0x41, 0x42, 0, 0x44]), 4)
+ self.assertEqual(sut, "AB")
+
+ sut = Convert.arrayToString(bytearray([0x41, 0x42, 0x43, 0x44]), 2) # noqa:E501
+ self.assertEqual(sut, "AB")
+
+ sut = Convert.arrayToString(bytearray([0x41, 0x42, 0x43, 0]), 4)
+ self.assertEqual(sut, "ABC")
+
+ sut = Convert.arrayToString(bytearray([0x41, 0x42, 0x31, 0x32]), 8) # noqa:E501
+ self.assertEqual(sut, "AB12")
+
+ def testStringToArray(self):
+ aut = Convert.stringToArray("ABCD", 4)
+ self.assertEqual(aut, bytearray([0x41, 0x42, 0x43, 0x44]))
+
+ aut = Convert.stringToArray("ABCD", 2)
+ self.assertEqual(aut, bytearray([0x41, 0x42]))
+
+ aut = Convert.stringToArray("ABCD", 6)
+ self.assertEqual(aut, bytearray([0x41, 0x42, 0x43, 0x44, 0x00, 0x00]))
+
+ def testIntToArray(self):
+ test_metas = [
+ {
+ 'value': 65536, # not a short (1 over max)
+ 'length': 8,
+ # good_bytes: b'\x00\x00\x00\x00\x00\x01\x00\x00'
+ },
+ {
+ 'value': 65536,
+ 'length': 4,
+ # good_bytes: b'\x00\x01\x00\x00',
+ },
+ {
+ 'value': 281470681743360, # 65535 << 32
+ 'length': 8,
+ # 'good_bytes': b'\x00\x00\xff\xff\x00\x00\x00\x00',
+ }
+ ]
+ for test_meta in test_metas:
+ value = test_meta['value']
+ length = test_meta['length']
+ good_bytes = struct.pack(">{}s".format(length),
+ value.to_bytes(length, 'big'))
+ self.assertEqual(Convert.intToArray(value, length),
+ good_bytes)
+
+ def testIntToArrayFail(self):
+ test_metas = [
+ {
+ 'value': 65536, # not a short (1 over max)
+ 'length': 2,
+ # good_bytes: b'\x00\x00\x00\x00\x00\x01\x00\x00'
+ },
+ {
+ 'value': 281470681743360, # 65535 << 32
+ 'length': 4,
+ # 'good_bytes': b'\x00\x00\xff\xff\x00\x00\x00\x00',
+ }
+ ]
+ for test_meta in test_metas:
+ value = test_meta['value']
+ length = test_meta['length']
+ with self.assertRaises(ValueError):
+ Convert.intToArray(value, length)
+
+ def testSerializeSpace(self):
+ byte6 = False
+ space = 0x00
+
+ (byte6, space) = Convert.serializeSpace(0xF8)
+ self.assertEqual(space, 0xF8)
+ self.assertTrue(byte6)
+
+ (byte6, space) = Convert.serializeSpace(0xFF)
+ self.assertEqual(space, 0x03)
+ self.assertFalse(byte6)
+
+ (byte6, space) = Convert.serializeSpace(0xFD)
+ self.assertEqual(space, 0x01)
+ self.assertFalse(byte6)
diff --git a/tests/test_memoryservice.py b/tests/test_memoryservice.py
index 0705214..8033d20 100644
--- a/tests/test_memoryservice.py
+++ b/tests/test_memoryservice.py
@@ -6,6 +6,7 @@
from logging import getLogger
+from openlcb.convert import Convert
from openlcb.physicallayer import PhysicalLayer
if __name__ == "__main__":
logger = getLogger(__file__)
@@ -80,15 +81,6 @@ def setUp(self):
)
self.mService = MemoryService(self.dService)
- def testReturnCyrillicStrings(self):
- # See also testReturnCyrillicStrings in test_snip
- # If you have characters specific to UTF-8 (either in code or comment)
- # add the following as the 1st or 2nd line of the py file:
- # -*- coding: utf-8 -*-
- data = bytearray([0xd0, 0x94, 0xd0, 0xbc, 0xd0, 0xb8, 0xd1, 0x82, 0xd1, 0x80, 0xd0, 0xb8, 0xd0, 0xb9]) # Cyrillic spelling of the name Dmitry (7 characters becomes 14 bytes) # noqa: E501
- self.assertEqual(self.mService.arrayToString(data, len(data)), "Дмитрий") # Cyrillic spelling of the name Dmitry. This string should appear as 7 Cyrillic characters like Cyrillic-demo-Dmitry.png in doc (14 bytes in a hex editor), otherwise your editor does not support utf-8 and editing this file with it could break it. # noqa:E501
- # TODO: Russian version is Дми́трий according to . See Cyrillic-demo-Dmitry-Russian.png in doc. # noqa:E501
-
def testSingleRead(self):
memMemo = MemoryReadMemo(NodeID(123), 64, 0xFD, 0,
self.callbackR, self.callbackR)
@@ -191,93 +183,6 @@ def testMultipleRead(self):
self.assertEqual(len(LinkMockLayer.sentMessages), 5) # read reply datagram reply sent and next datagram sent # noqa: E501
self.assertEqual(len(self.returnedMemoryReadMemo), 2) # memory read returned # noqa: E501
- def testArrayToString(self):
- sut = MemoryService.arrayToString(bytearray([0x41, 0x42, 0x43, 0x44]), 4) # noqa:E501
- self.assertEqual(sut, "ABCD")
-
- sut = MemoryService.arrayToString(bytearray([0x41, 0x42, 0, 0x44]), 4)
- self.assertEqual(sut, "AB")
-
- sut = MemoryService.arrayToString(bytearray([0x41, 0x42, 0x43, 0x44]), 2) # noqa:E501
- self.assertEqual(sut, "AB")
-
- sut = MemoryService.arrayToString(bytearray([0x41, 0x42, 0x43, 0]), 4)
- self.assertEqual(sut, "ABC")
-
- sut = MemoryService.arrayToString(bytearray([0x41, 0x42, 0x31, 0x32]), 8) # noqa:E501
- self.assertEqual(sut, "AB12")
-
- def testStringToArray(self):
- aut = MemoryService.stringToArray("ABCD", 4)
- self.assertEqual(aut, bytearray([0x41, 0x42, 0x43, 0x44]))
-
- aut = MemoryService.stringToArray("ABCD", 2)
- self.assertEqual(aut, bytearray([0x41, 0x42]))
-
- aut = MemoryService.stringToArray("ABCD", 6)
- self.assertEqual(aut, bytearray([0x41, 0x42, 0x43, 0x44, 0x00, 0x00]))
-
- def testIntToArray(self):
- test_metas = [
- {
- 'value': 65536, # not a short (1 over max)
- 'length': 8,
- # good_bytes: b'\x00\x00\x00\x00\x00\x01\x00\x00'
- },
- {
- 'value': 65536,
- 'length': 4,
- # good_bytes: b'\x00\x01\x00\x00',
- },
- {
- 'value': 281470681743360, # 65535 << 32
- 'length': 8,
- # 'good_bytes': b'\x00\x00\xff\xff\x00\x00\x00\x00',
- }
- ]
- for test_meta in test_metas:
- value = test_meta['value']
- length = test_meta['length']
- good_bytes = struct.pack(">{}s".format(length),
- value.to_bytes(length, 'big'))
- self.assertEqual(MemoryService.intToArray(value, length),
- good_bytes)
-
- def testIntToArrayFail(self):
- test_metas = [
- {
- 'value': 65536, # not a short (1 over max)
- 'length': 2,
- # good_bytes: b'\x00\x00\x00\x00\x00\x01\x00\x00'
- },
- {
- 'value': 281470681743360, # 65535 << 32
- 'length': 4,
- # 'good_bytes': b'\x00\x00\xff\xff\x00\x00\x00\x00',
- }
- ]
- for test_meta in test_metas:
- value = test_meta['value']
- length = test_meta['length']
- with self.assertRaises(ValueError):
- MemoryService.intToArray(value, length)
-
- def testSpaceDecode(self):
- byte6 = False
- space = 0x00
-
- (byte6, space) = self.mService.spaceDecode(0xF8)
- self.assertEqual(space, 0xF8)
- self.assertTrue(byte6)
-
- (byte6, space) = self.mService.spaceDecode(0xFF)
- self.assertEqual(space, 0x03)
- self.assertFalse(byte6)
-
- (byte6, space) = self.mService.spaceDecode(0xFD)
- self.assertEqual(space, 0x01)
- self.assertFalse(byte6)
-
if __name__ == '__main__':
unittest.main()