Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
95d942a
Move static methods to new Convert class to avoid circular imports an…
Poikilos Mar 18, 2026
10fe25a
Handle CDIVar size as max for "string" and add tests for edge cases.
Poikilos Mar 18, 2026
9b29e37
Fix: Handle "." at end of nodeid range prefix.
Poikilos Apr 22, 2026
adccc26
Fix: Handle memo using new OO-defined onStatusMemo (formerly _onEleme…
Poikilos Apr 22, 2026
4725497
Enforce onStartDownload via OO.
Poikilos Apr 22, 2026
ae1d1cf
Add high-level (size-aware if set manually) DataProcessor progress. A…
Poikilos Apr 22, 2026
7192c26
Fix: Provide size to CDIVar constructor. Add CDIVar serialization and…
Poikilos Apr 22, 2026
921f518
Save the memo tree in XMLDataProcessor (formerly had to be collected …
Poikilos Apr 23, 2026
db12c3d
Add load feature (Load cached CDI XML file). Make caching configurabl…
Poikilos Apr 23, 2026
03ebb93
Fix: Set size 8 for eventid. Enforce valid 'action' sizes. Add expand…
Poikilos Apr 23, 2026
1682c76
Make a DataProcessorMemo superclass for parsing messages that are not…
Poikilos Apr 25, 2026
f54ee8c
Improve emit_cast.
Poikilos Apr 25, 2026
3ac3776
Fix replication recursion. Fix: Only set CDIMemo address during repli…
Poikilos Apr 25, 2026
a7d0cd6
Fix: Correctly separate tail of XML--prevents formatting from modifyi…
Poikilos Apr 27, 2026
d705a20
Fix: Correctly handle origin and offset. Fix: Correctly preserve XML …
Poikilos Apr 27, 2026
de593f9
only act on datagram replies matched to our requests
bobjacobsen Apr 30, 2026
45783bd
Improve hr_repr. Separate cacheFileName code.
Poikilos Apr 30, 2026
34a17cc
Make default extension specific to DataProcessor subclass.
Poikilos Apr 30, 2026
2feca63
Make missing name issue more clear.
Poikilos May 1, 2026
3c737fb
Merge pull request #10 from bobjacobsen/only-our-datagram-replies
Poikilos May 1, 2026
7c9a191
Add import used for type hint.
Poikilos May 1, 2026
e55fa78
Check if rejectedReply should be discarded as well (follow-up to simi…
Poikilos May 1, 2026
87ffcd2
Implement stream modes for Memory Configuration.
Poikilos May 1, 2026
1772370
Rename spaceDecode to serializeSpace for clarity, and add opposite me…
Poikilos May 1, 2026
b5a54dc
Allow length request reply to have arbitrary size.
Poikilos May 1, 2026
0074507
Fix error code deserialization (order of operation issue; result now …
Poikilos May 1, 2026
1f9afcd
Add debug output for unmatched datagrams.
Poikilos May 1, 2026
c4f69bd
Track whether CDI was loaded from cache (file). Add assert to clarify…
Poikilos May 5, 2026
c2df185
Fix: Respect cache setting (Only save CDI file in that case).
Poikilos May 5, 2026
8b687db
Fix non-subscripable type (Change to PEP8 commented type hint).
Poikilos May 11, 2026
bcef09b
Add clear option such as for reconnect if client code doesn't want to…
Poikilos May 11, 2026
1d6b3ff
(NOOP) Fix some type hints and a docstring.
Poikilos May 19, 2026
d8b0680
Begin implementing local node memory (FIXME: responding to memory con…
Poikilos May 19, 2026
82326de
(NOOP) Rename expanded* to replicated* for consistency.
Poikilos May 19, 2026
1a1e10c
Remove lint.
Poikilos May 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions examples/example_cdi_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

from examples_settings import Settings # do 1st to fix path if no pip install
from openlcb import precise_sleep
from openlcb.convert import Convert
from openlcb.xmldataprocessor import attrs_to_dict
from openlcb.tcplink.tcpsocket import TcpSocket
settings = Settings()
Expand Down Expand Up @@ -137,7 +138,7 @@ def memoryReadSuccess(memo):
resultingCDI += memo.data
logger.debug(
f"[{memo.address}] successful read"
f" {MemoryService.arrayToString(memo.data, len(memo.data))}"
f" {Convert.arrayToString(memo.data, len(memo.data))}"
"; next = address + 64")
# update the address
memo.address = memo.address+64
Expand Down Expand Up @@ -189,10 +190,8 @@ class MyHandler(xml.sax.handler.ContentHandler):
_chunks (list[str]): Collects chunks of data.
This is implementation-specific, and not
required if streaming (parser.feed).
_tmp_address (int|None): Where we are in the memory space (starting
at origin, and calculated using offset and/or size of start
tags).
_tmp_space (int|None): What space we are currently on.
_tmp_address (int|None): For sanity check, not actual address.
See replicatedTree docstring.
"""

def __init__(self):
Expand Down
6 changes: 5 additions & 1 deletion examples/example_node_implementation.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,11 @@ def printMessage(message):
print("RM: {} from {}".format(message, message.source))


canLink = CanLink(physicalLayer, NodeID(settings['localNodeID']))
localNodeID = NodeID(settings['localNodeID'])
print()
print(f"[example_node_memory_implementation] localNodeID: {localNodeID}")

canLink = CanLink(physicalLayer, localNodeID)
canLink.registerMessageReceivedListener(printMessage)

datagramService = DatagramService(canLink)
Expand Down
248 changes: 248 additions & 0 deletions examples/example_node_memory_implementation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
'''
Demo of creating a virtual node to represent the application
(other local nodes are possible, but at least one is necessary
for the application to announce itself and provide SNIP info),
in this case with memory to allow another node to change settings
(could also be used to for a second virtual node such as to
represent/emulate a non-LCC train, but a separate
virtual node from the Configuration Tool is recommended in
that case).

based on example_node_implementation from python-openlcb examples.

Usage:
python3 example_node_memory_implementation.py [host|host:port]

Options:
host|host:port (optional) Set the address (or using a colon,
the address and port). Defaults to a hard-coded test
address and port.
'''
import os
import socket
import struct

# region same code as other examples
from examples_settings import Settings
from openlcb.localnode import LocalNode # do 1st to fix path if no pip install
settings = Settings()

if __name__ == "__main__":
settings.load_cli_args(docstring=__doc__)
# endregion same code as other examples

from openlcb import emit_cast, get_config_dir, precise_sleep # noqa: E402
from openlcb.tcplink.tcpsocket import TcpSocket # noqa: E402

from openlcb.canbus.canphysicallayergridconnect import ( # noqa: E402
CanPhysicalLayerGridConnect,
)
from openlcb.canbus.canlink import CanLink # noqa: E402
from openlcb.nodeid import NodeID # noqa: E402
from openlcb.datagramservice import DatagramService # noqa: E402
from openlcb.memoryservice import MemoryService # noqa: E402
from openlcb.message import Message # noqa: E402
from openlcb.mti import MTI # noqa: E402

from openlcb.localnodeprocessor import LocalNodeProcessor # noqa: E402
from openlcb.pip import PIP # noqa: E402
from openlcb.snip import SNIP # noqa: E402
from openlcb.node import Node # noqa: E402

# specify connection information
# region moved to settings
# host = "192.168.16.212"
# port = 12021
# localNodeID = "05.01.01.01.03.01"
# farNodeID = "09.00.99.03.00.35"
# endregion moved to settings

sock = TcpSocket()
# s.settimeout(30)
try:
sock.connect(settings['host'], settings['port'])
except socket.gaierror:
print("Failure accessing {}:{}"
.format(settings.get('host'), settings.get('port')))
raise

print("RR, SR are raw socket interface receive and send;"
" RL, SL are link interface; RM, SM are message interface")


# def sendToSocket(frame: CanFrame):
# string = frame.encodeAsString()
# print(" SR: {}".format(string.strip()))
# sock.sendString(string)
# physicalLayer.onFrameSent(frame)


def printFrame(frame):
print(" RL: {}".format(frame))


physicalLayer = CanPhysicalLayerGridConnect()
physicalLayer.registerFrameReceivedListener(printFrame)


def printMessage(message):
print("RM: {} from {}".format(message, message.source))


localNodeID = NodeID(settings['localNodeID'])
print()
print(f"[example_node_memory_implementation] localNodeID: {localNodeID}")
canLink = CanLink(physicalLayer, localNodeID)
canLink.registerMessageReceivedListener(printMessage)

datagramService = DatagramService(canLink)
canLink.registerMessageReceivedListener(datagramService.process)

spaces = { # big endian (most significant byte sent first) as per openlcb
# 0: bytearray([
# 0x01, 0x00, # 0x1000 = 4096 (unsigned int 16)
# ])
0: bytearray(struct.pack(">H", 12021)),
}
# bytearray allows in-place append (from pack bytes does not)
# H: short (capitalized means unsigned)
# >: big endian (required for openlcb)
# e: float16 (IEEE 754 binary16, 2-bytes)
# For other symbols see Python documentation or SUBTYPE_FORMATS in cdivar.py.

spaces[0] += struct.pack(">e", 0.5) # save at address 3 (size 2)
# NOTE: 0.5 can be stored precisely, as b'\x008'
# but not all numbers can be represented by IEEE float.
# For example, 2.4 is stored as b'\xcd@' which is ~2.400390625

# Additional pack examples:
neg2_float_ba = bytearray(b'\xc0\x00')
neg2_float_b = struct.pack(">e", -2)
assert bytes(neg2_float_ba) == bytes(neg2_float_b), \
f"expected b'\xc0\x00', b'\xc0\x00', got {neg2_float_ba}, {neg2_float_b}"

cdi = """<?xml version="1.0" encoding="utf-8"?>
<cdi
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="http://openlcb.org/schema/cdi/1/1/cdi.xsd">
<identification>
<manufacturer>python-openlcb example authors</manufacturer>
<model>example_node_memory_implementation</model>
<hardwareVersion>1.0</hardwareVersion>
<softwareVersion>1.0</softwareVersion>
</identification>
<acdi/>
<segment space='0' origin='0'>
<int size="2">
<name>Port</name>
<description>Network port of remote hub (2-byte unsigned short)</description>
<default>12021</default>
</int>
<float size="2">
<name>Timeout</name>
<description>Network timeout (2-byte binary16 value).</description>
<default>0.5</default>
</float>
</segment>
</cdi>
""" # noqa: E501


def handleDatagram(memo):
"""create a call-back to print datagram contents when received

Args:
memo (DatagramReadMemo): The datagram received

Returns:
bool: Always False (True would mean we sent a reply to the datagram,
but let the MemoryService do that).
"""
print(f"Datagram receive call back: {emit_cast(memo)}")
return False


datagramService.registerDatagramReceivedListener(handleDatagram)

memoryService = MemoryService(datagramService)


# callbacks to get results of memory read

def memoryReadSuccess(memo):
print("successful memory read: {}".format(memo.data))


def memoryReadFail(memo):
print("memory read failed: {}".format(memo.data))


# create a node and connect it update
# This is a very minimal node, which just takes part in the low-level common
# protocols
localNode = LocalNode(
NodeID(settings['localNodeID']),
SNIP("python-openlcb example authors",
"example_node_memory_implementation",
"1.0", "1.0", "Custom Name Here", "Custom Description Here"),
set([
PIP.SIMPLE_NODE_IDENTIFICATION_PROTOCOL,
PIP.DATAGRAM_PROTOCOL,
PIP.CONFIGURATION_DESCRIPTION_INFORMATION,
PIP.ADCDI_PROTOCOL,
PIP.MEMORY_CONFIGURATION_PROTOCOL,
]),
canLink
)
my_conf_dir = os.path.join(get_config_dir("python-openlcb"))
backup_name = "example_node_memory_implementation.cdi.xml"
backup_path = os.path.join(my_conf_dir, backup_name)

localNode.loadCDIString(cdi, backup_path)

# localNodeProcessor = LocalNodeProcessor(canLink, localNode)
# canLink.registerMessageReceivedListener(localNodeProcessor.process)
localNodeProcessor = localNode.localNodeProcessor


def displayOtherNodeIds(message) :
"""Listener to identify connected nodes

Args:
message (Message): A response from the network
"""
print("[displayOtherNodeIds] type(message): {}"
"".format(type(message).__name__))
if message.mti == MTI.Verified_NodeID :
print("Detected farNodeID is {}".format(message.source))


canLink.registerMessageReceivedListener(displayOtherNodeIds)


#######################

# have the socket layer report up to bring the link layer up and get an alias

print(" SL : link up...")
physicalLayer.physicalLayerUp()
print(" SL : link up...waiting...")
while canLink.pollState() != CanLink.State.Permitted:
physicalLayer.receiveAll(sock, verbose=settings['trace'])
physicalLayer.sendAll(sock, verbose=True)
precise_sleep(.02)
print(" SL : link up")
# request that nodes identify themselves so that we can print their node IDs
message = Message(MTI.Verify_NodeID_Number_Global,
NodeID(settings['localNodeID']), None)
canLink.sendMessage(message)

# process resulting activity
while True:
count = 0
count += physicalLayer.sendAll(sock, verbose=True)
count += physicalLayer.receiveAll(sock, verbose=settings['trace'])
if count < 1:
precise_sleep(.01)
# else skip sleep to avoid latency (port already delayed)

physicalLayer.physicalLayerDown()
1 change: 0 additions & 1 deletion examples/examples_gui.py
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,6 @@ def downloadCDI(self, farNodeID: str):
self.setStatus("Downloading CDI...")
assert self.cdi_form is not None
assert self.network is not None
self.cdi_form.onStartDownload()
try:
self.network.download(farNodeID, MemorySpace.CDI,
self.cdi_form)
Expand Down
9 changes: 5 additions & 4 deletions examples/tkexamples/cdiform.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,13 +221,14 @@ def getStatus(self):

def onStartDownload(self):
"""Initialize variables used by element handler(s)."""
self.onStart()
self._resetTree()
XMLDataProcessor.onStartDownload(self)
# TODO: clear tree?

def onStatusMemo(self, cm: CDIMemo) -> bool:
"""Handler for incoming CDI tag
Use this for callback in downloadCDI, which sets parser
(_dataProcessor)'s _onElement.
Use this for callback in downloadCDI
(onStatusMemo replaces _dataProcessor's _onElement
formerly set by downloadCDI).

Args:
cm (CDIMemo): Document parsing state info
Expand Down
44 changes: 43 additions & 1 deletion openlcb/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from enum import Enum
import os
import platform
import re
import time

Expand All @@ -23,11 +25,26 @@
ORD_z = 0x7A


def only_hex_pairs(value: str) -> Union[re.Match[bytes], re.Match[str], None]:
def get_config_dir(unique_software_name: str):
"""Get a configuration directory for any program
(In the parent directory recommended by the specific platform).
"""
CONFIGS = os.path.expanduser("~/.config")
if platform.system() == "Darwin":
CONFIGS = os.path.expanduser("~/Library/Application Support")
elif platform.system() == "Windows":
CONFIGS = os.environ['APPDATA']
return os.path.join(CONFIGS, unique_software_name)


def only_hex_pairs(value: str):
# type: (str) -> Union[re.Match[bytes], re.Match[str], None]
"""Check if string contains only machine-readable hex pairs.
See openlcb.conventions submodule for LCC ID dot notation
functions (less restrictive).
"""
# ^ PEP8 (instead of Python) type hint is used to avoid
# "TypeError: 'type' object is not subscriptable"
if isinstance(value, (bytearray, bytes)):
return hex_pairs_brc.fullmatch(value)
assert isinstance(value, str)
Expand All @@ -36,6 +53,8 @@ def only_hex_pairs(value: str) -> Union[re.Match[bytes], re.Match[str], None]:

def emit_cast(value) -> str:
"""Get type and value, such as for debug output."""
if value is None:
return "None"
repr_str = repr(value)
if isinstance(value, Enum):
repr_str = "{}".format(value.value)
Expand Down Expand Up @@ -131,3 +150,26 @@ def from_hex_bytes(b: bytearray, start: int, stop: int,

def from_all_hex_bytes(b: bytearray) -> bytearray:
return from_hex_bytes(b, 0, len(b))


def hr_repr(value, always_quote: bool = False) -> str:
"""Represent value with double quotes
if str, otherwise as an unquoted string.
(Human-readable repr).
"""
# repr_value = repr(value)
# repr_value = repr_value.replace("\\\\", "\\")
# if repr_value.startswith("'") and repr_value.endswith("'"):
# return '"' + repr_value[1:-1].replace('"', '\\"') + '"'
repr_value = str(value)
if always_quote or isinstance(value, str):
return '"' + repr_value.replace('"', '\\"') + '"'
return repr(value)


def d_quote(value) -> str:
"""Represent any type of value in double-quotes
(with any already present escaped) such as for emitting XML
attribute debug messages or any other technical/literary use.
"""
return hr_repr(value, always_quote=True)
Loading
Loading