From d113597e81cec987c0a962961823d47784013561 Mon Sep 17 00:00:00 2001 From: David Rebbe Date: Tue, 27 Jan 2026 11:16:53 -0500 Subject: [PATCH 1/6] added CAN APIs --- examples/can.py | 77 +++++++++++ freewili/fw.py | 304 ++++++++++++++++++++++++++++++++++++++++- freewili/fw_serial.py | 267 ++++++++++++++++++++++++++++++++++++ freewili/serialport.py | 2 +- freewili/types.py | 95 ++++++++++++- 5 files changed, 742 insertions(+), 3 deletions(-) create mode 100644 examples/can.py diff --git a/examples/can.py b/examples/can.py new file mode 100644 index 0000000..fcb7f51 --- /dev/null +++ b/examples/can.py @@ -0,0 +1,77 @@ +"""Example script to handle CAN from FreeWili with a Neptune Orca.""" + +import time + +from freewili import FreeWili +from freewili.framing import ResponseFrame +from freewili.types import CANData, EventDataType, EventType + + +def event_handler(event_type: EventType, frame: ResponseFrame, data: EventDataType) -> None: + """Handle events from FreeWili.""" + match event_type: + case EventType.CANRX0: + data: CANData = data # type:ignore + print(f"CAN0 RX Event: {data}") + case EventType.CANRX1: + data: CANData = data # type: ignore + print(f"CAN1 RX Event: {data}") + case EventType.CANTX0: + data: CANData = data # type: ignore + print(f"CAN0 TX Event: {data}") + case EventType.CANTX1: + data: CANData = data # type: ignore + print(f"CAN1 TX Event: {data}") + case _: + # Handle other event types as needed + if data: + print(f"{event_type}: Event Data: {data}") + else: + print(f"No data for this {event_type}.") + + +with FreeWili.find_first().expect("Failed to find FreeWili") as fw: + fw: FreeWili = fw # type: ignore + print(f"Connected to FreeWili {fw}") + + # v87 firmware currently just returns "Ok" for CAN register reads + # print(fw.can_read_registers(0, 0, 10).expect("Failed to read CAN0 registers")) + # fw.can_write_registers(0, 0, 4, 0x90770).expect("Failed to write CAN0 registers") + + fw.set_event_callback(event_handler) + # Enable CAN events + for channel in (0, 1): + fw.can_enable_streaming(channel, True).expect("Failed to enable CAN streaming") + # Set up a periodic CAN message on channel 0, every 100 ms + fw.can_set_transmit_periodic( + 0, 0, 100_000, 0x55, True, True, bytes([0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88]) + ).expect("Failed to set periodic CAN message on channel 0") + # Enable the periodic CAN message, this is redundant as can_set_transmit_periodic enables it by default + fw.can_enable_transmit_periodic(0, True).expect("Failed to enable periodic CAN message on channel 0") + + # Setup CAN RX filters + fw.can_set_rx_filter(0, 0, True, 0xFF, 0x123, 0, 0, 0, 0).expect("Failed to set CAN0 RX filter") + + print("Listening for events...") + start = time.time() + while True: + elapsed = time.time() - start + try: + fw.process_events() + if elapsed >= 1.0: + start = time.time() + # Transmit CAN messages every 1 second + fw.can_transmit(0, 0x123, bytes([0x11, 0x22, 0x33, 0x44]), True, True).expect( + "Failed to send CAN message on channel 0" + ) + fw.can_transmit(1, 0x124, bytes([0x11, 0x22, 0x33, 0x44]), True, True).expect( + "Failed to send CAN message on channel 1" + ) + except KeyboardInterrupt: + break + # Disable events before exiting + print("Disabling CAN events...") + for channel in (0, 1): + fw.can_enable_streaming(channel, False).expect("Failed to disable CAN streaming") + fw.can_enable_transmit_periodic(0, False).expect("Failed to disable periodic CAN message on channel 0") + print("Exiting event loop") diff --git a/freewili/fw.py b/freewili/fw.py index 9a481a8..d4707b0 100644 --- a/freewili/fw.py +++ b/freewili/fw.py @@ -1665,7 +1665,10 @@ def wileye_start_recording_video( Arguments: ---------- destination: int - Destination processor (0 = WILEye's SDCard, 1 = FREE-WILi's Main Filesystem, 2 = FREE-WILi's Display Filesystem) + Destination processor + 0 = WILEye's SDCard + 1 = FREE-WILi's Main Filesystem + 2 = FREE-WILi's Display Filesystem filename: str Name of the file to save the video as processor: FreeWiliProcessorType @@ -1900,6 +1903,305 @@ def wileye_set_resolution( case _: raise RuntimeError("Missing case statement") + def can_transmit( + self, + channel: int, + can_id: int, + data: bytes, + is_extended: bool, + is_fd: bool, + processor: FreeWiliProcessorType = FreeWiliProcessorType.Main, + ) -> Result[str, str]: + """Transmit a CAN or CAN FD frame. + + Arguments: + ---------- + channel: int + CAN channel (0 or 1) + can_id: int + CAN ID (11-bit for standard, 29-bit for extended) + data: bytes | tuple[int, ...] + Data payload (0-64 bytes for CAN FD, 0-8 bytes for standard CAN) + is_extended: bool + True if using extended CAN ID (29-bit), False for standard (11-bit) + is_fd: bool + True if using CAN FD, False for standard CAN + processor: FreeWiliProcessorType + Processor to send the command to (default: Main) + + Returns: + -------- + Result[str, str]: + Ok(str) if the command was sent successfully, Err(str) if not. + """ + match self.get_serial_from(processor): + case Ok(serial): + return serial.can_transmit(channel, can_id, data, is_extended, is_fd) + case Err(msg): + return Err(msg) + case _: + raise RuntimeError("Missing case statement") + + def can_enable_transmit_periodic( + self, + index: int, + enabled: bool, + processor: FreeWiliProcessorType = FreeWiliProcessorType.Main, + ) -> Result[str, str]: + """Enable/Disable periodic transmission of a CAN or CAN FD frame. + + Arguments: + ---------- + index: int + Index of the periodic frame to enable + enabled: bool + True to enable periodic transmission, False to disable + processor: FreeWiliProcessorType + Processor to send the command to (default: Main) + + Returns: + -------- + Result[str, str]: + Ok(str) if the command was sent successfully, Err(str) if not. + """ + match self.get_serial_from(processor): + case Ok(serial): + return serial.can_enable_transmit_periodic(index, enabled) + case Err(msg): + return Err(msg) + case _: + raise RuntimeError("Missing case statement") + + def can_set_transmit_periodic( + self, + channel: int, + index: int, + period_us: int, + arb_id: int, + is_fd: bool, + is_extended: bool, + data: bytes, + processor: FreeWiliProcessorType = FreeWiliProcessorType.Main, + ) -> Result[str, str]: + """Set a periodic CAN or CAN FD frame. + + Arguments: + ---------- + channel: int + CAN channel (0 or 1) + index: int + Index of the periodic frame to set + period_us: int + Period in microseconds. As of v87 firmware, minimum is 500 us. + arb_id: int + CAN Arbitration ID + is_fd: bool + True if using CAN FD, False for standard CAN + is_extended: bool + True if using extended CAN ID (29-bit), False for standard (11-bit) + data: bytes + Data payload (0-64 bytes for CAN FD, 0-8 bytes for standard CAN) + processor: FreeWiliProcessorType + Processor to send the command to (default: Main) + + Returns: + -------- + Result[str, str]: + Ok(str) if the command was sent successfully, Err(str) if not. + """ + match self.get_serial_from(processor): + case Ok(serial): + return serial.can_set_transmit_periodic(channel, index, period_us, arb_id, is_fd, is_extended, data) + case Err(msg): + return Err(msg) + case _: + raise RuntimeError("Missing case statement") + + def can_enable_streaming( + self, + channel: int, + enabled: bool, + processor: FreeWiliProcessorType = FreeWiliProcessorType.Main, + ) -> Result[str, str]: + """Enable/Disable CAN or CAN FD frame streaming. + + Arguments: + ---------- + channel: int + CAN channel (0 or 1) + enabled: bool + True to enable streaming, False to disable + processor: FreeWiliProcessorType + Processor to send the command to (default: Main) + + Returns: + -------- + Result[str, str]: + Ok(str) if the command was sent successfully, Err(str) if not. + """ + match self.get_serial_from(processor): + case Ok(serial): + return serial.can_enable_streaming(channel, enabled) + case Err(msg): + return Err(msg) + case _: + raise RuntimeError("Missing case statement") + + def can_set_rx_filter( + self, + channel: int, + index: int, + is_extended: bool, + mask_id: int, + id: int | None, + mask_b0: int, + b0: int, + mask_b1: int, + b1: int, + processor: FreeWiliProcessorType = FreeWiliProcessorType.Main, + ) -> Result[str, str]: + """Set a CAN or CAN FD filter. + + See can_enable_rx_filter to enable/disable the filter. + + Arguments: + ---------- + channel: int + CAN channel (0 or 1) + index: int + Index of the filter (0-31) + is_extended: bool + True if using extended CAN ID (29-bit), False for standard (11-bit) + mask_id: int + CAN ID mask (11-bit for standard, 29-bit for extended) + id: int | None + ID to filter on + mask_b0: int + Mask byte 0 + b0: int + Byte 0 + mask_b1: int + Mask byte 1 + b1: int + Byte 1 + processor: FreeWiliProcessorType + Processor to send the command to (default: Main) + + Returns: + -------- + Result[str, str]: + Ok(str) if the command was sent successfully, Err(str) if not. + """ + match self.get_serial_from(processor): + case Ok(serial): + return serial.can_set_rx_filter(channel, index, is_extended, mask_id, id, mask_b0, b0, mask_b1, b1) + case Err(msg): + return Err(msg) + case _: + raise RuntimeError("Missing case statement") + + def can_enable_rx_filter( + self, + channel: int, + index: int, + enable: bool, + processor: FreeWiliProcessorType = FreeWiliProcessorType.Main, + ) -> Result[str, str]: + """Enable or disable a CAN RX filter. + + Arguments: + ---------- + channel: int + CAN channel (0 or 1) + index: int + Index of the filter (0-31) + enable: bool + True to enable the filter, False to disable + processor: FreeWiliProcessorType + Processor to send the command to (default: Main) + + Returns: + -------- + Result[str, str]: + Ok(str) if the command was sent successfully, Err(str) if not. + """ + match self.get_serial_from(processor): + case Ok(serial): + return serial.can_enable_rx_filter(channel, index, enable) + case Err(msg): + return Err(msg) + case _: + raise RuntimeError("Missing case statement") + + def can_read_registers( + self, + channel: int, + address: int, + wordcount: int, + processor: FreeWiliProcessorType = FreeWiliProcessorType.Main, + ) -> Result[str, str]: + """Read CAN registers. + + Arguments: + ---------- + channel: int + CAN channel (0 or 1) + address: int + Register address (hex) + wordcount: int + Number of words to read + processor: FreeWiliProcessorType + Processor to send the command to (default: Main) + + Returns: + -------- + Result[str, str]: + Ok(str) if the command was sent successfully, Err(str) if not. + """ + match self.get_serial_from(processor): + case Ok(serial): + return serial.can_read_registers(channel, address, wordcount) + case Err(msg): + return Err(msg) + case _: + raise RuntimeError("Missing case statement") + + def can_write_registers( + self, + channel: int, + address: int, + bytesize: int, + word: int, + processor: FreeWiliProcessorType = FreeWiliProcessorType.Main, + ) -> Result[str, str]: + """Write CAN registers. + + Arguments: + ---------- + channel: int + CAN channel (0 or 1) + address: int + Register address (hex) + bytesize: int + Bytes per word (1 or 4) + word: int + Word to write (hex) + processor: FreeWiliProcessorType + Processor to send the command to (default: Main) + + Returns: + -------- + Result[str, str]: + Ok(str) if the command was sent successfully, Err(str) if not. + """ + match self.get_serial_from(processor): + case Ok(serial): + return serial.can_write_registers(channel, address, bytesize, word) + case Err(msg): + return Err(msg) + case _: + raise RuntimeError("Missing case statement") + @dataclass(frozen=True) class FileMap: diff --git a/freewili/fw_serial.py b/freewili/fw_serial.py index 66698fa..1d0299a 100644 --- a/freewili/fw_serial.py +++ b/freewili/fw_serial.py @@ -2067,3 +2067,270 @@ def wileye_set_resolution(self, resolution_index: int) -> Result[str, str]: self.serial_port.send(cmd) return self._handle_final_response_frame() + + @needs_open() + def can_transmit(self, channel: int, can_id: int, data: bytes, is_extended: bool, is_fd: bool) -> Result[str, str]: + """Transmit a CAN or CAN FD frame. + + Arguments: + ---------- + channel: int + CAN channel (0 or 1) + can_id: int + CAN ID (11-bit for standard, 29-bit for extended) + data: bytes + Data payload (0-64 bytes for CAN FD, 0-8 bytes for standard CAN) + is_extended: bool + True if using extended CAN ID (29-bit), False for standard (11-bit) + is_fd: bool + True if using CAN FD, False for standard CAN + + Returns: + -------- + Result[None, str]: + Ok(None) if the command was sent successfully, Err(str) if not. + """ + # e, f w Channel ArbID (hex) isCANFD isXtd Bytes (hex) + self._empty_all() + data_bytes = " ".join(f"{i:02X}" for i in data) + cmd = f"e\\f\\w {channel} {can_id:02X} {1 if is_fd else 0} {1 if is_extended else 0} {data_bytes}\n" + self.serial_port.send(cmd) + + return self._handle_final_response_frame() + + @needs_open() + def can_enable_transmit_periodic(self, index: int, enabled: bool) -> Result[str, str]: + """Enable/Disable periodic transmission of a CAN or CAN FD frame. + + Arguments: + ---------- + channel: int + CAN channel (0 or 1) + index: int + Index of the periodic frame to enable + enabled: bool + True to enable periodic transmission, False to disable + + Returns: + -------- + Result[None, str]: + Ok(None) if the command was sent successfully, Err(str) if not. + """ + # e, f, p index enable period (us) Channel ArbID (hex) isCANFD isXtd Bytes (hex) + self._empty_all() + cmd = f"e\\f\\p {index} {1 if enabled else 0}" + self.serial_port.send(cmd) + + return self._handle_final_response_frame() + + @needs_open() + def can_set_transmit_periodic( + self, + channel: int, + index: int, + period_us: int, + arb_id: int, + is_fd: bool, + is_extended: bool, + data: bytes, + ) -> Result[str, str]: + """Transmit a periodic CAN or CAN FD frame. + + Arguments: + ---------- + channel: int + CAN channel (0 or 1) + index: int + Index of the periodic frame to set + period_us: int + Period in microseconds. As of v87 firmware, minimum is 500 us. + arb_id: int + CAN Arbitration ID + is_fd: bool + True if using CAN FD, False for standard CAN + is_extended: bool + True if using extended CAN ID (29-bit), False for standard (11-bit) + data: bytes | tuple[int, ...] + Data payload (0-64 bytes for CAN FD, 0-8 bytes for standard CAN) + + Returns: + -------- + Result[None, str]: + Ok(None) if the command was sent successfully, Err(str) if not. + """ + # e, f, p index enable period (us) Channel ArbID (hex) isCANFD isXtd Bytes (hex) + self._empty_all() + data_bytes = " ".join(f"{i:02X}" for i in data) + cmd = f"e\\f\\p {index} 1 {period_us} {channel} " + cmd += f"{arb_id:02X} {1 if is_fd else 0} {1 if is_extended else 0} {data_bytes}" + self.serial_port.send(cmd) + + return self._handle_final_response_frame() + + @needs_open() + def can_enable_streaming(self, channel: int, enabled: bool) -> Result[str, str]: + """Enable/Disable CAN or CAN FD frame streaming. + + Arguments: + ---------- + channel: int + CAN channel (0 or 1) + enabled: bool + True to enable streaming, False to disable + + Returns: + -------- + Result[None, str]: + Ok(None) if the command was sent successfully, Err(str) if not. + """ + # e f o Channel enable + self._empty_all() + cmd = f"e\\f\\o {channel} {1 if enabled else 0}" + self.serial_port.send(cmd) + + return self._handle_final_response_frame() + + @needs_open() + def can_set_rx_filter( + self, + channel: int, + index: int, + is_extended: bool, + mask_id: int, + id: int | None, + mask_b0: int, + b0: int, + mask_b1: int, + b1: int, + ) -> Result[str, str]: + """Set a CAN or CAN FD filter. + + See can_enable_rx_filter to enable/disable the filter. + + Arguments: + ---------- + channel: int + CAN channel (0 or 1) + index: int + index of the filter (0-31) + is_extended: bool + True if using extended CAN ID (29-bit), False for standard (11-bit) + mask_id: int + CAN ID (11-bit for standard, 29-bit for extended) + id: int + ID to filter on + mask_b0: int + Mask byte 0 + b0: int + Byte 0 + mask_b1: int + Mask byte 1 + b1: int + Byte 1 + + Returns: + -------- + Result[None, str]: + Ok(None) if the command was sent successfully, Err(str) if not. + """ + # e f f channel (0-1), index (0-32), enable, isXTD, mskID, ID, [mskb0, b0, mskb1, b1] + # 0 0 0 1 1 1 1 1 1 + self._empty_all() + cmd = f"e\\f\\f {channel} {index} 1 {1 if is_extended else 0} {mask_id:02X} {id:02X} " + cmd += f" {mask_b0:02X} {b0:02X} {mask_b1:02X} {b1:02X}" + self.serial_port.send(cmd) + + return self._handle_final_response_frame() + + @needs_open() + def can_enable_rx_filter( + self, + channel: int, + index: int, + enable: bool, + ) -> Result[str, str]: + """Enable or disable a CAN RX filter. + + Arguments: + ---------- + channel: int + CAN channel (0 or 1) + index: int + index of the filter (0-31) + enable: bool + True to enable the filter, False to disable + + Returns: + -------- + Result[None, str]: + Ok(None) if the command was sent successfully, Err(str) if not. + """ + # e f f channel (0-1) index (0-32) enable isXTD mskID ID (opt) mskb0 b0 mskb1 b1 + self._empty_all() + cmd = f"e\\f\\f {channel} {index} {1 if enable else 0}" + self.serial_port.send(cmd) + + return self._handle_final_response_frame() + + @needs_open() + def can_read_registers( + self, + channel: int, + address: int, + wordcount: int, + ) -> Result[str, str]: + """Read CAN registers. + + Arguments: + ---------- + channel: int + CAN channel (0 or 1) + address: int + Register address (hex) + wordcount: int + Number of words to read + + Returns: + -------- + Result[None, str]: + Ok(None) if the command was sent successfully, Err(str) if not. + """ + # e f r channel (0-1) address (hex) wordcount + self._empty_all() + cmd = f"e\\f\\r {channel} {address:02X} {wordcount}" + self.serial_port.send(cmd) + + return self._handle_final_response_frame() + + @needs_open() + def can_write_registers( + self, + channel: int, + address: int, + bytesize: int, + word: int, + ) -> Result[str, str]: + """Write CAN registers. + + Arguments: + ---------- + channel: int + CAN channel (0 or 1) + address: int + Register address (hex) + bytesize: int + Bytes per word (1 or 4) + word: int + Word to write (hex) + + Returns: + -------- + Result[None, str]: + Ok(None) if the command was sent successfully, Err(str) if not. + """ + # e f s channel (0-1) address (hex) bytesize (1,4) word (hex) + self._empty_all() + cmd = f"e\\f\\s {channel} {address:02X} {bytesize} {word:02X}" + self.serial_port.send(cmd) + + return self._handle_final_response_frame() diff --git a/freewili/serialport.py b/freewili/serialport.py index 64bd4f3..fef69d5 100644 --- a/freewili/serialport.py +++ b/freewili/serialport.py @@ -393,7 +393,7 @@ def send( data: bytes | str, append_newline: bool = True, newline_chars: str = "\n", - delay_sec: float = 0.005, + delay_sec: float = 0.000, wait: bool = True, ) -> None: r"""Send data to the serial port. diff --git a/freewili/types.py b/freewili/types.py index 4b85312..4fe6cdb 100644 --- a/freewili/types.py +++ b/freewili/types.py @@ -366,10 +366,83 @@ def from_string(cls, data: str) -> Self: ) +@dataclass(frozen=True) +class CANData(EventData): + """CAN event data from Free-Wili Neptune.""" + + # w + # Channel ArbID (hex) isCANFD isXtd Bytes (hex) + + # 0 9 1 1 01 02 03 + # [e\f\w 0D4B2535E0F2EED0 28 Ok 1] + # [*can1 0D4B2535E108BCD8 29 9x 01 02 03 1] + # [*canTx0 0D4B2535E1354348 30 9x 01 02 03 1] + + # w + # Channel ArbID (hex) isCANFD isXtd Bytes (hex) + + # 0 9 1 0 01 02 03 + # [e\f\w 0D4B253EF45EF028 31 Ok 1] + # [*can1 0D4B253EF47304C8 32 9 01 02 03 1] + # [*canTx0 0D4B253EF4BCEEA8 33 9 01 02 03 1] + + # w + # Channel ArbID (hex) isCANFD isXtd Bytes (hex) + + # 0 9 0 0 01 02 03 + # [e\f\w 0D4B254578129FE0 34 Ok 1] + # [*can1 0D4B254578266E30 35 9 01 02 03 1] + # [*canTx0 0D4B254578321A78 36 9 01 02 03 1] + # CAN Arbitration ID + arb_id: int + # Indicates if using extended CAN ID (29-bit) or standard (11-bit) + is_extended: bool + # Data payload + data: bytes + + @classmethod + def from_string(cls, data: str) -> Self: + """Convert a string to a CANData object. + + Arguments: + ---------- + data: str + The string to convert, typically from a CAN event. + + Returns: + -------- + CANData: + The converted CANData object. + """ + # [*can1 0D4B254578266E30 35 9 01 02 03 1] + # [*canTx0 0D4B254578321A78 36 9 01 02 03 1] + # [*can1 0D4B25E7F3A694C0 44 9 1] + # [*canTx0 0D4B25E7F3B1B850 45 9 1] + parts = data.split(" ") + arb_id = int(parts[0].strip("x"), 16) + is_extended = parts[0].lower().endswith("x") + data_bytes = bytes([int(x, 16) for x in parts[1:]]) + return cls( + arb_id=arb_id, + is_extended=is_extended, + data=data_bytes, + ) + + # Type alias for all possible event data types # This allows us to use EventDataType in type hints and function signatures EventDataType = ( - EventData | RawData | Radio1Data | Radio2Data | UART1Data | GPIOData | AccelData | ButtonData | IRData | BatteryData + EventData + | RawData + | Radio1Data + | Radio2Data + | UART1Data + | GPIOData + | AccelData + | ButtonData + | IRData + | BatteryData + | CANData ) @@ -387,6 +460,10 @@ class EventType(enum.Enum): Radio2 = enum.auto() UART1 = enum.auto() Audio = enum.auto() + CANTX0 = enum.auto() + CANRX0 = enum.auto() + CANTX1 = enum.auto() + CANRX1 = enum.auto() def __str__(self) -> str: return self.name @@ -430,6 +507,14 @@ def get_data_type(self) -> EventData: # type: ignore[return-value] return UART1Data # type: ignore[return-value] case self.Audio: return AudioData # type: ignore[return-value] + case self.CANTX0: + return CANData # type: ignore[return-value] + case self.CANRX0: + return CANData # type: ignore[return-value] + case self.CANTX1: + return CANData # type: ignore[return-value] + case self.CANRX1: + return CANData # type: ignore[return-value] case _: return RawData # type: ignore[return-value] @@ -473,6 +558,14 @@ def from_string(cls, value: str) -> Self: return cls(cls.UART1) case "audio": return cls(cls.Audio) + case "cantx0": + return cls(cls.CANTX0) + case "can0": + return cls(cls.CANRX0) + case "cantx1": + return cls(cls.CANTX1) + case "can1": + return cls(cls.CANRX1) case _: return cls(cls.Unknown) From 312e8ba33b40984a5b220ea517ab73b889657c9c Mon Sep 17 00:00:00 2001 From: David Rebbe Date: Tue, 27 Jan 2026 12:46:11 -0500 Subject: [PATCH 2/6] added missing examples --- doc/source/examples.rst | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/doc/source/examples.rst b/doc/source/examples.rst index 1d34e33..134ff68 100644 --- a/doc/source/examples.rst +++ b/doc/source/examples.rst @@ -173,3 +173,30 @@ Recursively explore and list the filesystem contents on both Display and Main pr .. literalinclude:: ../../examples/filesystem.py :language: python :linenos: + +Download Files +-------------- + +Download files from the FreeWili device to the local filesystem with progress tracking. + +.. literalinclude:: ../../examples/get_file.py + :language: python + :linenos: + +CAN Communication +----------------- + +Handle CAN and CAN FD communication events from FreeWili devices including transmit and receive on both CAN channels. + +.. literalinclude:: ../../examples/can.py + :language: python + :linenos: + +WilEye Camera +------------- + +Simple example demonstrating WilEye camera commands and automatic device discovery. + +.. literalinclude:: ../../examples/wileye_simple.py + :language: python + :linenos: From 012adb26d5d81087fc70c606c31b73435128c52d Mon Sep 17 00:00:00 2001 From: David Rebbe Date: Tue, 27 Jan 2026 13:35:40 -0500 Subject: [PATCH 3/6] fixed i2c test failure with neptune connected --- tests/test_i2c.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/test_i2c.py b/tests/test_i2c.py index 994c801..79337d8 100644 --- a/tests/test_i2c.py +++ b/tests/test_i2c.py @@ -1,4 +1,5 @@ """Test I2C functionality on a FreeWili.""" +from tkinter import N import pytest @@ -57,6 +58,9 @@ def test_hw_i2c_sparkfun_9dof_imu_breakout() -> None: # This is a workaround for firmware bug in v49. Issue #13 if len(addresses) == 1 and 0x20 in addresses: raise NoI2CHardwareError(f"Poll found {len(addresses)} I2C devices") + # Neptune + if 0x48 in addresses: + raise NoI2CHardwareError(f"Poll found {len(addresses)} I2C devices") assert MMC5983MA_ADDR in addresses, f"Expected I2C address {MMC5983MA_ADDR} not found. Got {addresses}!" assert ISM330DHCX_ADDR in addresses, f"Expected I2C address {ISM330DHCX_ADDR} not found. Got {addresses}!" From 05c11c2fff293f69a964a0fa6d3309125cba5d19 Mon Sep 17 00:00:00 2001 From: David Rebbe Date: Tue, 27 Jan 2026 13:36:02 -0500 Subject: [PATCH 4/6] fixed file extensions --- freewili/fw.py | 1 + tests/test_fw.py | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/freewili/fw.py b/freewili/fw.py index d4707b0..038efe2 100644 --- a/freewili/fw.py +++ b/freewili/fw.py @@ -2243,6 +2243,7 @@ def from_ext(cls, ext: str) -> Self: "sub": (FreeWiliProcessorType.Main, "/radio", "Radio file"), "fwi": (FreeWiliProcessorType.Display, "/images", "Image file"), "wav": (FreeWiliProcessorType.Display, "/sounds", "Audio file"), + "py": (FreeWiliProcessorType.Main, "/scripts", "rthon script"), } if ext not in mappings: raise ValueError(f"Extension '{ext}' is not a known FreeWili file type") diff --git a/tests/test_fw.py b/tests/test_fw.py index e51cb9d..33c3e4c 100644 --- a/tests/test_fw.py +++ b/tests/test_fw.py @@ -15,8 +15,12 @@ def test_file_mappings() -> None: known_maps = { "wasm": (FreeWiliProcessorType.Main, "/scripts", "WASM binary"), "wsm": (FreeWiliProcessorType.Main, "/scripts", "WASM binary"), + "zio": (FreeWiliProcessorType.Main, "/scripts", "ZoomIO script file"), + "bin": (FreeWiliProcessorType.Main, "/fpga", "FPGA bin file"), "sub": (FreeWiliProcessorType.Main, "/radio", "Radio file"), "fwi": (FreeWiliProcessorType.Display, "/images", "Image file"), + "wav": (FreeWiliProcessorType.Display, "/sounds", "Audio file"), + "py": (FreeWiliProcessorType.Main, "/scripts", "rthon script"), } for ext, values in known_maps.items(): From a5dcc2849dda3aadfe62404f5ae9f85ebdcfab77 Mon Sep 17 00:00:00 2001 From: David Rebbe Date: Tue, 27 Jan 2026 13:56:54 -0500 Subject: [PATCH 5/6] added unit tests for CAN --- freewili/fw_serial.py | 7 +- freewili/types.py | 5 +- tests/test_can.py | 272 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 280 insertions(+), 4 deletions(-) create mode 100644 tests/test_can.py diff --git a/freewili/fw_serial.py b/freewili/fw_serial.py index 1d0299a..9b31967 100644 --- a/freewili/fw_serial.py +++ b/freewili/fw_serial.py @@ -2266,9 +2266,10 @@ def can_enable_rx_filter( Ok(None) if the command was sent successfully, Err(str) if not. """ # e f f channel (0-1) index (0-32) enable isXTD mskID ID (opt) mskb0 b0 mskb1 b1 - self._empty_all() - cmd = f"e\\f\\f {channel} {index} {1 if enable else 0}" - self.serial_port.send(cmd) + raise RuntimeError("TODO: not implemented") + # self._empty_all() + # cmd = f"e\\f\\f {channel} {index} {1 if enable else 0}" + # self.serial_port.send(cmd) return self._handle_final_response_frame() diff --git a/freewili/types.py b/freewili/types.py index 4fe6cdb..30f573a 100644 --- a/freewili/types.py +++ b/freewili/types.py @@ -421,7 +421,10 @@ def from_string(cls, data: str) -> Self: parts = data.split(" ") arb_id = int(parts[0].strip("x"), 16) is_extended = parts[0].lower().endswith("x") - data_bytes = bytes([int(x, 16) for x in parts[1:]]) + try: + data_bytes = bytes([int(x, 16) for x in parts[1:]]) + except ValueError: + data_bytes = bytes() return cls( arb_id=arb_id, is_extended=is_extended, diff --git a/tests/test_can.py b/tests/test_can.py new file mode 100644 index 0000000..7b80071 --- /dev/null +++ b/tests/test_can.py @@ -0,0 +1,272 @@ +"""Test CAN functionality on a FreeWili with Neptune Orca hardware.""" + +import time + +import pytest + +from freewili import FreeWili +from freewili.framing import ResponseFrame +from freewili.types import CANData, EventDataType, EventType + + +class NoCANHardwareError(Exception): + """Exception to raise when no CAN hardware (Neptune) was found.""" + + pass + + +class CANHardwareFoundError(Exception): + """Exception to raise when CAN hardware (Neptune) was found.""" + + pass + + +@pytest.mark.skipif("len(FreeWili.find_all()) == 0") +@pytest.mark.xfail(raises=CANHardwareFoundError) +def test_hw_can_nothing_attached() -> None: + """Test CAN on a FreeWili with no Neptune hardware attached.""" + with FreeWili.find_first().expect("Failed to find a FreeWili") as device: + # Test Polling - if we find 0x48, that means Neptune is attached + addresses = device.poll_i2c().expect("Failed to poll i2c") + if 0x48 in addresses: + raise CANHardwareFoundError("Found Neptune hardware (I2C address 0x48)") + + +@pytest.mark.skipif("len(FreeWili.find_all()) == 0") +@pytest.mark.xfail(raises=NoCANHardwareError) +def test_hw_can_neptune_basic() -> None: + """Test basic CAN functionality on a FreeWili with Neptune Orca hardware. + + This test requires Neptune Orca hardware which provides CAN functionality. + Neptune is identified by I2C address 0x48. + """ + with FreeWili.find_first().expect("Failed to find a FreeWili") as device: + # Test Polling - verify Neptune hardware is present + addresses = device.poll_i2c().expect("Failed to poll I2C") + if 0x48 not in addresses: + raise NoCANHardwareError("Neptune hardware not found (no I2C address 0x48)") + + # Test enabling CAN streaming on both channels + for channel in (0, 1): + result = device.can_enable_streaming(channel, True).expect( + f"Failed to enable CAN streaming on channel {channel}" + ) + assert result == "Ok", f"Expected 'Ok', got '{result}'" + + # Test disabling CAN streaming + for channel in (0, 1): + result = device.can_enable_streaming(channel, False).expect( + f"Failed to disable CAN streaming on channel {channel}" + ) + assert result == "Ok", f"Expected 'Ok', got '{result}'" + + +@pytest.mark.skipif("len(FreeWili.find_all()) == 0") +@pytest.mark.xfail(raises=NoCANHardwareError) +def test_hw_can_neptune_transmit() -> None: + """Test CAN transmit functionality on a FreeWili with Neptune Orca hardware. + + This test requires Neptune Orca hardware which provides CAN functionality. + Tests basic CAN message transmission on both channels. + """ + with FreeWili.find_first().expect("Failed to find a FreeWili") as device: + # Verify Neptune hardware is present + addresses = device.poll_i2c().expect("Failed to poll I2C") + if 0x48 not in addresses: + raise NoCANHardwareError("Neptune hardware not found (no I2C address 0x48)") + + # Test CAN transmit on channel 0 + result = device.can_transmit( + 0, # channel + 0x123, # arb_id + bytes([0x11, 0x22, 0x33, 0x44]), # data + True, # is_canfd + True, # is_extended + ).expect("Failed to transmit CAN message on channel 0") + assert result == "Ok", f"Expected 'Ok', got '{result}'" + + # Test CAN transmit on channel 1 + result = device.can_transmit( + 1, # channel + 0x124, # arb_id + bytes([0x55, 0x66, 0x77, 0x88]), # data + True, # is_canfd + False, # is_extended (standard ID) + ).expect("Failed to transmit CAN message on channel 1") + assert result == "Ok", f"Expected 'Ok', got '{result}'" + + +@pytest.mark.skipif("len(FreeWili.find_all()) == 0") +@pytest.mark.xfail(raises=NoCANHardwareError) +def test_hw_can_neptune_periodic_transmit() -> None: + """Test CAN periodic transmit functionality on a FreeWili with Neptune Orca hardware. + + This test sets up and controls periodic CAN message transmission. + """ + with FreeWili.find_first().expect("Failed to find a FreeWili") as device: + # Verify Neptune hardware is present + addresses = device.poll_i2c().expect("Failed to poll I2C") + if 0x48 not in addresses: + raise NoCANHardwareError("Neptune hardware not found (no I2C address 0x48)") + + # Set up a periodic CAN message on channel 0, every 100 ms + result = device.can_set_transmit_periodic( + 0, # channel + 0, # slot (0-7) + 100_000, # period_us (100ms) + 0x55, # arb_id + True, # is_canfd + True, # is_extended + bytes([0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88]), # data + ).expect("Failed to set periodic CAN message on channel 0") + assert result == "Ok", f"Expected 'Ok', got '{result}'" + + # Verify we can enable periodic transmission + result = device.can_enable_transmit_periodic(0, True).expect( + "Failed to enable periodic CAN message on channel 0" + ) + assert result == "Ok", f"Expected 'Ok', got '{result}'" + + # Disable periodic transmission + result = device.can_enable_transmit_periodic(0, False).expect( + "Failed to disable periodic CAN message on channel 0" + ) + assert result == "Ok", f"Expected 'Ok', got '{result}'" + + +@pytest.mark.skipif("len(FreeWili.find_all()) == 0") +@pytest.mark.xfail(raises=NoCANHardwareError) +def test_hw_can_neptune_rx_filter() -> None: + """Test CAN RX filter functionality on a FreeWili with Neptune Orca hardware. + + This test configures CAN receive filters to selectively receive messages. + """ + with FreeWili.find_first().expect("Failed to find a FreeWili") as device: + # Verify Neptune hardware is present + addresses = device.poll_i2c().expect("Failed to poll I2C") + if 0x48 not in addresses: + raise NoCANHardwareError("Neptune hardware not found (no I2C address 0x48)") + + # Set up CAN RX filter on channel 0 + result = device.can_set_rx_filter( + 0, # channel + 0, # filter_num + True, # is_extended + 0xFF, # match_priority + 0x123, # match_id + 0, # match_data1 + 0, # match_data2 + 0, # mask_id + 0, # mask_data + ).expect("Failed to set CAN RX filter on channel 0") + assert result == "Ok", f"Expected 'Ok', got '{result}'" + + # # Test enabling RX filter + # result = device.can_enable_rx_filter(0, 0, True).expect("Failed to enable CAN RX filter on channel 0") + # assert result == "Ok", f"Expected 'Ok', got '{result}'" + + # # Test disabling RX filter + # result = device.can_enable_rx_filter(0, 0, False).expect("Failed to disable CAN RX filter on channel 0") + # assert result == "Ok", f"Expected 'Ok', got '{result}'" + + +@pytest.mark.skipif("len(FreeWili.find_all()) == 0") +@pytest.mark.xfail(raises=NoCANHardwareError) +def test_hw_can_neptune_events() -> None: + """Test CAN event handling on a FreeWili with Neptune Orca hardware. + + This test verifies that CAN events can be received and processed correctly. + """ + with FreeWili.find_first().expect("Failed to find a FreeWili") as device: + # Verify Neptune hardware is present + addresses = device.poll_i2c().expect("Failed to poll I2C") + if 0x48 not in addresses: + raise NoCANHardwareError("Neptune hardware not found (no I2C address 0x48)") + + # Track received events + received_events: dict[EventType, list[EventDataType]] = { + EventType.CANRX0: [], + EventType.CANRX1: [], + EventType.CANTX0: [], + EventType.CANTX1: [], + } + + def event_handler(event_type: EventType, frame: ResponseFrame, data: EventDataType) -> None: + """Handle events from FreeWili.""" + if event_type in received_events: + received_events[event_type].append(data) + + # Set up event callback + device.set_event_callback(event_handler) + + # Enable CAN streaming on both channels + for channel in (0, 1): + device.can_enable_streaming(channel, True).expect(f"Failed to enable CAN streaming on channel {channel}") + + # Send a test message + device.can_transmit(0, 0x123, bytes([0xAA, 0xBB, 0xCC, 0xDD]), True, True).expect( + "Failed to send CAN message on channel 0" + ) + + # Process events for a short time to see if we receive anything + start = time.time() + while time.time() - start < 0.5: # Process for 500ms + device.process_events() + time.sleep(0.01) + + # Disable streaming + for channel in (0, 1): + device.can_enable_streaming(channel, False).expect(f"Failed to disable CAN streaming on channel {channel}") + + # Note: We don't assert specific events were received because this depends on + # the actual CAN bus configuration and hardware setup. The test passes if + # the event handling infrastructure works without errors. + + +@pytest.mark.skipif("len(FreeWili.find_all()) == 0") +@pytest.mark.xfail(raises=NoCANHardwareError) +def test_hw_can_data_parsing() -> None: + """Test CANData parsing from string format. + + This test verifies the CANData.from_string() method correctly parses CAN event data. + """ + with FreeWili.find_first().expect("Failed to find a FreeWili") as device: + # Verify Neptune hardware is present + addresses = device.poll_i2c().expect("Failed to poll I2C") + if 0x48 not in addresses: + raise NoCANHardwareError("Neptune hardware not found (no I2C address 0x48)") + + # Test parsing extended ID with data + can_data = CANData.from_string("9x 01 02 03") + assert can_data.arb_id == 0x9 + assert can_data.is_extended is True + assert can_data.data == bytes([0x01, 0x02, 0x03]) + + # Test parsing standard ID with data + can_data = CANData.from_string("9 01 02 03") + assert can_data.arb_id == 0x9 + assert can_data.is_extended is False + assert can_data.data == bytes([0x01, 0x02, 0x03]) + + # Test parsing with no data + can_data = CANData.from_string("9 ") + assert can_data.arb_id == 0x9 + assert can_data.is_extended is False + assert can_data.data == bytes([]) + + # Test parsing extended ID with no data + can_data = CANData.from_string("123x ") + assert can_data.arb_id == 0x123 + assert can_data.is_extended is True + assert can_data.data == bytes([]) + + +if __name__ == "__main__": + import pytest + + pytest.main( + args=[ + __file__, + "--verbose", + ] + ) From 71f269cff47831a96fc6922813cbf406caaca47c Mon Sep 17 00:00:00 2001 From: David Rebbe Date: Tue, 27 Jan 2026 14:09:10 -0500 Subject: [PATCH 6/6] removed wrong import --- tests/test_i2c.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_i2c.py b/tests/test_i2c.py index 79337d8..48443a4 100644 --- a/tests/test_i2c.py +++ b/tests/test_i2c.py @@ -1,5 +1,4 @@ """Test I2C functionality on a FreeWili.""" -from tkinter import N import pytest