From 8423c5e0f5718098b496ccc1818e76b41d6d189f Mon Sep 17 00:00:00 2001 From: David Rebbe Date: Mon, 23 Mar 2026 13:57:58 -0400 Subject: [PATCH] implemented MDIO --- freewili/fw.py | 473 ++++++++++++++++++++++++++++++++++++++++++ freewili/fw_serial.py | 346 ++++++++++++++++++++++++++++++ tests/test_hw_mdio.py | 212 +++++++++++++++++++ 3 files changed, 1031 insertions(+) create mode 100644 tests/test_hw_mdio.py diff --git a/freewili/fw.py b/freewili/fw.py index cf8d767..4db29ba 100644 --- a/freewili/fw.py +++ b/freewili/fw.py @@ -2293,6 +2293,479 @@ def can_write_registers( case _: raise RuntimeError("Missing case statement") + # MDIO Commands + # Hardware Setup: Connect MDC to pin 17 and MDIO to pin 14 + + def mdio_poll_sfp( + self, + processor: FreeWiliProcessorType = FreeWiliProcessorType.Main, + ) -> Result[str, str]: + """Poll for SFP Modules on the I2C bus. + + If a module is found, reads the PHY's temperature and Signal Quality Indicator (SQI). + + Arguments: + ---------- + processor: FreeWiliProcessorType + Processor to send the command to (default: Main) + + Returns: + -------- + Result[str, str]: + Ok(str) if the command was sent successfully, Err(str) if not. + """ + match self.get_serial_from(processor): + case Ok(serial): + return serial.mdio_poll_sfp() + case Err(msg): + return Err(msg) + case _: + raise RuntimeError("Missing case statement") + + def mdio_read_sfp( + self, + device_address: int, + register_address: bytes, + processor: FreeWiliProcessorType = FreeWiliProcessorType.Main, + ) -> Result[str, str]: + """Read a 16-bit value from a register on an SFP device. + + Arguments: + ---------- + device_address: int + 5-bit SFP device address (1 byte hex) + register_address: bytes + 2-byte register address + processor: FreeWiliProcessorType + Processor to send the command to (default: Main) + + Returns: + -------- + Result[str, str]: + Ok(str) if the command was sent successfully, Err(str) if not. + """ + match self.get_serial_from(processor): + case Ok(serial): + return serial.mdio_read_sfp(device_address, register_address) + case Err(msg): + return Err(msg) + case _: + raise RuntimeError("Missing case statement") + + def mdio_write_sfp( + self, + device_address: int, + register_address_and_data: bytes, + processor: FreeWiliProcessorType = FreeWiliProcessorType.Main, + ) -> Result[str, str]: + """Write a 16-bit value to a register on an SFP device. + + Arguments: + ---------- + device_address: int + 5-bit SFP device address (1 byte hex) + register_address_and_data: bytes + 4 bytes: bytes 0-1 are the register address, bytes 2-3 are the data to write + processor: FreeWiliProcessorType + Processor to send the command to (default: Main) + + Returns: + -------- + Result[str, str]: + Ok(str) if the command was sent successfully, Err(str) if not. + """ + match self.get_serial_from(processor): + case Ok(serial): + return serial.mdio_write_sfp(device_address, register_address_and_data) + case Err(msg): + return Err(msg) + case _: + raise RuntimeError("Missing case statement") + + def mdio_read_modify_write_sfp( + self, + device_address: int, + register_mask_data: bytes, + processor: FreeWiliProcessorType = FreeWiliProcessorType.Main, + ) -> Result[str, str]: + """Perform a read-modify-write on an SFP register. + + 1-bits in the mask indicate which bits are overwritten with the corresponding data bits. + + Arguments: + ---------- + device_address: int + 5-bit SFP device address (1 byte hex) + register_mask_data: bytes + 6 bytes: bytes 0-1 are the register address, bytes 2-3 are the mask, + bytes 4-5 are the data + processor: FreeWiliProcessorType + Processor to send the command to (default: Main) + + Returns: + -------- + Result[str, str]: + Ok(str) if the command was sent successfully, Err(str) if not. + """ + match self.get_serial_from(processor): + case Ok(serial): + return serial.mdio_read_modify_write_sfp(device_address, register_mask_data) + case Err(msg): + return Err(msg) + case _: + raise RuntimeError("Missing case statement") + + def mdio_poll_phy( + self, + processor: FreeWiliProcessorType = FreeWiliProcessorType.Main, + ) -> Result[str, str]: + """Poll all 32 PHY addresses (0x00-0x1F) for MDIO devices. + + At each address, checks for Clause 22, Clause 45, and Clause 22 Access to Clause 45 + compatibility. + + Arguments: + ---------- + processor: FreeWiliProcessorType + Processor to send the command to (default: Main) + + Returns: + -------- + Result[str, str]: + Ok(str) if the command was sent successfully, Err(str) if not. + """ + match self.get_serial_from(processor): + case Ok(serial): + return serial.mdio_poll_phy() + case Err(msg): + return Err(msg) + case _: + raise RuntimeError("Missing case statement") + + def mdio_read_22( + self, + phy_address: int, + register_address: int, + processor: FreeWiliProcessorType = FreeWiliProcessorType.Main, + ) -> Result[str, str]: + """Perform a Clause 22 read. + + Arguments: + ---------- + phy_address: int + 5-bit PHY address (0x00-0x1F) + register_address: int + 5-bit register address (0x00-0x1F) + processor: FreeWiliProcessorType + Processor to send the command to (default: Main) + + Returns: + -------- + Result[str, str]: + Ok(str) if the command was sent successfully, Err(str) if not. + """ + match self.get_serial_from(processor): + case Ok(serial): + return serial.mdio_read_22(phy_address, register_address) + case Err(msg): + return Err(msg) + case _: + raise RuntimeError("Missing case statement") + + def mdio_write_22( + self, + phy_address: int, + register_address: int, + data: bytes, + processor: FreeWiliProcessorType = FreeWiliProcessorType.Main, + ) -> Result[str, str]: + """Perform a Clause 22 write. + + Arguments: + ---------- + phy_address: int + 5-bit PHY address (0x00-0x1F) + register_address: int + 5-bit register address (0x00-0x1F) + data: bytes + 2 bytes of data to write (big-endian: byte[0]<<8 | byte[1]) + processor: FreeWiliProcessorType + Processor to send the command to (default: Main) + + Returns: + -------- + Result[str, str]: + Ok(str) if the command was sent successfully, Err(str) if not. + """ + match self.get_serial_from(processor): + case Ok(serial): + return serial.mdio_write_22(phy_address, register_address, data) + case Err(msg): + return Err(msg) + case _: + raise RuntimeError("Missing case statement") + + def mdio_read_modify_write_22( + self, + phy_address: int, + register_address: int, + mask_and_data: bytes, + processor: FreeWiliProcessorType = FreeWiliProcessorType.Main, + ) -> Result[str, str]: + """Perform a Clause 22 read-modify-write. + + 1-bits in the mask indicate which bits are overwritten with the corresponding data bits. + + Arguments: + ---------- + phy_address: int + 5-bit PHY address (0x00-0x1F) + register_address: int + 5-bit register address (0x00-0x1F) + mask_and_data: bytes + 4 bytes: bytes 0-1 are the mask, bytes 2-3 are the data + processor: FreeWiliProcessorType + Processor to send the command to (default: Main) + + Returns: + -------- + Result[str, str]: + Ok(str) if the command was sent successfully, Err(str) if not. + """ + match self.get_serial_from(processor): + case Ok(serial): + return serial.mdio_read_modify_write_22(phy_address, register_address, mask_and_data) + case Err(msg): + return Err(msg) + case _: + raise RuntimeError("Missing case statement") + + def mdio_read_45( + self, + phy_address: int, + mmd_address: int, + register_address: int, + processor: FreeWiliProcessorType = FreeWiliProcessorType.Main, + ) -> Result[str, str]: + """Perform a Clause 45 read. + + Arguments: + ---------- + phy_address: int + 5-bit PHY address (0x00-0x1F) + mmd_address: int + 5-bit MMD (MDIO Manageable Device) address + register_address: int + 16-bit register address + processor: FreeWiliProcessorType + Processor to send the command to (default: Main) + + Returns: + -------- + Result[str, str]: + Ok(str) if the command was sent successfully, Err(str) if not. + """ + match self.get_serial_from(processor): + case Ok(serial): + return serial.mdio_read_45(phy_address, mmd_address, register_address) + case Err(msg): + return Err(msg) + case _: + raise RuntimeError("Missing case statement") + + def mdio_write_45( + self, + phy_address: int, + mmd_address: int, + register_address: int, + data: bytes, + processor: FreeWiliProcessorType = FreeWiliProcessorType.Main, + ) -> Result[str, str]: + """Perform a Clause 45 write. + + Arguments: + ---------- + phy_address: int + 5-bit PHY address (0x00-0x1F) + mmd_address: int + 5-bit MMD address + register_address: int + 16-bit register address + data: bytes + 2 bytes of data to write + processor: FreeWiliProcessorType + Processor to send the command to (default: Main) + + Returns: + -------- + Result[str, str]: + Ok(str) if the command was sent successfully, Err(str) if not. + """ + match self.get_serial_from(processor): + case Ok(serial): + return serial.mdio_write_45(phy_address, mmd_address, register_address, data) + case Err(msg): + return Err(msg) + case _: + raise RuntimeError("Missing case statement") + + def mdio_read_modify_write_45( + self, + phy_address: int, + mmd_address: int, + register_address: int, + mask_and_data: bytes, + processor: FreeWiliProcessorType = FreeWiliProcessorType.Main, + ) -> Result[str, str]: + """Perform a Clause 45 read-modify-write. + + 1-bits in the mask indicate which bits are overwritten with the corresponding data bits. + + Arguments: + ---------- + phy_address: int + 5-bit PHY address (0x00-0x1F) + mmd_address: int + 5-bit MMD address + register_address: int + 16-bit register address + mask_and_data: bytes + 4 bytes: bytes 0-1 are the mask, bytes 2-3 are the data + processor: FreeWiliProcessorType + Processor to send the command to (default: Main) + + Returns: + -------- + Result[str, str]: + Ok(str) if the command was sent successfully, Err(str) if not. + """ + match self.get_serial_from(processor): + case Ok(serial): + return serial.mdio_read_modify_write_45( + phy_address, mmd_address, register_address, mask_and_data + ) + case Err(msg): + return Err(msg) + case _: + raise RuntimeError("Missing case statement") + + def mdio_read_emu( + self, + phy_address: int, + mmd_address: int, + register_address: int, + processor: FreeWiliProcessorType = FreeWiliProcessorType.Main, + ) -> Result[str, str]: + """Perform a Clause 22 Access to Clause 45 (emulation) read. + + Uses Clause 22 frames to access Clause 45 register space via indirect MMD access + through Clause 22 registers 13/14. + + Arguments: + ---------- + phy_address: int + 5-bit PHY address (0x00-0x1F) + mmd_address: int + 5-bit MMD address + register_address: int + 16-bit register address + processor: FreeWiliProcessorType + Processor to send the command to (default: Main) + + Returns: + -------- + Result[str, str]: + Ok(str) if the command was sent successfully, Err(str) if not. + """ + match self.get_serial_from(processor): + case Ok(serial): + return serial.mdio_read_emu(phy_address, mmd_address, register_address) + case Err(msg): + return Err(msg) + case _: + raise RuntimeError("Missing case statement") + + def mdio_write_emu( + self, + phy_address: int, + mmd_address: int, + register_address: int, + data: bytes, + processor: FreeWiliProcessorType = FreeWiliProcessorType.Main, + ) -> Result[str, str]: + """Perform a Clause 22 Access to Clause 45 (emulation) write. + + Uses Clause 22 frames to access Clause 45 register space via indirect MMD access + through Clause 22 registers 13/14. + + Arguments: + ---------- + phy_address: int + 5-bit PHY address (0x00-0x1F) + mmd_address: int + 5-bit MMD address + register_address: int + 16-bit register address + data: bytes + 2 bytes of data to write + processor: FreeWiliProcessorType + Processor to send the command to (default: Main) + + Returns: + -------- + Result[str, str]: + Ok(str) if the command was sent successfully, Err(str) if not. + """ + match self.get_serial_from(processor): + case Ok(serial): + return serial.mdio_write_emu(phy_address, mmd_address, register_address, data) + case Err(msg): + return Err(msg) + case _: + raise RuntimeError("Missing case statement") + + def mdio_read_modify_write_emu( + self, + phy_address: int, + mmd_address: int, + register_address: int, + mask_and_data: bytes, + processor: FreeWiliProcessorType = FreeWiliProcessorType.Main, + ) -> Result[str, str]: + """Perform a Clause 22 Access to Clause 45 (emulation) read-modify-write. + + Uses Clause 22 frames to access Clause 45 register space via indirect MMD access + through Clause 22 registers 13/14. 1-bits in the mask indicate which bits are + overwritten with the corresponding data bits. + + Arguments: + ---------- + phy_address: int + 5-bit PHY address (0x00-0x1F) + mmd_address: int + 5-bit MMD address + register_address: int + 16-bit register address + mask_and_data: bytes + 4 bytes: bytes 0-1 are the mask, bytes 2-3 are the data + processor: FreeWiliProcessorType + Processor to send the command to (default: Main) + + Returns: + -------- + Result[str, str]: + Ok(str) if the command was sent successfully, Err(str) if not. + """ + match self.get_serial_from(processor): + case Ok(serial): + return serial.mdio_read_modify_write_emu( + phy_address, mmd_address, register_address, mask_and_data + ) + case Err(msg): + return Err(msg) + case _: + raise RuntimeError("Missing case statement") + def get_app_info( self, processor: FreeWiliProcessorType = FreeWiliProcessorType.Main ) -> Result[FreeWiliAppInfo, str]: diff --git a/freewili/fw_serial.py b/freewili/fw_serial.py index 70ce6d2..54b25d1 100644 --- a/freewili/fw_serial.py +++ b/freewili/fw_serial.py @@ -2402,3 +2402,349 @@ def enable_nfc_read_events(self, enable: bool) -> Result[str, str]: cmd = f"n\nr\n{0 if not enable else 1}" self.serial_port.send(cmd) return self._handle_final_response_frame() + + # MDIO Commands + # Hardware Setup: Connect MDC to pin 17 and MDIO to pin 14 + + @needs_open() + def mdio_poll_sfp(self) -> Result[str, str]: + """Poll for SFP Modules on the I2C bus. + + If a module is found, reads the PHY's temperature and Signal Quality Indicator (SQI). + + Returns: + -------- + Result[str, str]: + Ok(str) if the command was sent successfully, Err(str) if not. + """ + self._empty_all() + cmd = "e\\m\\a" + self.serial_port.send(cmd) + return self._handle_final_response_frame() + + @needs_open() + def mdio_read_sfp(self, device_address: int, register_address: bytes) -> Result[str, str]: + """Read a 16-bit value from a register on an SFP device. + + Arguments: + ---------- + device_address: int + 5-bit SFP device address (1 byte hex) + register_address: bytes + 2-byte register address + + Returns: + -------- + Result[str, str]: + Ok(str) if the command was sent successfully, Err(str) if not. + """ + self._empty_all() + reg_bytes = " ".join(f"{i:02X}" for i in register_address) + cmd = f"e\\m\\b {device_address:02X} {reg_bytes}" + self.serial_port.send(cmd) + return self._handle_final_response_frame() + + @needs_open() + def mdio_write_sfp(self, device_address: int, register_address_and_data: bytes) -> Result[str, str]: + """Write a 16-bit value to a register on an SFP device. + + Arguments: + ---------- + device_address: int + 5-bit SFP device address (1 byte hex) + register_address_and_data: bytes + 4 bytes: bytes 0-1 are the register address, bytes 2-3 are the data to write + + Returns: + -------- + Result[str, str]: + Ok(str) if the command was sent successfully, Err(str) if not. + """ + self._empty_all() + data_bytes = " ".join(f"{i:02X}" for i in register_address_and_data) + cmd = f"e\\m\\c {device_address:02X} {data_bytes}" + self.serial_port.send(cmd) + return self._handle_final_response_frame() + + @needs_open() + def mdio_read_modify_write_sfp(self, device_address: int, register_mask_data: bytes) -> Result[str, str]: + """Perform a read-modify-write on an SFP register. + + 1-bits in the mask indicate which bits are overwritten with the corresponding data bits. + + Arguments: + ---------- + device_address: int + 5-bit SFP device address (1 byte hex) + register_mask_data: bytes + 6 bytes: bytes 0-1 are the register address, bytes 2-3 are the mask, + bytes 4-5 are the data + + Returns: + -------- + Result[str, str]: + Ok(str) if the command was sent successfully, Err(str) if not. + """ + self._empty_all() + data_bytes = " ".join(f"{i:02X}" for i in register_mask_data) + cmd = f"e\\m\\e {device_address:02X} {data_bytes}" + self.serial_port.send(cmd) + return self._handle_final_response_frame() + + @needs_open() + def mdio_poll_phy(self) -> Result[str, str]: + """Poll all 32 PHY addresses (0x00-0x1F) for MDIO devices. + + At each address, checks for Clause 22, Clause 45, and Clause 22 Access to Clause 45 + compatibility. + + Returns: + -------- + Result[str, str]: + Ok(str) if the command was sent successfully, Err(str) if not. + """ + self._empty_all() + cmd = "e\\m\\y" + self.serial_port.send(cmd) + return self._handle_final_response_frame() + + @needs_open() + def mdio_read_22(self, phy_address: int, register_address: int) -> Result[str, str]: + """Perform a Clause 22 read. + + Arguments: + ---------- + phy_address: int + 5-bit PHY address (0x00-0x1F) + register_address: int + 5-bit register address (0x00-0x1F) + + Returns: + -------- + Result[str, str]: + Ok(str) if the command was sent successfully, Err(str) if not. + """ + self._empty_all() + cmd = f"e\\m\\g {phy_address:02X} {register_address:02X}" + self.serial_port.send(cmd) + return self._handle_final_response_frame() + + @needs_open() + def mdio_write_22(self, phy_address: int, register_address: int, data: bytes) -> Result[str, str]: + """Perform a Clause 22 write. + + Arguments: + ---------- + phy_address: int + 5-bit PHY address (0x00-0x1F) + register_address: int + 5-bit register address (0x00-0x1F) + data: bytes + 2 bytes of data to write (big-endian: byte[0]<<8 | byte[1]) + + Returns: + -------- + Result[str, str]: + Ok(str) if the command was sent successfully, Err(str) if not. + """ + self._empty_all() + data_bytes = " ".join(f"{i:02X}" for i in data) + cmd = f"e\\m\\i {phy_address:02X} {register_address:02X} {data_bytes}" + self.serial_port.send(cmd) + return self._handle_final_response_frame() + + @needs_open() + def mdio_read_modify_write_22( + self, phy_address: int, register_address: int, mask_and_data: bytes + ) -> Result[str, str]: + """Perform a Clause 22 read-modify-write. + + 1-bits in the mask indicate which bits are overwritten with the corresponding data bits. + + Arguments: + ---------- + phy_address: int + 5-bit PHY address (0x00-0x1F) + register_address: int + 5-bit register address (0x00-0x1F) + mask_and_data: bytes + 4 bytes: bytes 0-1 are the mask, bytes 2-3 are the data + + Returns: + -------- + Result[str, str]: + Ok(str) if the command was sent successfully, Err(str) if not. + """ + self._empty_all() + data_bytes = " ".join(f"{i:02X}" for i in mask_and_data) + cmd = f"e\\m\\j {phy_address:02X} {register_address:02X} {data_bytes}" + self.serial_port.send(cmd) + return self._handle_final_response_frame() + + @needs_open() + def mdio_read_45(self, phy_address: int, mmd_address: int, register_address: int) -> Result[str, str]: + """Perform a Clause 45 read. + + Arguments: + ---------- + phy_address: int + 5-bit PHY address (0x00-0x1F) + mmd_address: int + 5-bit MMD (MDIO Manageable Device) address + register_address: int + 16-bit register address + + Returns: + -------- + Result[str, str]: + Ok(str) if the command was sent successfully, Err(str) if not. + """ + self._empty_all() + cmd = f"e\\m\\k {phy_address:02X} {mmd_address:02X} {register_address:04X}" + self.serial_port.send(cmd) + return self._handle_final_response_frame() + + @needs_open() + def mdio_write_45(self, phy_address: int, mmd_address: int, register_address: int, data: bytes) -> Result[str, str]: + """Perform a Clause 45 write. + + Arguments: + ---------- + phy_address: int + 5-bit PHY address (0x00-0x1F) + mmd_address: int + 5-bit MMD address + register_address: int + 16-bit register address + data: bytes + 2 bytes of data to write + + Returns: + -------- + Result[str, str]: + Ok(str) if the command was sent successfully, Err(str) if not. + """ + self._empty_all() + data_bytes = " ".join(f"{i:02X}" for i in data) + cmd = f"e\\m\\l {phy_address:02X} {mmd_address:02X} {register_address:04X} {data_bytes}" + self.serial_port.send(cmd) + return self._handle_final_response_frame() + + @needs_open() + def mdio_read_modify_write_45( + self, phy_address: int, mmd_address: int, register_address: int, mask_and_data: bytes + ) -> Result[str, str]: + """Perform a Clause 45 read-modify-write. + + 1-bits in the mask indicate which bits are overwritten with the corresponding data bits. + + Arguments: + ---------- + phy_address: int + 5-bit PHY address (0x00-0x1F) + mmd_address: int + 5-bit MMD address + register_address: int + 16-bit register address + mask_and_data: bytes + 4 bytes: bytes 0-1 are the mask, bytes 2-3 are the data + + Returns: + -------- + Result[str, str]: + Ok(str) if the command was sent successfully, Err(str) if not. + """ + self._empty_all() + data_bytes = " ".join(f"{i:02X}" for i in mask_and_data) + cmd = f"e\\m\\m {phy_address:02X} {mmd_address:02X} {register_address:04X} {data_bytes}" + self.serial_port.send(cmd) + return self._handle_final_response_frame() + + @needs_open() + def mdio_read_emu(self, phy_address: int, mmd_address: int, register_address: int) -> Result[str, str]: + """Perform a Clause 22 Access to Clause 45 (emulation) read. + + Uses Clause 22 frames to access Clause 45 register space via indirect MMD access + through Clause 22 registers 13/14. + + Arguments: + ---------- + phy_address: int + 5-bit PHY address (0x00-0x1F) + mmd_address: int + 5-bit MMD address + register_address: int + 16-bit register address + + Returns: + -------- + Result[str, str]: + Ok(str) if the command was sent successfully, Err(str) if not. + """ + self._empty_all() + cmd = f"e\\m\\n {phy_address:02X} {mmd_address:02X} {register_address:04X}" + self.serial_port.send(cmd) + return self._handle_final_response_frame() + + @needs_open() + def mdio_write_emu( + self, phy_address: int, mmd_address: int, register_address: int, data: bytes + ) -> Result[str, str]: + """Perform a Clause 22 Access to Clause 45 (emulation) write. + + Uses Clause 22 frames to access Clause 45 register space via indirect MMD access + through Clause 22 registers 13/14. + + Arguments: + ---------- + phy_address: int + 5-bit PHY address (0x00-0x1F) + mmd_address: int + 5-bit MMD address + register_address: int + 16-bit register address + data: bytes + 2 bytes of data to write + + Returns: + -------- + Result[str, str]: + Ok(str) if the command was sent successfully, Err(str) if not. + """ + self._empty_all() + data_bytes = " ".join(f"{i:02X}" for i in data) + cmd = f"e\\m\\o {phy_address:02X} {mmd_address:02X} {register_address:04X} {data_bytes}" + self.serial_port.send(cmd) + return self._handle_final_response_frame() + + @needs_open() + def mdio_read_modify_write_emu( + self, phy_address: int, mmd_address: int, register_address: int, mask_and_data: bytes + ) -> Result[str, str]: + """Perform a Clause 22 Access to Clause 45 (emulation) read-modify-write. + + Uses Clause 22 frames to access Clause 45 register space via indirect MMD access + through Clause 22 registers 13/14. 1-bits in the mask indicate which bits are + overwritten with the corresponding data bits. + + Arguments: + ---------- + phy_address: int + 5-bit PHY address (0x00-0x1F) + mmd_address: int + 5-bit MMD address + register_address: int + 16-bit register address + mask_and_data: bytes + 4 bytes: bytes 0-1 are the mask, bytes 2-3 are the data + + Returns: + -------- + Result[str, str]: + Ok(str) if the command was sent successfully, Err(str) if not. + """ + self._empty_all() + data_bytes = " ".join(f"{i:02X}" for i in mask_and_data) + cmd = f"e\\m\\p {phy_address:02X} {mmd_address:02X} {register_address:04X} {data_bytes}" + self.serial_port.send(cmd) + return self._handle_final_response_frame() diff --git a/tests/test_hw_mdio.py b/tests/test_hw_mdio.py new file mode 100644 index 0000000..61435c8 --- /dev/null +++ b/tests/test_hw_mdio.py @@ -0,0 +1,212 @@ +"""Test MDIO functionality on a FreeWili. + +Hardware Setup: Connect MDC to pin 17 and MDIO to pin 14. +""" + +import pytest + +from freewili import FreeWili + + +class NoMDIOHardwareError(Exception): + """Exception to raise when no MDIO PHY hardware was found.""" + + pass + + +class MDIOHardwareFoundError(Exception): + """Exception to raise when MDIO PHY hardware was found.""" + + pass + + +# --- SFP Operations --- + + +@pytest.mark.skipif("len(FreeWili.find_all()) == 0") +@pytest.mark.xfail(raises=NoMDIOHardwareError) +def test_hw_mdio_poll_sfp() -> None: + """Test SFP polling on a FreeWili.""" + with FreeWili.find_first().expect("Failed to find a FreeWili") as device: + result = device.mdio_poll_sfp() + if result.is_err(): + raise NoMDIOHardwareError(f"SFP poll failed: {result.err()}") + assert result.ok() != "" + + +@pytest.mark.skipif("len(FreeWili.find_all()) == 0") +@pytest.mark.xfail(raises=NoMDIOHardwareError) +def test_hw_mdio_read_sfp() -> None: + """Test SFP register read on a FreeWili.""" + with FreeWili.find_first().expect("Failed to find a FreeWili") as device: + # Read register 0x00FF on device address 0x1A + result = device.mdio_read_sfp(0x1A, b"\x00\xFF") + if result.is_err(): + raise NoMDIOHardwareError(f"SFP read failed: {result.err()}") + assert result.ok() != "" + + +@pytest.mark.skipif("len(FreeWili.find_all()) == 0") +@pytest.mark.xfail(raises=NoMDIOHardwareError) +def test_hw_mdio_write_sfp() -> None: + """Test SFP register write on a FreeWili.""" + with FreeWili.find_first().expect("Failed to find a FreeWili") as device: + # Write 0x1234 to register 0x00FF on device 0x1A + result = device.mdio_write_sfp(0x1A, b"\x00\xFF\x12\x34") + if result.is_err(): + raise NoMDIOHardwareError(f"SFP write failed: {result.err()}") + assert result.ok() != "" + + +@pytest.mark.skipif("len(FreeWili.find_all()) == 0") +@pytest.mark.xfail(raises=NoMDIOHardwareError) +def test_hw_mdio_read_modify_write_sfp() -> None: + """Test SFP read-modify-write on a FreeWili.""" + with FreeWili.find_first().expect("Failed to find a FreeWili") as device: + # RMW register 0x00FF on device 0x1A, mask 0xFF00, data 0x1234 + result = device.mdio_read_modify_write_sfp(0x1A, b"\x00\xFF\xFF\x00\x12\x34") + if result.is_err(): + raise NoMDIOHardwareError(f"SFP RMW failed: {result.err()}") + assert result.ok() != "" + + +# --- PHY Address Discovery --- + + +@pytest.mark.skipif("len(FreeWili.find_all()) == 0") +@pytest.mark.xfail(raises=NoMDIOHardwareError) +def test_hw_mdio_poll_phy() -> None: + """Test PHY address polling on a FreeWili.""" + with FreeWili.find_first().expect("Failed to find a FreeWili") as device: + result = device.mdio_poll_phy() + if result.is_err(): + raise NoMDIOHardwareError(f"PHY poll failed: {result.err()}") + assert result.ok() != "" + + +# --- Clause 22 Operations --- + + +@pytest.mark.skipif("len(FreeWili.find_all()) == 0") +@pytest.mark.xfail(raises=NoMDIOHardwareError) +def test_hw_mdio_read_22() -> None: + """Test Clause 22 read on a FreeWili.""" + with FreeWili.find_first().expect("Failed to find a FreeWili") as device: + # Read register 0x02 on PHY 0x01 + result = device.mdio_read_22(0x01, 0x02) + if result.is_err(): + raise NoMDIOHardwareError(f"Clause 22 read failed: {result.err()}") + assert result.ok() != "" + + +@pytest.mark.skipif("len(FreeWili.find_all()) == 0") +@pytest.mark.xfail(raises=NoMDIOHardwareError) +def test_hw_mdio_write_22() -> None: + """Test Clause 22 write on a FreeWili.""" + with FreeWili.find_first().expect("Failed to find a FreeWili") as device: + # Write 0x1234 to register 0x02 on PHY 0x01 + result = device.mdio_write_22(0x01, 0x02, b"\x12\x34") + if result.is_err(): + raise NoMDIOHardwareError(f"Clause 22 write failed: {result.err()}") + assert result.ok() != "" + + +@pytest.mark.skipif("len(FreeWili.find_all()) == 0") +@pytest.mark.xfail(raises=NoMDIOHardwareError) +def test_hw_mdio_read_modify_write_22() -> None: + """Test Clause 22 read-modify-write on a FreeWili.""" + with FreeWili.find_first().expect("Failed to find a FreeWili") as device: + # RMW register 0x02 on PHY 0x01, mask 0xFF00, data 0x1234 + result = device.mdio_read_modify_write_22(0x01, 0x02, b"\xFF\x00\x12\x34") + if result.is_err(): + raise NoMDIOHardwareError(f"Clause 22 RMW failed: {result.err()}") + assert result.ok() != "" + + +# --- Clause 45 Operations --- + + +@pytest.mark.skipif("len(FreeWili.find_all()) == 0") +@pytest.mark.xfail(raises=NoMDIOHardwareError) +def test_hw_mdio_read_45() -> None: + """Test Clause 45 read on a FreeWili.""" + with FreeWili.find_first().expect("Failed to find a FreeWili") as device: + # Read register 0x0003 on MMD 0x01 on PHY 0x01 + result = device.mdio_read_45(0x01, 0x01, 0x0003) + if result.is_err(): + raise NoMDIOHardwareError(f"Clause 45 read failed: {result.err()}") + assert result.ok() != "" + + +@pytest.mark.skipif("len(FreeWili.find_all()) == 0") +@pytest.mark.xfail(raises=NoMDIOHardwareError) +def test_hw_mdio_write_45() -> None: + """Test Clause 45 write on a FreeWili.""" + with FreeWili.find_first().expect("Failed to find a FreeWili") as device: + # Write 0xABCD to register 0x0003 on MMD 0x01 on PHY 0x01 + result = device.mdio_write_45(0x01, 0x01, 0x0003, b"\xAB\xCD") + if result.is_err(): + raise NoMDIOHardwareError(f"Clause 45 write failed: {result.err()}") + assert result.ok() != "" + + +@pytest.mark.skipif("len(FreeWili.find_all()) == 0") +@pytest.mark.xfail(raises=NoMDIOHardwareError) +def test_hw_mdio_read_modify_write_45() -> None: + """Test Clause 45 read-modify-write on a FreeWili.""" + with FreeWili.find_first().expect("Failed to find a FreeWili") as device: + # RMW register 0x0003 on MMD 0x01 on PHY 0x01, mask 0xFF00, data 0x1234 + result = device.mdio_read_modify_write_45(0x01, 0x01, 0x0003, b"\xFF\x00\x12\x34") + if result.is_err(): + raise NoMDIOHardwareError(f"Clause 45 RMW failed: {result.err()}") + assert result.ok() != "" + + +# --- Clause 22 Access to Clause 45 (Emulation) Operations --- + + +@pytest.mark.skipif("len(FreeWili.find_all()) == 0") +@pytest.mark.xfail(raises=NoMDIOHardwareError) +def test_hw_mdio_read_emu() -> None: + """Test Clause 22 Access to Clause 45 (emulation) read on a FreeWili.""" + with FreeWili.find_first().expect("Failed to find a FreeWili") as device: + # Read register 0x0003 on MMD 0x01 on PHY 0x01 + result = device.mdio_read_emu(0x01, 0x01, 0x0003) + if result.is_err(): + raise NoMDIOHardwareError(f"Emulation read failed: {result.err()}") + assert result.ok() != "" + + +@pytest.mark.skipif("len(FreeWili.find_all()) == 0") +@pytest.mark.xfail(raises=NoMDIOHardwareError) +def test_hw_mdio_write_emu() -> None: + """Test Clause 22 Access to Clause 45 (emulation) write on a FreeWili.""" + with FreeWili.find_first().expect("Failed to find a FreeWili") as device: + # Write 0xABCD to register 0x0003 on MMD 0x01 on PHY 0x01 + result = device.mdio_write_emu(0x01, 0x01, 0x0003, b"\xAB\xCD") + if result.is_err(): + raise NoMDIOHardwareError(f"Emulation write failed: {result.err()}") + assert result.ok() != "" + + +@pytest.mark.skipif("len(FreeWili.find_all()) == 0") +@pytest.mark.xfail(raises=NoMDIOHardwareError) +def test_hw_mdio_read_modify_write_emu() -> None: + """Test Clause 22 Access to Clause 45 (emulation) read-modify-write on a FreeWili.""" + with FreeWili.find_first().expect("Failed to find a FreeWili") as device: + # RMW register 0x0003 on MMD 0x01 on PHY 0x01, mask 0xFF00, data 0x1234 + result = device.mdio_read_modify_write_emu(0x01, 0x01, 0x0003, b"\xFF\x00\x12\x34") + if result.is_err(): + raise NoMDIOHardwareError(f"Emulation RMW failed: {result.err()}") + assert result.ok() != "" + + +if __name__ == "__main__": + import pytest + + pytest.main( + args=[ + __file__, + "--verbose", + ] + )