From e85c5e64e70ebf4a81c5cdfcc89cf920d4ef47ef Mon Sep 17 00:00:00 2001 From: Samuel Abels Date: Sun, 14 Jun 2026 12:44:46 +0200 Subject: [PATCH] Add Galvo/EzCad2 driver support for galvo-based laser controllers Implements driver for EzCad2/LMC protocol used by galvo laser systems (e.g. AtomStack M4). Includes: - Full EzCad2 protocol constants and command structures - GalvoController for list building, single commands, and init sequence - GalvoEncoder to convert Ops to EzCad2 binary protocol - GalvoDriver implementing the Driver ABC with coordinate conversion - MockGalvoConnection for testing without hardware - GalvoUSBConnection via pyusb (VID 0x9588/0x9899) - 106 tests covering controller, encoder, driver, and mock Closes #261 --- debian/control | 3 +- pixi.lock | 7 + pixi.toml | 1 + rayforge/machine/driver/__init__.py | 2 + rayforge/machine/driver/galvo/__init__.py | 3 + .../machine/driver/galvo/galvo_connection.py | 25 + rayforge/machine/driver/galvo/galvo_consts.py | 241 +++++++++ .../machine/driver/galvo/galvo_controller.py | 461 ++++++++++++++++++ rayforge/machine/driver/galvo/galvo_driver.py | 402 +++++++++++++++ .../machine/driver/galvo/galvo_encoder.py | 252 ++++++++++ .../driver/galvo/galvo_mock_connection.py | 143 ++++++ .../machine/driver/galvo/galvo_protocol.py | 89 ++++ .../driver/galvo/galvo_usb_connection.py | 130 +++++ rayforge/ui_gtk/about.py | 1 + requirements.txt | 1 + scripts/win/win_setup.sh | 1 + tests/machine/driver/galvo/__init__.py | 0 .../driver/galvo/test_galvo_controller.py | 305 ++++++++++++ .../machine/driver/galvo/test_galvo_driver.py | 393 +++++++++++++++ .../driver/galvo/test_galvo_encoder.py | 320 ++++++++++++ .../galvo/test_galvo_mock_connection.py | 200 ++++++++ 21 files changed, 2979 insertions(+), 1 deletion(-) create mode 100644 rayforge/machine/driver/galvo/__init__.py create mode 100644 rayforge/machine/driver/galvo/galvo_connection.py create mode 100644 rayforge/machine/driver/galvo/galvo_consts.py create mode 100644 rayforge/machine/driver/galvo/galvo_controller.py create mode 100644 rayforge/machine/driver/galvo/galvo_driver.py create mode 100644 rayforge/machine/driver/galvo/galvo_encoder.py create mode 100644 rayforge/machine/driver/galvo/galvo_mock_connection.py create mode 100644 rayforge/machine/driver/galvo/galvo_protocol.py create mode 100644 rayforge/machine/driver/galvo/galvo_usb_connection.py create mode 100644 tests/machine/driver/galvo/__init__.py create mode 100644 tests/machine/driver/galvo/test_galvo_controller.py create mode 100644 tests/machine/driver/galvo/test_galvo_driver.py create mode 100644 tests/machine/driver/galvo/test_galvo_encoder.py create mode 100644 tests/machine/driver/galvo/test_galvo_mock_connection.py diff --git a/debian/control b/debian/control index 36a33e97b..6ccba7b11 100644 --- a/debian/control +++ b/debian/control @@ -28,7 +28,8 @@ Depends: ${shlibs:Depends}, ${misc:Depends}, ${python3:Depends}, python3-opencv, python3-ezdxf, python3-pluggy, python3-pypdf, python3-yaml, python3-aiohttp, python3-websockets, python3-blinker, python3-platformdirs, python3-opengl, desktop-file-utils, - python3-svgelements, python3-semver, python3-fitz, python3-serial + python3-svgelements, python3-semver, python3-fitz, python3-serial, + python3-usb Description: Desktop application for laser cutting and engraving Rayforge is a powerful, open-source, and cross-platform software for controlling your laser cutter and engraver. diff --git a/pixi.lock b/pixi.lock index bf203a946..2f17e7b59 100644 --- a/pixi.lock +++ b/pixi.lock @@ -377,6 +377,7 @@ environments: - pypi: https://files.pythonhosted.org/packages/21/0e/8459ca4413e1a21a06c97d134bfaf18adfd27cea068813dc0faae06cbf00/cssselect2-0.9.0-py3-none-any.whl - pypi: https://files.pythonhosted.org/packages/22/8e/85e9d9f11dbf34036eb1df283805ef6b885f2005a56d6533bb58ab0b8a11/pymupdf-1.27.2.2-cp310-abi3-manylinux_2_28_x86_64.whl - pypi: https://files.pythonhosted.org/packages/27/1a/1f68f9ba0c207934b35b86a8ca3aad8395a3d6dd7921c0686e23853ff5a9/mccabe-0.7.0-py2.py3-none-any.whl + - pypi: https://files.pythonhosted.org/packages/28/b8/27e6312e86408a44fe16bd28ee12dd98608b39f7e7e57884a24e8f29b573/pyusb-1.3.1-py3-none-any.whl - pypi: https://files.pythonhosted.org/packages/2d/1c/317ccaaa6a3ddce270652781cf09d3b1d5d343b1a1dabca304fe5c218474/pygobject_stubs-2.16.0.tar.gz - pypi: https://files.pythonhosted.org/packages/32/80/c1b40335949fec3ca1ecea7ddb3f4977dfc104d84dc54e9384f7ccc52124/trimesh-4.6.8-py3-none-any.whl - pypi: https://files.pythonhosted.org/packages/33/6b/e0547afaf41bf2c42e52430072fa5658766e3d65bd4b03a563d1b6336f57/distlib-0.4.0-py2.py3-none-any.whl @@ -4731,6 +4732,7 @@ packages: - pyopengl==3.1.10 - pyopengl-accelerate==3.1.10 - pypdf~=6.10.0 + - pyusb>=1.2.0 - pyserial>=3.5 - pyvips==3.0.0 - pyyaml==6.0.2 @@ -4966,6 +4968,11 @@ packages: version: 0.7.0 sha256: 6c2d30ab6be0e4a46919781807b4f0d834ebdd6c6e3dca0bda5a15f863427b6e requires_python: '>=3.6' +- pypi: https://files.pythonhosted.org/packages/28/b8/27e6312e86408a44fe16bd28ee12dd98608b39f7e7e57884a24e8f29b573/pyusb-1.3.1-py3-none-any.whl + name: pyusb + version: 1.3.1 + sha256: bf9b754557af4717fe80c2b07cc2b923a9151f5c08d17bdb5345dac09d6a0430 + requires_python: '>=3.9.0' - pypi: https://files.pythonhosted.org/packages/2d/1c/317ccaaa6a3ddce270652781cf09d3b1d5d343b1a1dabca304fe5c218474/pygobject_stubs-2.16.0.tar.gz name: pygobject-stubs version: 2.16.0 diff --git a/pixi.toml b/pixi.toml index 47b80ab49..377ae1f9a 100644 --- a/pixi.toml +++ b/pixi.toml @@ -47,6 +47,7 @@ platformdirs = "==4.3.6" pluggy = "==1.6.0" pypdf = "~=6.10.0" pymupdf = "==1.27.2.2" +pyusb = ">=1.2.0" pyserial = "~=3.5" pyvips = "==3.0.0" PyYAML = "==6.0.2" diff --git a/rayforge/machine/driver/__init__.py b/rayforge/machine/driver/__init__.py index 01f96b262..1c5dcc884 100644 --- a/rayforge/machine/driver/__init__.py +++ b/rayforge/machine/driver/__init__.py @@ -11,6 +11,7 @@ ) from .marlin import MarlinSerialDriver from .octoprint import OctoPrintDriver +from .galvo import GalvoDriver from .ruida import RuidaDriver from .smoothie import SmoothieDriver @@ -42,6 +43,7 @@ def register_driver(driver: Type[Driver]): "DriverMaturity", "DRIVER_MATURITY_LABELS", "NoDeviceDriver", + "GalvoDriver", "GrblNetworkDriver", "GrblSerialDriver", "GrblSerialSimpleDriver", diff --git a/rayforge/machine/driver/galvo/__init__.py b/rayforge/machine/driver/galvo/__init__.py new file mode 100644 index 000000000..4073992de --- /dev/null +++ b/rayforge/machine/driver/galvo/__init__.py @@ -0,0 +1,3 @@ +from .galvo_driver import GalvoDriver + +__all__ = ["GalvoDriver"] diff --git a/rayforge/machine/driver/galvo/galvo_connection.py b/rayforge/machine/driver/galvo/galvo_connection.py new file mode 100644 index 000000000..759e4ccda --- /dev/null +++ b/rayforge/machine/driver/galvo/galvo_connection.py @@ -0,0 +1,25 @@ +""" +Connection interface for EzCad2/LMC controllers. + +Defines a Protocol (structural typing) for USB-like connections +so both MockGalvoConnection and GalvoUSBConnection are interchangeable. +""" + +from typing import Optional, Protocol + + +class GalvoConnection(Protocol): + """Protocol for EzCad2 USB-like connections.""" + + @property + def is_connected(self) -> bool: ... + + def open(self, index: int = 0) -> int: ... + + def close(self, index: int = 0) -> None: ... + + def write( + self, index: int = 0, packet: Optional[bytes] = None + ) -> None: ... + + def read(self, index: int = 0) -> bytes: ... diff --git a/rayforge/machine/driver/galvo/galvo_consts.py b/rayforge/machine/driver/galvo/galvo_consts.py new file mode 100644 index 000000000..499fbdc3d --- /dev/null +++ b/rayforge/machine/driver/galvo/galvo_consts.py @@ -0,0 +1,241 @@ +""" +EzCad2/LMC protocol constants for Galvo laser controllers. + +Based on: +- https://github.com/meerk40t/meerk40t/tree/main/meerk40t/balormk +- https://github.com/meerk40t/galvoplotter +- LightBurn documentation +""" + +from typing import Dict + +# ============================================================================= +# USB identifiers +# ============================================================================= + +USB_VID_JCZ = 0x9588 +USB_PID_JCZ = 0x9899 +USB_VID_CH341 = 0x1A86 +USB_PID_CH341 = 0x5512 + +USB_WRITE_ENDPOINT = 0x02 +USB_READ_ENDPOINT = 0x88 + +# ============================================================================= +# List commands (0x8000+) +# ============================================================================= + +listJumpTo = 0x8001 +listEndOfList = 0x8002 +listLaserOnPoint = 0x8003 +listDelayTime = 0x8004 +listMarkTo = 0x8005 +listJumpSpeed = 0x8006 +listLaserOnDelay = 0x8007 +listLaserOffDelay = 0x8008 +listMarkFreq = 0x800A +listMarkPowerRatio = 0x800B +listMarkSpeed = 0x800C +listJumpDelay = 0x800D +listPolygonDelay = 0x800F +listWritePort = 0x8011 +listMarkCurrent = 0x8012 +listMarkFreq2 = 0x8013 +listFlyEnable = 0x801A +listQSwitchPeriod = 0x801B +listDirectLaserSwitch = 0x801C +listFlyDelay = 0x801D +listSetCo2FPK = 0x801E +listFlyWaitInput = 0x801F +listFiberOpenMO = 0x8021 +listWaitForInput = 0x8022 +listChangeMarkCount = 0x8023 +listSetWeldPowerWave = 0x8024 +listEnableWeldPowerWave = 0x8025 +listFiberYLPMPulseWidth = 0x8026 +listFlyEncoderCount = 0x8028 +listSetDaZWord = 0x8029 +listJptSetParam = 0x8050 +listReadyMark = 0x8051 + +# ============================================================================= +# Single commands (0x0000 - 0x7FFF) +# ============================================================================= + +DisableLaser = 0x0002 +EnableLaser = 0x0004 +ExecuteList = 0x0005 +SetPwmPulseWidth = 0x0006 +GetVersion = 0x0007 +GetSerialNo = 0x0009 +GetListStatus = 0x000A +GetPositionXY = 0x000C +GotoXY = 0x000D +LaserSignalOff = 0x000E +LaserSignalOn = 0x000F +WriteCorLine = 0x0010 +ResetList = 0x0012 +RestartList = 0x0013 +WriteCorTable = 0x0015 +SetControlMode = 0x0016 +SetDelayMode = 0x0017 +SetMaxPolyDelay = 0x0018 +SetEndOfList = 0x0019 +SetFirstPulseKiller = 0x001A +SetLaserMode = 0x001B +SetTiming = 0x001C +SetStandby = 0x001D +SetPwmHalfPeriod = 0x001E +StopExecute = 0x001F +StopList = 0x0020 +WritePort = 0x0021 +WriteAnalogPort1 = 0x0022 +WriteAnalogPort2 = 0x0023 +WriteAnalogPortX = 0x0024 +ReadPort = 0x0025 +SetAxisMotionParam = 0x0026 +SetAxisOriginParam = 0x0027 +AxisGoOrigin = 0x0028 +MoveAxisTo = 0x0029 +GetAxisPos = 0x002A +GetFlyWaitCount = 0x002B +GetMarkCount = 0x002D +SetFpkParam2 = 0x002E +Fiber_SetMo = 0x0033 +Fiber_GetStMO_AP = 0x0034 +EnableZ = 0x003A +DisableZ = 0x0039 +SetZData = 0x003B +SetSPISimmerCurrent = 0x003C +SetFpkParam = 0x0062 +Reset = 0x0040 +GetFlySpeed = 0x0038 +FiberPulseWidth = 0x002F +FiberGetConfigExtend = 0x0030 +InputPort = 0x0031 +GetMarkTime = 0x0041 +GetUserData = 0x0036 +SetFlyRes = 0x0032 + +# ============================================================================= +# Lookup tables +# ============================================================================= + +LIST_COMMAND_NAMES: Dict[int, str] = { + 0x8001: "listJumpTo", + 0x8002: "listEndOfList", + 0x8003: "listLaserOnPoint", + 0x8004: "listDelayTime", + 0x8005: "listMarkTo", + 0x8006: "listJumpSpeed", + 0x8007: "listLaserOnDelay", + 0x8008: "listLaserOffDelay", + 0x800A: "listMarkFreq", + 0x800B: "listMarkPowerRatio", + 0x800C: "listMarkSpeed", + 0x800D: "listJumpDelay", + 0x800F: "listPolygonDelay", + 0x8011: "listWritePort", + 0x8012: "listMarkCurrent", + 0x8013: "listMarkFreq2", + 0x801A: "listFlyEnable", + 0x801B: "listQSwitchPeriod", + 0x801C: "listDirectLaserSwitch", + 0x801D: "listFlyDelay", + 0x801E: "listSetCo2FPK", + 0x801F: "listFlyWaitInput", + 0x8021: "listFiberOpenMO", + 0x8022: "listWaitForInput", + 0x8023: "listChangeMarkCount", + 0x8024: "listSetWeldPowerWave", + 0x8025: "listEnableWeldPowerWave", + 0x8026: "listFiberYLPMPulseWidth", + 0x8028: "listFlyEncoderCount", + 0x8029: "listSetDaZWord", + 0x8050: "listJptSetParam", + 0x8051: "listReadyMark", +} + +SINGLE_COMMAND_NAMES: Dict[int, str] = { + 0x0002: "DisableLaser", + 0x0004: "EnableLaser", + 0x0005: "ExecuteList", + 0x0006: "SetPwmPulseWidth", + 0x0007: "GetVersion", + 0x0009: "GetSerialNo", + 0x000A: "GetListStatus", + 0x000C: "GetPositionXY", + 0x000D: "GotoXY", + 0x000E: "LaserSignalOff", + 0x000F: "LaserSignalOn", + 0x0010: "WriteCorLine", + 0x0012: "ResetList", + 0x0013: "RestartList", + 0x0015: "WriteCorTable", + 0x0016: "SetControlMode", + 0x0017: "SetDelayMode", + 0x0018: "SetMaxPolyDelay", + 0x0019: "SetEndOfList", + 0x001A: "SetFirstPulseKiller", + 0x001B: "SetLaserMode", + 0x001C: "SetTiming", + 0x001D: "SetStandby", + 0x001E: "SetPwmHalfPeriod", + 0x001F: "StopExecute", + 0x0020: "StopList", + 0x0021: "WritePort", + 0x0022: "WriteAnalogPort1", + 0x0023: "WriteAnalogPort2", + 0x0024: "WriteAnalogPortX", + 0x0025: "ReadPort", + 0x0026: "SetAxisMotionParam", + 0x0027: "SetAxisOriginParam", + 0x0028: "AxisGoOrigin", + 0x0029: "MoveAxisTo", + 0x002A: "GetAxisPos", + 0x002B: "GetFlyWaitCount", + 0x002D: "GetMarkCount", + 0x002E: "SetFpkParam2", + 0x0033: "Fiber_SetMo", + 0x0034: "Fiber_GetStMO_AP", + 0x003A: "EnableZ", + 0x0039: "DisableZ", + 0x003B: "SetZData", + 0x003C: "SetSPISimmerCurrent", + 0x0062: "SetFpkParam", + 0x0040: "Reset", + 0x0038: "GetFlySpeed", + 0x002F: "FiberPulseWidth", + 0x0030: "FiberGetConfigExtend", + 0x0031: "InputPort", + 0x0041: "GetMarkTime", + 0x0036: "GetUserData", + 0x0032: "SetFlyRes", +} + +# ============================================================================= +# Status flags (from GetListStatus / GetVersion word 3) +# ============================================================================= + +STATUS_BUSY = 0x04 +STATUS_READY = 0x20 +STATUS_AXIS = 0x40 + +# ============================================================================= +# Configuration defaults +# ============================================================================= + +LIST_BUFFER_SIZE = 0xC00 # 3072 bytes = 256 commands x 12 bytes +COMMAND_SIZE = 12 # 6 x uint16 little-endian + +NOP_COMMAND = bytes([0x02, 0x80, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]) + +# Galvo coordinate range +GALVO_CENTER = 0x8000 +GALVO_MIN = 0x0000 +GALVO_MAX = 0xFFFF + +# Laser sources +SOURCE_FIBER = "fiber" +SOURCE_CO2 = "co2" +SOURCE_UV = "uv" diff --git a/rayforge/machine/driver/galvo/galvo_controller.py b/rayforge/machine/driver/galvo/galvo_controller.py new file mode 100644 index 000000000..35e9460f4 --- /dev/null +++ b/rayforge/machine/driver/galvo/galvo_controller.py @@ -0,0 +1,461 @@ +""" +EzCad2/LMC Galvo Controller. +""" + +import asyncio +import logging +import struct +from typing import Optional + +from .galvo_connection import GalvoConnection +from .galvo_consts import ( + LIST_BUFFER_SIZE, + COMMAND_SIZE, + NOP_COMMAND, + SOURCE_CO2, + SOURCE_FIBER, + STATUS_BUSY, + STATUS_READY, + listDelayTime, + listEndOfList, + listJumpDelay, + listJumpSpeed, + listJumpTo, + listLaserOffDelay, + listLaserOnDelay, + listLaserOnPoint, + listMarkCurrent, + listMarkFreq, + listMarkPowerRatio, + listMarkSpeed, + listMarkTo, + listPolygonDelay, + listQSwitchPeriod, + listReadyMark, + listSetCo2FPK, + listWritePort, + listFiberYLPMPulseWidth, + EnableLaser, + ExecuteList, + SetPwmPulseWidth, + GetVersion, + GotoXY, + ResetList, + SetControlMode, + SetDelayMode, + SetFirstPulseKiller, + SetLaserMode, + SetTiming, + SetStandby, + SetPwmHalfPeriod, + WritePort, + SetEndOfList, + EnableZ, + SetFpkParam2, + SetFlyRes, + Fiber_SetMo, + WriteAnalogPort1, +) +from .galvo_protocol import GalvoResponse + +logger = logging.getLogger(__name__) + +DRIVER_STATE_IDLE = 0 +DRIVER_STATE_MARKING = 1 +DRIVER_STATE_LIGHTING = 2 + + +class GalvoController: + """ + Manages an EzCad2/LMC galvo controller board. + """ + + def __init__( + self, + connection: Optional[GalvoConnection] = None, + source: str = SOURCE_FIBER, + galvos_per_mm: int = 500, + ): + self._connection: Optional[GalvoConnection] = connection + self._source = source + self._galvos_per_mm = galvos_per_mm + self._list_lock = asyncio.Lock() + + self._state = DRIVER_STATE_IDLE + self._active_list: Optional[bytearray] = None + self._active_index: int = 0 + self._num_list_packets: int = 0 + self._list_executing: bool = False + + self._last_x: int = 0x8000 + self._last_y: int = 0x8000 + + self._speed: Optional[float] = None + self._travel_speed: Optional[float] = None + self._frequency: Optional[float] = None + self._power: Optional[float] = None + self._fpk: Optional[float] = None + self._pulse_width: Optional[int] = None + + self._delay_on: Optional[float] = None + self._delay_off: Optional[float] = None + self._delay_poly: Optional[float] = None + + self._port_bits: int = 0 + self._laser_pin: int = 0 + + self.first_pulse_killer: int = 200 + self.pwm_pulse_width: int = 125 + self.pwm_half_period: int = 125 + self.standby_p1: int = 2000 + self.standby_p2: int = 20 + self.timing_mode: int = 1 + self.delay_mode: int = 1 + self.laser_mode: int = 1 + self.control_mode: int = 0 + self.fpk_max_voltage: int = 0xFFB + self.fpk_min_voltage: int = 1 + self.fpk_t1: int = 409 + self.fpk_t2: int = 100 + self.fly_resolution_1: int = 0 + self.fly_resolution_2: int = 99 + self.fly_resolution_3: int = 1000 + self.fly_resolution_4: int = 25 + + self._initialized: bool = False + + @property + def is_connected(self) -> bool: + return self._connection is not None and self._connection.is_connected + + @property + def source(self) -> str: + return self._source + + @source.setter + def source(self, value: str) -> None: + self._source = value + + @property + def state(self) -> int: + return self._state + + async def connect(self) -> None: + if self._connection is None: + from .galvo_mock_connection import MockGalvoConnection + + self._connection = MockGalvoConnection() + if not self._connection.is_connected: + self._connection.open() + self._initialize() + + async def disconnect(self) -> None: + if self._connection and self._connection.is_connected: + self._connection.close() + self._initialized = False + + def _initialize(self) -> None: + self._send_single(EnableLaser) + self._send_single(SetControlMode, self.control_mode) + self._send_single(SetLaserMode, self.laser_mode) + self._send_single(SetDelayMode, self.delay_mode) + self._send_single(SetTiming, self.timing_mode) + self._send_single(SetStandby, self.standby_p1, self.standby_p2) + self._send_single(SetFirstPulseKiller, self.first_pulse_killer) + self._send_single(SetPwmHalfPeriod, self.pwm_half_period) + self._send_single(SetPwmPulseWidth, self.pwm_pulse_width) + if self._source == SOURCE_FIBER: + self._send_single(Fiber_SetMo, 0) + self._send_single( + SetFpkParam2, + self.fpk_max_voltage, + self.fpk_min_voltage, + self.fpk_t1, + self.fpk_t2, + ) + self._send_single( + SetFlyRes, + self.fly_resolution_1, + self.fly_resolution_2, + self.fly_resolution_3, + self.fly_resolution_4, + ) + self._send_single(EnableZ) + self._send_single(WriteAnalogPort1, 0x7FF) + self._initialized = True + + async def enter_marking_mode(self) -> None: + if self._state == DRIVER_STATE_MARKING: + return + self._state = DRIVER_STATE_MARKING + self._reset_list() + self._port_on(self._laser_pin) + self._list_write_port() + if self._source == SOURCE_FIBER: + self._send_single(Fiber_SetMo, 1) + self._reset_cached_params() + self._list_write(listReadyMark) + + async def enter_idle_mode(self) -> None: + if self._state == DRIVER_STATE_IDLE: + return + self._end_list() + if not self._list_executing and self._num_list_packets: + self._execute_list() + self._list_executing = False + self._num_list_packets = 0 + await self._wait_idle() + if self._source == SOURCE_FIBER: + self._send_single(Fiber_SetMo, 0) + self._port_off(self._laser_pin) + self._send_single(WritePort, self._port_bits) + self._state = DRIVER_STATE_IDLE + + def _reset_cached_params(self) -> None: + self._speed = None + self._travel_speed = None + self._frequency = None + self._power = None + self._fpk = None + self._pulse_width = None + self._delay_on = None + self._delay_off = None + self._delay_poly = None + + def _reset_list(self) -> None: + self._active_list = None + self._active_index = 0 + self._send_single(ResetList) + + def _end_list(self) -> None: + if self._active_list is None or self._active_index == 0: + return + if self._connection is None: + return + self._list_write(listEndOfList) + packet = bytes(self._active_list[: self._active_index]) + padded = bytearray(packet) + remaining = LIST_BUFFER_SIZE - len(padded) + padded.extend(NOP_COMMAND * (remaining // COMMAND_SIZE)) + self._connection.write(packet=bytes(padded)) + self._send_single(SetEndOfList, 0) + self._num_list_packets += 1 + self._active_list = None + self._active_index = 0 + if self._num_list_packets > 2 and not self._list_executing: + self._execute_list() + self._list_executing = True + + def _list_write( + self, + command: int, + v1: int = 0, + v2: int = 0, + v3: int = 0, + v4: int = 0, + v5: int = 0, + ) -> None: + if self._active_index >= LIST_BUFFER_SIZE: + return + if self._active_list is None: + self._active_list = bytearray(LIST_BUFFER_SIZE) + self._active_index = 0 + struct.pack_into( + "<6H", + self._active_list, + self._active_index, + command, + v1, + v2, + v3, + v4, + v5, + ) + self._active_index += COMMAND_SIZE + + def _send_single( + self, + command: int, + v1: int = 0, + v2: int = 0, + v3: int = 0, + v4: int = 0, + v5: int = 0, + ) -> "GalvoResponse": + if self._connection is None: + return GalvoResponse(0, 0, 0, 0) + cmd = struct.pack("<6H", command, v1, v2, v3, v4, v5) + self._connection.write(packet=cmd) + resp = self._connection.read() + return GalvoResponse.from_bytes(resp) + + def _execute_list(self) -> None: + self._send_single(ExecuteList) + + def _list_write_port(self) -> None: + self._list_write(listWritePort, self._port_bits) + + def _port_on(self, bit: int) -> None: + self._port_bits |= 1 << bit + + def _port_off(self, bit: int) -> None: + self._port_bits &= ~(1 << bit) + + def _get_status(self) -> int: + resp = self._send_single(GetVersion) + return resp.v3 + + def _is_busy(self) -> bool: + return bool(self._get_status() & STATUS_BUSY) + + def _is_ready(self) -> bool: + return bool(self._get_status() & STATUS_READY) + + async def _wait_idle(self) -> None: + while self._is_busy(): + await asyncio.sleep(0.01) + + async def _wait_ready(self) -> None: + while not self._is_ready(): + await asyncio.sleep(0.01) + + def goto(self, x: int, y: int) -> None: + if x == self._last_x and y == self._last_y: + return + x = max(0, min(0xFFFF, x)) + y = max(0, min(0xFFFF, y)) + distance = int( + abs(complex(x, y) - complex(self._last_x, self._last_y)) + ) + if distance > 0xFFFF: + distance = 0xFFFF + self._list_write(listJumpTo, x, y, 0, distance) + self._last_x = x + self._last_y = y + + def mark(self, x: int, y: int) -> None: + if x == self._last_x and y == self._last_y: + return + x = max(0, min(0xFFFF, x)) + y = max(0, min(0xFFFF, y)) + distance = int( + abs(complex(x, y) - complex(self._last_x, self._last_y)) + ) + if distance > 0xFFFF: + distance = 0xFFFF + self._list_write(listMarkTo, x, y, 0, distance) + self._last_x = x + self._last_y = y + + def goto_xy( + self, x: int, y: int, angle: int = 0, distance: int = 0 + ) -> None: + self._last_x = x + self._last_y = y + self._send_single(GotoXY, x, y, angle, distance) + + def dwell(self, time_ms: int) -> None: + dwell_time = time_ms * 100 + while dwell_time > 0: + d = min(dwell_time, 60000) + self._list_write(listLaserOnPoint, d) + dwell_time -= d + + def wait(self, time_ms: int) -> None: + wait_time = time_ms * 100 + while wait_time > 0: + d = min(wait_time, 60000) + self._list_write(listDelayTime, d) + wait_time -= d + + def set_mark_speed(self, speed: float) -> None: + speed_val = int(self._convert_speed(speed)) + if speed_val > 0xFFFF: + speed_val = 0xFFFF + self._list_write(listMarkSpeed, speed_val) + + def set_travel_speed(self, speed: float) -> None: + if self._travel_speed == speed: + return + self._travel_speed = speed + speed_val = int(self._convert_speed(speed)) + if speed_val > 0xFFFF: + speed_val = 0xFFFF + self._list_write(listJumpSpeed, speed_val) + + def set_laser_on_delay(self, delay: float) -> None: + if self._delay_on == delay: + return + self._delay_on = delay + sign = 0x0000 if delay >= 0 else 0x8000 + self._list_write(listLaserOnDelay, abs(int(delay)), sign) + + def set_laser_off_delay(self, delay: float) -> None: + if self._delay_off == delay: + return + self._delay_off = delay + sign = 0x0000 if delay >= 0 else 0x8000 + self._list_write(listLaserOffDelay, abs(int(delay)), sign) + + def set_jump_delay(self, delay: float) -> None: + if self._delay_on == delay: + return + self._delay_on = delay + sign = 0x0000 if delay >= 0 else 0x8000 + self._list_write(listJumpDelay, abs(int(delay)), sign) + + def set_polygon_delay(self, delay: float) -> None: + if self._delay_poly == delay: + return + self._delay_poly = delay + val = int(min(65535, max(0, delay / 10.0))) + self._list_write(listPolygonDelay, val) + + def set_power(self, power_percent: float) -> None: + if self._power == power_percent: + return + self._power = power_percent + if self._source == SOURCE_FIBER: + current = self._convert_power(power_percent) + self._list_write(listMarkCurrent, current) + elif self._source == SOURCE_CO2: + freq = self._frequency or 1.0 + ratio = int(round(200 * power_percent / freq)) + self._list_write(listMarkPowerRatio, ratio) + + def set_frequency(self, freq_khz: float) -> None: + if self._frequency == freq_khz: + return + self._frequency = freq_khz + if self._source == SOURCE_FIBER: + period = self._convert_frequency(freq_khz, base=20000.0) + self._list_write(listQSwitchPeriod, period) + elif self._source == SOURCE_CO2: + period = self._convert_frequency(freq_khz, base=10000.0) + self._list_write(listMarkFreq, period) + + def set_fpk(self, fpk: float) -> None: + if self._source not in (SOURCE_CO2,): + return + if self._fpk == fpk: + return + self._fpk = fpk + val = int(round(2000.0 / (self._frequency or 1.0))) + self._list_write(listSetCo2FPK, val, val) + + def set_pulse_width(self, pulse_width: int) -> None: + if self._pulse_width == pulse_width: + return + self._pulse_width = pulse_width + self._list_write(listFiberYLPMPulseWidth, pulse_width) + + def _convert_speed(self, speed_mm_s: float) -> int: + return int(speed_mm_s * self._galvos_per_mm / 1000.0) + + def _convert_frequency( + self, freq_khz: float, base: float = 20000.0 + ) -> int: + return int(round(base / freq_khz)) & 0xFFFF + + def _convert_power(self, power_percent: float) -> int: + return int(round(power_percent * 0xFFF / 100.0)) diff --git a/rayforge/machine/driver/galvo/galvo_driver.py b/rayforge/machine/driver/galvo/galvo_driver.py new file mode 100644 index 000000000..5d049c1f2 --- /dev/null +++ b/rayforge/machine/driver/galvo/galvo_driver.py @@ -0,0 +1,402 @@ +""" +Galvo Driver - High-level driver for EzCad2/LMC galvo laser controllers. + +Implements the Driver interface with coordinate conversion +(mm -> galvo native) and uses GalvoController for low-level +communication. +""" + +import asyncio +import inspect +import logging +from gettext import gettext as _ +from typing import ( + TYPE_CHECKING, + Any, + Awaitable, + Callable, + Dict, + List, + Optional, + Union, +) + +from raygeo.ops.axis import Axis + +from ....context import RayforgeContext +from ....core.capability import PWMCapability +from ....core.varset import ChoiceVar, VarSet, BoolVar +from ....pipeline.encoder.base import EncodedOutput, OpsEncoder +from ...models.laser import LaserType +from ...transport import TransportStatus +from ..driver import ( + DeviceStatus, + Driver, + DriverMaturity, + DriverSetupError, + Pos, +) +from .galvo_controller import GalvoController +from .galvo_encoder import GalvoEncoder +from .galvo_mock_connection import MockGalvoConnection + +if TYPE_CHECKING: + from raygeo.ops import Ops + + from ....core.doc import Doc + from ...models.laser import Laser + from ...models.machine import Machine + +logger = logging.getLogger(__name__) + + +class GalvoDriver(Driver): + """ + Driver for EzCad2/LMC galvo laser controllers (USB). + + Supports galvo-based laser systems like AtomStack M4. + Coordinates are converted from mm to native galvo values + (0-65535, center at 0x8000). + """ + + label = _("Galvo (EzCad2)") + subtitle = _("Connect to a Galvo/EzCad2 laser controller over USB") + supports_settings = False + reports_granular_progress = False + uses_gcode = False + maturity = DriverMaturity.EXPERIMENTAL + + CONNECTION_TIMEOUT = 5.0 + RECONNECT_INTERVAL = 3.0 + POSITION_POLL_INTERVAL = 1.0 + + def __init__(self, context: RayforgeContext, machine: "Machine"): + super().__init__(context, machine) + self._controller: Optional[GalvoController] = None + self._connection_task: Optional[asyncio.Task] = None + self._keep_running = False + self._is_connected = False + self._source: str = "fiber" + self._mock: bool = False + + @property + def machine_space_wcs(self) -> str: + return "MACHINE" + + @property + def machine_space_wcs_display_name(self) -> str: + return _("Machine Coordinates") + + @property + def supported_wcs(self) -> List[str]: + return [self.machine_space_wcs] + + @property + def resource_uri(self) -> Optional[str]: + return "usb://galvo/ezcad2" + + @classmethod + def precheck(cls, **kwargs: Any) -> None: + pass + + @classmethod + def get_setup_vars(cls) -> "VarSet": + return VarSet( + vars=[ + ChoiceVar( + key="source", + label=_("Laser Source"), + description=_("Type of laser source"), + choices=["fiber", "co2", "uv"], + default="fiber", + ), + BoolVar( + key="mock", + label=_("Mock Connection"), + description=_( + "Use mock connection for " + "testing (no hardware required)" + ), + default=False, + ), + ] + ) + + @classmethod + def create_encoder(cls, machine: "Machine") -> "OpsEncoder": + return GalvoEncoder() + + def get_laser_capabilities(self, laser: "Laser"): + if laser.laser_type != LaserType.DIODE: + return ( + PWMCapability( + frequency=laser.pwm_frequency, + max_frequency=laser.max_pwm_frequency, + pulse_width=laser.pulse_width, + min_pulse_width=laser.min_pulse_width, + max_pulse_width=laser.max_pulse_width, + ), + ) + return () + + def _setup_implementation(self, **kwargs: Any) -> None: + self._source = kwargs.get("source", "fiber") + self._mock = kwargs.get("mock", False) + + if self._mock: + connection = MockGalvoConnection() + else: + from .galvo_usb_connection import GalvoUSBConnection + + connection = GalvoUSBConnection() + + self._controller = GalvoController( + connection=connection, + source=self._source, + ) + self._init_coordinate_systems() + + def _init_coordinate_systems(self) -> None: + m = self._machine + existing = m.coordinate_systems + supported = self.supported_wcs + new_systems = {} + for name in supported: + if name in existing: + new_systems[name] = existing[name] + else: + from ...models.coordinate_system import CoordinateSystem + + new_systems[name] = CoordinateSystem(name=name) + m.coordinate_systems = new_systems + if m.active_wcs not in new_systems: + m.active_wcs = supported[0] + + async def cleanup(self): + self._keep_running = False + self._is_connected = False + + if self._connection_task: + self._connection_task.cancel() + try: + await self._connection_task + except asyncio.CancelledError: + pass + self._connection_task = None + + if self._controller: + await self._controller.disconnect() + self._controller = None + + self._update_connection_status(TransportStatus.DISCONNECTED, "") + await super().cleanup() + + async def _connect_implementation(self) -> None: + if self._connection_task and not self._connection_task.done(): + logger.warning("Connect called with active connection task") + return + + self._keep_running = True + self._connection_task = asyncio.create_task(self._connection_loop()) + + async def _connection_loop(self) -> None: + logger.debug("Entering Galvo connection loop") + while self._keep_running: + self._update_connection_status(TransportStatus.CONNECTING) + self._is_connected = False + + try: + if not self._controller: + raise DriverSetupError("Controller not initialized") + + await self._controller.connect() + self._is_connected = True + self._update_connection_status(TransportStatus.CONNECTED, "") + self.state.status = DeviceStatus.IDLE + self.state_changed.send(self, state=self.state) + + logger.info( + "Connected to Galvo controller", + extra=self._log_extra("MACHINE_EVENT"), + ) + + while self._keep_running and self._is_connected: + await asyncio.sleep(self.POSITION_POLL_INTERVAL) + + except asyncio.CancelledError: + logger.debug("Connection loop cancelled") + break + except Exception as e: + logger.error(f"Connection error: {e}") + self._update_connection_status(TransportStatus.ERROR, str(e)) + if self._controller: + await self._controller.disconnect() + + if self._keep_running: + self._update_connection_status(TransportStatus.SLEEPING) + await asyncio.sleep(self.RECONNECT_INTERVAL) + + logger.debug("Exiting Galvo connection loop") + + async def run( + self, + encoded: EncodedOutput, + doc: "Doc", + ops: "Ops", + on_command_done: Optional[ + Callable[[int], Union[None, Awaitable[None]]] + ] = None, + ) -> None: + binary_data = encoded.driver_data.get("binary", b"") + text_lines = [ + line.strip() for line in encoded.text.splitlines() if line.strip() + ] + op_map = encoded.op_map + + if on_command_done is not None: + num_ops = 0 + if op_map and op_map.op_to_machine_code: + num_ops = max(op_map.op_to_machine_code.keys()) + 1 + for op_index in range(num_ops): + result = on_command_done(op_index) + if inspect.isawaitable(result): + await result + + logger.info( + f"Executing {len(text_lines)} commands", + extra=self._log_extra("USER_COMMAND"), + ) + for line in text_lines: + logger.info(line, extra=self._log_extra("USER_COMMAND")) + + if binary_data and self._controller: + self._controller._reset_list() + self._controller._list_write(0x8051) + chunk_size = 12 + for i in range(0, len(binary_data), chunk_size): + chunk = binary_data[i : i + chunk_size] + cmd = int.from_bytes(chunk[0:2], "little") + v1 = int.from_bytes(chunk[2:4], "little") + v2 = int.from_bytes(chunk[4:6], "little") + v3 = int.from_bytes(chunk[6:8], "little") + v4 = int.from_bytes(chunk[8:10], "little") + v5 = int.from_bytes(chunk[10:12], "little") + self._controller._list_write(cmd, v1, v2, v3, v4, v5) + + self.job_finished.send(self) + + async def run_raw(self, machine_code: str) -> None: + if machine_code and machine_code.strip(): + logger.warning( + "Galvo controllers do not support " + "text-based machine code. " + "Use run() with EncodedOutput instead." + ) + self.job_finished.send(self) + + async def set_hold(self, hold: bool = True) -> None: + assert self._controller + if hold: + self._controller._send_single(0x0020) + else: + self._controller._send_single(0x0013) + + async def cancel(self) -> None: + assert self._controller + self._controller._send_single(0x001F) + + def can_home(self, axis: Optional[Axis] = None) -> bool: + return False + + async def home(self, axes: Optional[Axis] = None) -> None: + logger.info("Home not supported on galvo controllers") + + async def move_to(self, pos_x: float, pos_y: float) -> None: + assert self._controller + logger.info( + f"move_to x={pos_x:.2f} y={pos_y:.2f}", + extra=self._log_extra("MACHINE_EVENT"), + ) + x = int((pos_x / 200.0) * 32768.0 + 32768.0) + y = int((pos_y / 200.0) * 32768.0 + 32768.0) + x = max(0, min(0xFFFF, x)) + y = max(0, min(0xFFFF, y)) + self._controller.goto_xy(x, y) + + async def select_tool(self, tool_number: int) -> None: + pass + + async def read_settings(self) -> None: + await asyncio.sleep(0) + self.settings_read.send(self, settings=[]) + + def get_setting_vars(self) -> List["VarSet"]: + return [VarSet(title=_("No settings"))] + + async def write_setting(self, key: str, value: Any) -> None: + pass + + async def clear_alarm(self) -> None: + assert self._controller + self._controller._send_single(0x001F) + + async def set_power(self, head: "Laser", percent: float) -> None: + assert self._controller + power_pct = percent * 100.0 + self._controller.set_power(power_pct) + self._controller.set_frequency(30.0) + + async def set_focus_power(self, head: "Laser", percent: float) -> None: + await self.set_power(head, percent) + + def can_jog(self, axis: Optional[Axis] = None) -> bool: + return True + + async def jog(self, speed: int, **deltas: float) -> None: + assert self._controller + for axis_name, delta in deltas.items(): + axis_lower = axis_name.lower() + delta_galvo = int((delta / 200.0) * 32768.0) + if axis_lower == "x": + new_x = self._controller._last_x + delta_galvo + new_x = max(0, min(0xFFFF, new_x)) + self._controller.goto_xy(new_x, self._controller._last_y) + elif axis_lower == "y": + new_y = self._controller._last_y + delta_galvo + new_y = max(0, min(0xFFFF, new_y)) + self._controller.goto_xy(self._controller._last_x, new_y) + + async def set_wcs_offset( + self, wcs_slot: str, x: float, y: float, z: float + ) -> None: + pass + + async def read_wcs_offsets(self) -> Dict[str, Pos]: + offsets: Dict[str, Pos] = {"MACHINE": (0.0, 0.0, 0.0)} + self.wcs_updated.send(self, offsets=offsets) + return offsets + + async def read_parser_state(self) -> Optional[str]: + return None + + async def select_wcs(self, wcs: str) -> None: + pass + + async def run_probe_cycle( + self, axis: Axis, max_travel: float, feed_rate: int + ) -> Optional[Pos]: + self.probe_status_changed.send( + self, message="Probe not supported on galvo" + ) + return None + + @property + def is_connected(self) -> bool: + return self._is_connected + + def _update_connection_status( + self, status: TransportStatus, message: str = "" + ) -> None: + self.connection_status_changed.send( + self, status=status, message=message + ) diff --git a/rayforge/machine/driver/galvo/galvo_encoder.py b/rayforge/machine/driver/galvo/galvo_encoder.py new file mode 100644 index 000000000..bf2a27e67 --- /dev/null +++ b/rayforge/machine/driver/galvo/galvo_encoder.py @@ -0,0 +1,252 @@ +""" +Galvo Encoder - Converts Ops commands to EzCad2 binary protocol. + +Produces both binary output (list buffer commands) for the controller +and human-readable text representation for UI display. +""" + +import logging +import struct +from typing import TYPE_CHECKING, List, Optional + +from raygeo.geo.types import Point3D +from raygeo.ops import Ops +from raygeo.ops.types import CommandType + +from ....pipeline.encoder.base import ( + EncodedOutput, + MachineCodeOpMap, + OpsEncoder, +) +from .galvo_consts import GALVO_CENTER + +if TYPE_CHECKING: + from ....core.doc import Doc + from ....machine.models.machine import Machine + +logger = logging.getLogger(__name__) + + +class GalvoEncoder(OpsEncoder): + """ + Converts Ops commands to EzCad2 binary protocol for galvo controllers. + """ + + def __init__( + self, + source: str = "fiber", + galvos_per_mm: int = 500, + default_mark_speed: float = 100.0, + default_jump_speed: float = 2000.0, + default_frequency: float = 30.0, + default_power: float = 50.0, + ): + self._source = source + self._galvos_per_mm = galvos_per_mm + self._default_mark_speed = default_mark_speed + self._default_jump_speed = default_jump_speed + self._default_frequency = default_frequency + self._default_power = default_power + self._reset_state() + + def _reset_state(self) -> None: + self._binary_chunks: List[bytes] = [] + self._text_lines: List[str] = [] + self._current_pos: Point3D = (0.0, 0.0, 0.0) + self._last_x: int = GALVO_CENTER + self._last_y: int = GALVO_CENTER + self._power: Optional[float] = None + self._cut_speed: Optional[float] = None + self._travel_speed: Optional[float] = None + self._frequency: Optional[float] = None + self._in_marking_mode: bool = False + + def encode( + self, ops: Ops, machine: "Machine", doc: "Doc" + ) -> EncodedOutput: + self._reset_state() + op_map = MachineCodeOpMap() + + for i in range(ops.len()): + start_line = len(self._text_lines) + self._handle_command(ops, i, machine) + end_line = len(self._text_lines) + + if end_line > start_line: + line_indices = list(range(start_line, end_line)) + op_map.op_to_machine_code[i] = line_indices + for line_num in line_indices: + op_map.machine_code_to_op[line_num] = i + else: + op_map.op_to_machine_code[i] = [] + + binary_data = b"".join(self._binary_chunks) + + if self._text_lines and not self._text_lines[-1]: + self._text_lines = self._text_lines[:-1] + + return EncodedOutput( + text="\n".join(self._text_lines), + op_map=op_map, + driver_data={"binary": binary_data}, + ) + + def _mm_to_galvo(self, mm: float, field_size_mm: float = 200.0) -> int: + centered = (mm / field_size_mm) * 32768.0 + 32768.0 + return int(max(0, min(0xFFFF, centered))) + + def _power_to_int(self, power_normalized: float) -> int: + return int(round(power_normalized * 100.0)) + + def _speed_to_int(self, speed_mm_s: float) -> int: + return int(speed_mm_s * self._galvos_per_mm / 1000.0) + + def _frequency_to_period( + self, freq_khz: float, base: float = 20000.0 + ) -> int: + return int(round(base / freq_khz)) & 0xFFFF + + def _add_binary( + self, + cmd: int, + v1: int = 0, + v2: int = 0, + v3: int = 0, + v4: int = 0, + v5: int = 0, + ) -> None: + self._binary_chunks.append(struct.pack("<6H", cmd, v1, v2, v3, v4, v5)) + + def _handle_command(self, ops: Ops, idx: int, machine: "Machine") -> None: + ct = ops.command_type(idx) + + if ct == CommandType.SET_POWER: + power = ops.power(idx) + self._power = power + pct = power * 100.0 + self._text_lines.append(f"POWER {pct:.1f}%") + if self._source == "fiber": + current = int(round(pct * 0xFFF / 100.0)) + self._add_binary(0x8012, current) + elif self._source in ("co2", "uv"): + freq = self._frequency or self._default_frequency + ratio = int(round(200 * pct / freq)) + self._add_binary(0x800B, ratio) + + elif ct == CommandType.SET_CUT_SPEED: + speed = ops.speed(idx) + self._cut_speed = speed + speed_val = self._speed_to_int(speed) + if speed_val > 0xFFFF: + speed_val = 0xFFFF + self._add_binary(0x800C, speed_val) + self._text_lines.append(f"MARK_SPEED {speed:.1f}") + + elif ct == CommandType.SET_TRAVEL_SPEED: + speed = ops.speed(idx) + self._travel_speed = speed + speed_val = self._speed_to_int(speed) + if speed_val > 0xFFFF: + speed_val = 0xFFFF + self._add_binary(0x8006, speed_val) + self._text_lines.append(f"JUMP_SPEED {speed:.1f}") + + elif ct == CommandType.SET_FREQUENCY: + freq = ops.frequency(idx) + self._frequency = float(freq) + if self._source == "fiber": + period = self._frequency_to_period(float(freq), base=20000.0) + self._add_binary(0x801B, period) + elif self._source in ("co2", "uv"): + period = self._frequency_to_period(float(freq), base=10000.0) + self._add_binary(0x800A, period) + self._text_lines.append(f"FREQUENCY {freq}") + + elif ct == CommandType.SET_PULSE_WIDTH: + pw = ops.pulse_width(idx) + self._add_binary(0x8026, int(pw)) + self._text_lines.append(f"PULSE_WIDTH {pw:.1f}") + + elif ct == CommandType.MOVE_TO: + end = ops.endpoint(idx) + x = self._mm_to_galvo(end[0]) + y = self._mm_to_galvo(end[1]) + distance = int( + abs(complex(x, y) - complex(self._last_x, self._last_y)) + ) + if distance > 0xFFFF: + distance = 0xFFFF + self._add_binary(0x8001, x, y, 0, distance) + self._text_lines.append(f"JUMP X:{end[0]:.3f} Y:{end[1]:.3f}") + self._last_x = x + self._last_y = y + self._current_pos = end + + elif ct == CommandType.LINE_TO: + end = ops.endpoint(idx) + x = self._mm_to_galvo(end[0]) + y = self._mm_to_galvo(end[1]) + distance = int( + abs(complex(x, y) - complex(self._last_x, self._last_y)) + ) + if distance > 0xFFFF: + distance = 0xFFFF + self._add_binary(0x8005, x, y, 0, distance) + self._text_lines.append(f"MARK X:{end[0]:.3f} Y:{end[1]:.3f}") + self._last_x = x + self._last_y = y + self._current_pos = end + + elif ct == CommandType.ARC_TO: + end = ops.endpoint(idx) + i_val, j_val, cw = ops.arc_params(idx) + self._text_lines.append( + f"; ARC ({end[0]:.3f}, {end[1]:.3f}) {'CW' if cw else 'CCW'}" + ) + sub_ops = ops.linearize(idx, self._current_pos) + for j in range(sub_ops.len()): + self._handle_command(sub_ops, j, machine) + + elif ct == CommandType.SCAN_LINE: + end = ops.endpoint(idx) + self._text_lines.append( + f"; SCAN_LINE to ({end[0]:.3f}, {end[1]:.3f})" + ) + sub_ops = ops.linearize(idx, self._current_pos) + for j in range(sub_ops.len()): + self._handle_command(sub_ops, j, machine) + + elif ct == CommandType.JOB_START: + self._add_binary(0x8051) + self._text_lines.append("; JOB START (READY)") + active_wcs = machine.active_wcs + self._text_lines.append(f"; WCS: {active_wcs}") + + elif ct == CommandType.JOB_END: + self._add_binary(0x8002) + self._text_lines.append("; JOB END") + + elif ct == CommandType.LAYER_START: + uid = ops.layer_uid(idx) + self._text_lines.append(f"; --- Layer {uid[:8]} ---") + + elif ct == CommandType.LAYER_END: + self._text_lines.append("; --- End Layer ---") + + elif ct == CommandType.WORKPIECE_START: + uid = ops.workpiece_uid(idx) + self._text_lines.append(f"; --- Workpiece {uid[:8]} ---") + + elif ct == CommandType.WORKPIECE_END: + self._text_lines.append("; --- End Workpiece ---") + + elif ct == CommandType.ENABLE_AIR_ASSIST: + self._add_binary(0x8011, 1) + self._text_lines.append("AIR_ASSIST ON") + + elif ct == CommandType.DISABLE_AIR_ASSIST: + self._add_binary(0x8011, 0) + self._text_lines.append("AIR_ASSIST OFF") + + elif ct == CommandType.SET_LASER: + self._text_lines.append("; LASER SELECT") diff --git a/rayforge/machine/driver/galvo/galvo_mock_connection.py b/rayforge/machine/driver/galvo/galvo_mock_connection.py new file mode 100644 index 000000000..c0166a039 --- /dev/null +++ b/rayforge/machine/driver/galvo/galvo_mock_connection.py @@ -0,0 +1,143 @@ +""" +Mock connection for Galvo/EzCad2 controllers. + +Provides a fake USB connection for testing purposes. +Records sent commands and returns configurable responses. +""" + +import random +import struct +from typing import Callable, Optional + +from .galvo_consts import ( + COMMAND_SIZE, + LIST_BUFFER_SIZE, + GetSerialNo, + GetVersion, + ReadPort, + SINGLE_COMMAND_NAMES, + LIST_COMMAND_NAMES, +) +from .galvo_protocol import GalvoCommand + + +class MockGalvoConnection: + """ + A mock USB connection for EzCad2/LMC controllers. + + Records all sent commands for later verification and returns + configurable responses. Useful for testing without hardware. + """ + + def __init__( + self, + on_send: Optional[Callable[[str], None]] = None, + on_recv: Optional[Callable[[str], None]] = None, + ): + self.on_send = on_send + self.on_recv = on_recv + self._is_open = False + self.sent_singles: list[GalvoCommand] = [] + self.sent_lists: list[bytes] = [] + self._implied_response: Optional[bytes] = None + self._serial_number: str = "MOCK-GALVO-001" + self._version: int = 0x0020 + self._list_executed: bool = False + self._position_x: int = 0x8000 + self._position_y: int = 0x8000 + + @property + def is_connected(self) -> bool: + return self._is_open + + def open(self, index: int = 0) -> int: + self._is_open = True + self.sent_singles.clear() + self.sent_lists.clear() + return index + + def close(self, index: int = 0) -> None: + self._is_open = False + + def write(self, index: int = 0, packet: Optional[bytes] = None) -> None: + if packet is None: + return + length = len(packet) + if length == COMMAND_SIZE: + self._process_single(packet) + elif length == LIST_BUFFER_SIZE: + self._process_list(packet) + + def read(self, index: int = 0) -> bytes: + if self._implied_response is not None: + resp = self._implied_response + self._implied_response = None + return resp + read = bytearray(8) + for r in range(len(read)): + read[r] = random.randint(0, 255) + return struct.pack("8B", *read) + + def set_implied_response(self, data: Optional[bytes]) -> None: + self._implied_response = data + + def _process_single(self, packet: bytes) -> None: + cmd = GalvoCommand.from_bytes(packet) + self.sent_singles.append(cmd) + if self.on_send: + name = SINGLE_COMMAND_NAMES.get(cmd.cmd, f"0x{cmd.cmd:04X}") + self.on_send( + f"{cmd.cmd:04X}:{cmd.v1:04X}:{cmd.v2:04X}:" + f"{cmd.v3:04X}:{cmd.v4:04X}:{cmd.v5:04X} {name}" + ) + + if cmd.cmd == GetSerialNo: + self._implied_response = self._serial_number.encode("ascii")[:8] + elif cmd.cmd == GetVersion: + self._implied_response = struct.pack("<4H", 0, 0, 0, self._version) + elif cmd.cmd == ReadPort: + pinvalue = 0 + self._implied_response = ( + b"\x00\x00" + struct.pack(" None: + self.sent_lists.append(packet) + self._list_executed = True + + if self.on_send: + commands = [] + last_cmd = None + repeats = 0 + for i in range(0, LIST_BUFFER_SIZE, COMMAND_SIZE): + chunk = packet[i : i + COMMAND_SIZE] + cmd = GalvoCommand.from_bytes(chunk) + name = LIST_COMMAND_NAMES.get(cmd.cmd, f"0x{cmd.cmd:04X}") + line = ( + f"{cmd.cmd:04X}:{cmd.v1:04X}:{cmd.v2:04X}:" + f"{cmd.v3:04X}:{cmd.v4:04X}:{cmd.v5:04X}" + f" {name}" + ) + if line == last_cmd: + repeats += 1 + continue + if repeats: + commands.append(f"... repeated {repeats} times ...") + repeats = 0 + commands.append(line) + last_cmd = line + if repeats: + commands.append(f"... repeated {repeats} times ...") + self.on_send("\n".join(commands)) + + @property + def was_list_executed(self) -> bool: + return self._list_executed + + def reset(self) -> None: + self.sent_singles.clear() + self.sent_lists.clear() + self._list_executed = False + self._implied_response = None diff --git a/rayforge/machine/driver/galvo/galvo_protocol.py b/rayforge/machine/driver/galvo/galvo_protocol.py new file mode 100644 index 000000000..cf5767cc7 --- /dev/null +++ b/rayforge/machine/driver/galvo/galvo_protocol.py @@ -0,0 +1,89 @@ +""" +EzCad2/LMC protocol data structures and helpers. +""" + +import struct +from dataclasses import dataclass +from typing import Tuple + + +@dataclass +class GalvoResponse: + """Response from a single command (4 x uint16).""" + + v0: int + v1: int + v2: int + v3: int + + @classmethod + def from_bytes(cls, data: bytes) -> "GalvoResponse": + if len(data) != 8: + return cls(0, 0, 0, 0) + words = struct.unpack("<4H", data[:8]) + return cls(words[0], words[1], words[2], words[3]) + + +@dataclass +class GalvoCommand: + """ + A raw 12-byte command (6 x uint16 LE). + """ + + cmd: int + v1: int = 0 + v2: int = 0 + v3: int = 0 + v4: int = 0 + v5: int = 0 + + def to_bytes(self) -> bytes: + return struct.pack( + "<6H", self.cmd, self.v1, self.v2, self.v3, self.v4, self.v5 + ) + + @classmethod + def from_bytes(cls, data: bytes) -> "GalvoCommand": + words = struct.unpack("<6H", data[:12]) + return cls(words[0], words[1], words[2], words[3], words[4], words[5]) + + @property + def is_list_command(self) -> bool: + return self.cmd >= 0x8000 + + @property + def is_single_command(self) -> bool: + return self.cmd < 0x8000 + + @property + def name(self) -> str: + from .galvo_consts import LIST_COMMAND_NAMES, SINGLE_COMMAND_NAMES + + if self.is_list_command: + return LIST_COMMAND_NAMES.get(self.cmd, f"0x{self.cmd:04X}") + return SINGLE_COMMAND_NAMES.get(self.cmd, f"0x{self.cmd:04X}") + + +def build_nop() -> bytes: + """Build a NOP command (filler for empty list slots).""" + return struct.pack("<6H", 0x8002, 0, 0, 0, 0, 0) + + +def build_list_packet(commands: bytes) -> bytes: + """Build a 3072-byte list packet from concatenated command bytes.""" + if len(commands) > 0xC00: + commands = commands[:0xC00] + nop = build_nop() + result = bytearray(commands) + result.extend(nop * ((0xC00 - len(commands)) // 12)) + return bytes(result) + + +def parse_response(data: bytes) -> GalvoResponse: + """Parse an 8-byte response into a GalvoResponse.""" + return GalvoResponse.from_bytes(data) + + +def _bytes_to_words(data: bytes) -> Tuple[int, int, int, int]: + """Convert 8 bytes to 4 uint16 LE words.""" + return struct.unpack("<4H", data[:8]) diff --git a/rayforge/machine/driver/galvo/galvo_usb_connection.py b/rayforge/machine/driver/galvo/galvo_usb_connection.py new file mode 100644 index 000000000..ef29611a5 --- /dev/null +++ b/rayforge/machine/driver/galvo/galvo_usb_connection.py @@ -0,0 +1,130 @@ +""" +Real USB connection for EzCad2/LMC Galvo controllers. + +Uses pyusb/libusb. Communicates with VID 0x9588 / PID 0x9899 (JCZ/LMC) +or VID 0x1A86 / PID 0x5512 (CH341 clone boards). +""" + +from __future__ import annotations + +import logging +from typing import Any, Dict, Optional + +import usb.core +import usb.util + +from .galvo_consts import ( + USB_PID_CH341, + USB_PID_JCZ, + USB_VID_CH341, + USB_VID_JCZ, + USB_READ_ENDPOINT, + USB_WRITE_ENDPOINT, +) + +logger = logging.getLogger(__name__) + + +class GalvoUSBConnection: + """ + USB connection for EzCad2/LMC galvo controller boards. + """ + + def __init__(self) -> None: + self._devices: Dict[int, Any] = {} + self._interfaces: Dict[int, Any] = {} + self._timeout = 100 + + def find_device(self, index: int = 0) -> Any: + found = usb.core.find( + idVendor=USB_VID_JCZ, + idProduct=USB_PID_JCZ, + find_all=True, + ) + devices = list(found) if found else [] + if not devices: + found = usb.core.find( + idVendor=USB_VID_CH341, + idProduct=USB_PID_CH341, + find_all=True, + ) + devices = list(found) if found else [] + if not devices: + raise ConnectionRefusedError( + "No EzCad2 controller found." + ) + try: + return devices[index] + except IndexError: + raise ConnectionRefusedError( + "EzCad2 controller found but index out of range." + ) + + @property + def is_connected(self) -> bool: + return bool(self._devices) + + def open(self, index: int = 0) -> int: + try: + device = self.find_device(index) + self._devices[index] = device + device.set_configuration() + interface = device.get_active_configuration()[(0, 0)] + self._interfaces[index] = interface + try: + if device.is_kernel_driver_active( + interface.bInterfaceNumber + ): + device.detach_kernel_driver( + interface.bInterfaceNumber + ) + except (NotImplementedError, usb.core.USBError): + pass + try: + usb.util.claim_interface(device, interface) + except usb.core.USBError: + usb.util.release_interface(device, interface) + usb.util.claim_interface(device, interface) + return index + except Exception as e: + logger.error(f"Failed to open USB device: {e}") + raise ConnectionRefusedError(str(e)) + + def close(self, index: int = 0) -> None: + device = self._devices.pop(index, None) + if device is None: + return + interface = self._interfaces.pop(index, None) + try: + if interface is not None: + usb.util.release_interface(device, interface) + usb.util.dispose_resources(device) + device.reset() + except Exception: + pass + + def write( + self, index: int = 0, packet: Optional[bytes] = None + ) -> None: + if packet is None: + return + device = self._devices.get(index) + if device is None: + raise ConnectionError("Not connected") + device.write( + endpoint=USB_WRITE_ENDPOINT, + data=packet, + timeout=self._timeout, + ) + + def read(self, index: int = 0) -> bytes: + device = self._devices.get(index) + if device is None: + raise ConnectionError("Not connected") + return bytes( + device.read( + endpoint=USB_READ_ENDPOINT, + size_or_buffer=8, + timeout=self._timeout, + ) + ) diff --git a/rayforge/ui_gtk/about.py b/rayforge/ui_gtk/about.py index 2cf556c5c..a5d706c20 100644 --- a/rayforge/ui_gtk/about.py +++ b/rayforge/ui_gtk/about.py @@ -157,6 +157,7 @@ def get_dependency_info() -> dict: "pypdf", "PyYAML", "pyserial", + "pyusb", "aiohttp", "websockets", ]: diff --git a/requirements.txt b/requirements.txt index 41f1367a9..7bd94f7bd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,6 +14,7 @@ PyMuPDF==1.27.2.2 PyOpenGL==3.1.10 PyOpenGL_accelerate==3.1.10 pypdf~=6.10.0 +pyusb>=1.2.0 pyserial>=3.5 pyvips==3.0.0 PyYAML==6.0.2 diff --git a/scripts/win/win_setup.sh b/scripts/win/win_setup.sh index 258ec8fa2..928785c52 100644 --- a/scripts/win/win_setup.sh +++ b/scripts/win/win_setup.sh @@ -167,6 +167,7 @@ if [[ "$1" == "pip" || -z "$1" ]]; then $PYTHON_BIN_PATH -m pip install --no-cache-dir GitPython==3.1.44 --break-system-packages $PYTHON_BIN_PATH -m pip install --no-cache-dir pygobject-stubs --break-system-packages $PYTHON_BIN_PATH -m pip install --no-cache-dir --no-build-isolation --no-deps pyvips==3.0.0 --break-system-packages + $PYTHON_BIN_PATH -m pip install --no-cache-dir pyusb --break-system-packages $PYTHON_BIN_PATH -m pip install --no-cache-dir pyserial --break-system-packages $PYTHON_BIN_PATH -m pip install --no-cache-dir raygeo --break-system-packages $PYTHON_BIN_PATH -m pip install --no-cache-dir ezdxf==1.4.2 pypdf~=6.10.0 trimesh==4.6.8 --break-system-packages diff --git a/tests/machine/driver/galvo/__init__.py b/tests/machine/driver/galvo/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/machine/driver/galvo/test_galvo_controller.py b/tests/machine/driver/galvo/test_galvo_controller.py new file mode 100644 index 000000000..00579d66b --- /dev/null +++ b/tests/machine/driver/galvo/test_galvo_controller.py @@ -0,0 +1,305 @@ +""" +Tests for GalvoController. + +Tests the low-level protocol implementation against MockGalvoConnection. +""" + +import pytest + +from rayforge.machine.driver.galvo.galvo_consts import ( + SOURCE_CO2, + SOURCE_FIBER, + listJumpTo, + listEndOfList, + listReadyMark, + EnableLaser, + SetControlMode, + SetLaserMode, + SetDelayMode, + Fiber_SetMo, +) +from rayforge.machine.driver.galvo.galvo_controller import ( + GalvoController, + DRIVER_STATE_IDLE, + DRIVER_STATE_MARKING, +) +from rayforge.machine.driver.galvo.galvo_mock_connection import ( + MockGalvoConnection, +) + + +class TestGalvoController: + def _controller_with_mock(self, source=SOURCE_FIBER): + """Create a controller with a mock connection.""" + conn = MockGalvoConnection() + ctrl = GalvoController(connection=conn, source=source) + return ctrl, conn + + @pytest.mark.asyncio + async def test_connect_initializes_controller(self): + """Test that connect sends initialization sequence.""" + ctrl, conn = self._controller_with_mock() + + await ctrl.connect() + + assert ctrl.is_connected + assert len(conn.sent_singles) > 0 + init_commands = {c.cmd for c in conn.sent_singles} + assert EnableLaser in init_commands + assert SetControlMode in init_commands + assert SetLaserMode in init_commands + assert SetDelayMode in init_commands + + @pytest.mark.asyncio + async def test_connect_sends_fiber_mo_for_fiber(self): + """Test that fiber source sends Fiber_SetMo.""" + ctrl, conn = self._controller_with_mock(SOURCE_FIBER) + + await ctrl.connect() + + cmds = [c.cmd for c in conn.sent_singles] + assert Fiber_SetMo in cmds + + @pytest.mark.asyncio + async def test_disconnect(self): + """Test that disconnect closes connection.""" + ctrl, conn = self._controller_with_mock() + await ctrl.connect() + + await ctrl.disconnect() + + assert not ctrl.is_connected + + @pytest.mark.asyncio + async def test_enter_marking_mode(self): + """Test entering marking mode sends correct setup.""" + ctrl, conn = self._controller_with_mock() + await ctrl.connect() + + await ctrl.enter_marking_mode() + + assert ctrl.state == DRIVER_STATE_MARKING + + ctrl._list_write(listJumpTo, 100, 200, 0, 500) + await ctrl.enter_idle_mode() + + @pytest.mark.asyncio + async def test_enter_idle_mode(self): + """Test entering idle mode cleans up.""" + ctrl, conn = self._controller_with_mock() + await ctrl.connect() + + await ctrl.enter_marking_mode() + ctrl._list_write(listEndOfList) + await ctrl.enter_idle_mode() + + assert ctrl.state == DRIVER_STATE_IDLE + + def test_goto(self): + """Test that goto adds a jump command.""" + ctrl, _ = self._controller_with_mock() + ctrl._list_write(listReadyMark) + + ctrl.goto(0x8000, 0x8000) + + assert ctrl._last_x == 0x8000 + assert ctrl._last_y == 0x8000 + + ctrl.goto(0x9000, 0xA000) + + assert ctrl._last_x == 0x9000 + assert ctrl._last_y == 0xA000 + + def test_goto_same_position(self): + """Test that goto to same position does nothing.""" + ctrl, _ = self._controller_with_mock() + + ctrl.goto(0x8000, 0x8000) + index_after = ctrl._active_index + + ctrl.goto(0x8000, 0x8000) + assert ctrl._active_index == index_after + + def test_goto_clamps_coordinates(self): + """Test that goto clamps to valid galvo range.""" + ctrl, _ = self._controller_with_mock() + + ctrl.goto(-10, 0x10000) + assert ctrl._last_x == 0 + assert ctrl._last_y == 0xFFFF + + def test_mark(self): + """Test that mark adds a mark command.""" + ctrl, _ = self._controller_with_mock() + + ctrl.mark(0x8000, 0x8000) + assert ctrl._last_x == 0x8000 + assert ctrl._last_y == 0x8000 + + ctrl.mark(0x9000, 0xA000) + assert ctrl._last_x == 0x9000 + assert ctrl._last_y == 0xA000 + + def test_goto_xy_sends_single(self): + """Test that goto_xy sends a single command.""" + ctrl, conn = self._controller_with_mock() + + ctrl.goto_xy(0x8000, 0x8000) + + assert len(conn.sent_singles) > 0 + last = conn.sent_singles[-1] + assert last.cmd == 0x000D + + def test_set_mark_speed(self): + """Test that set_mark_speed adds list command.""" + ctrl, _ = self._controller_with_mock() + + ctrl.set_mark_speed(100.0) + + def test_set_travel_speed(self): + """Test that set_travel_speed adds list command.""" + ctrl, _ = self._controller_with_mock() + + ctrl.set_travel_speed(2000.0) + assert ctrl._travel_speed == 2000.0 + + ctrl.set_travel_speed(2000.0) + assert ctrl._travel_speed == 2000.0 + + def test_set_power_fiber(self): + """Test set_power for fiber source.""" + ctrl, _ = self._controller_with_mock() + + ctrl.set_power(50.0) + assert ctrl._power == 50.0 + + def test_set_power_co2(self): + """Test set_power for CO2 source.""" + ctrl, _ = self._controller_with_mock(SOURCE_CO2) + ctrl.set_frequency(30.0) + + ctrl.set_power(50.0) + assert ctrl._power == 50.0 + + def test_set_frequency_fiber(self): + """Test set_frequency for fiber source.""" + ctrl, _ = self._controller_with_mock() + + ctrl.set_frequency(30.0) + assert ctrl._frequency == 30.0 + + def test_set_frequency_co2(self): + """Test set_frequency for CO2 source.""" + ctrl, _ = self._controller_with_mock(SOURCE_CO2) + + ctrl.set_frequency(30.0) + + def test_set_laser_on_delay(self): + """Test set_laser_on_delay.""" + ctrl, _ = self._controller_with_mock() + + ctrl.set_laser_on_delay(100.0) + assert ctrl._delay_on == 100.0 + + def test_set_laser_off_delay(self): + """Test set_laser_off_delay.""" + ctrl, _ = self._controller_with_mock() + + ctrl.set_laser_off_delay(100.0) + assert ctrl._delay_off == 100.0 + + def test_set_polygon_delay(self): + """Test set_polygon_delay.""" + ctrl, _ = self._controller_with_mock() + + ctrl.set_polygon_delay(100.0) + assert ctrl._delay_poly == 100.0 + + def test_set_pulse_width(self): + """Test set_pulse_width.""" + ctrl, _ = self._controller_with_mock() + + ctrl.set_pulse_width(10) + assert ctrl._pulse_width == 10 + + def test_dwell(self): + """Test dwell command.""" + ctrl, _ = self._controller_with_mock() + initial_index = ctrl._active_index + + ctrl.dwell(10) + + assert ctrl._active_index > initial_index + + def test_wait(self): + """Test wait command.""" + ctrl, _ = self._controller_with_mock() + initial_index = ctrl._active_index + + ctrl.wait(10) + + assert ctrl._active_index > initial_index + + def test_port_on_off(self): + """Test port manipulation.""" + ctrl, _ = self._controller_with_mock() + + ctrl._port_on(0) + assert ctrl._port_bits & 1 + + ctrl._port_off(0) + assert not (ctrl._port_bits & 1) + + def test_source_property(self): + """Test source property getter/setter.""" + ctrl, _ = self._controller_with_mock(SOURCE_FIBER) + + assert ctrl.source == SOURCE_FIBER + ctrl.source = SOURCE_CO2 + assert ctrl.source == SOURCE_CO2 + + def test_default_connection_on_connect(self): + """Test that connect uses no connection initially.""" + ctrl = GalvoController(source=SOURCE_FIBER) + assert ctrl._connection is None + + def test_is_connected_false_initially(self): + """Test is_connected returns False before connect.""" + ctrl, _ = self._controller_with_mock() + assert not ctrl.is_connected + + def test_convert_speed(self): + """Test speed conversion.""" + ctrl, _ = self._controller_with_mock() + result = ctrl._convert_speed(100.0) + expected = int(100.0 * 500 / 1000.0) + assert result == expected + + def test_convert_frequency(self): + """Test frequency conversion.""" + ctrl, _ = self._controller_with_mock() + result = ctrl._convert_frequency(30.0, base=20000.0) + expected = int(round(20000.0 / 30.0)) & 0xFFFF + assert result == expected + + def test_convert_power(self): + """Test power conversion.""" + ctrl, _ = self._controller_with_mock() + result = ctrl._convert_power(50.0) + expected = int(round(50.0 * 0xFFF / 100.0)) + assert result == expected + + def test_set_fpk_for_co2(self): + """Test set_fpk works for CO2.""" + ctrl, _ = self._controller_with_mock(SOURCE_CO2) + ctrl.set_frequency(30.0) + + ctrl.set_fpk(50.0) + assert ctrl._fpk == 50.0 + + def test_set_fpk_ignored_for_fiber(self): + """Test set_fpk is ignored for fiber.""" + ctrl, _ = self._controller_with_mock(SOURCE_FIBER) + + ctrl.set_fpk(50.0) + assert ctrl._fpk is None diff --git a/tests/machine/driver/galvo/test_galvo_driver.py b/tests/machine/driver/galvo/test_galvo_driver.py new file mode 100644 index 000000000..8ae71276e --- /dev/null +++ b/tests/machine/driver/galvo/test_galvo_driver.py @@ -0,0 +1,393 @@ +""" +Tests for GalvoDriver using MockGalvoConnection. + +Verifies driver-level commands and state management. +""" + +import asyncio +import logging +from typing import AsyncGenerator + +import pytest +import pytest_asyncio +from raygeo.ops import Ops + +from rayforge.core.doc import Doc +from rayforge.machine.driver.driver import ( + Axis, + DriverMaturity, +) +from rayforge.machine.driver.galvo.galvo_driver import GalvoDriver +from rayforge.machine.driver.galvo.galvo_encoder import GalvoEncoder +from rayforge.machine.models.laser import Laser, LaserType +from rayforge.machine.models.machine import Machine + +logger = logging.getLogger(__name__) + + +async def wait_for_connection( + driver: GalvoDriver, timeout: float = 2.0 +) -> bool: + """Wait for driver to establish connection.""" + await driver.connect() + deadline = asyncio.get_event_loop().time() + timeout + while asyncio.get_event_loop().time() < deadline: + if driver.is_connected: + return True + await asyncio.sleep(0.05) + return False + + +@pytest_asyncio.fixture +async def driver( + lite_context, +) -> AsyncGenerator[GalvoDriver, None]: + """Provides a configured GalvoDriver with mock connection.""" + machine = Machine(lite_context) + machine.driver_name = "GalvoDriver" + lite_context.machine_mgr.add_machine(machine) + + driver = GalvoDriver(lite_context, machine) + driver._setup_implementation(source="fiber", mock=True) + + yield driver + + await driver.cleanup() + await machine.shutdown() + + +class TestGalvoDriver: + @pytest.mark.asyncio + async def test_setup_with_valid_config(self, driver): + """Test that driver setup succeeds with valid configuration.""" + assert driver._controller is not None + assert driver._source == "fiber" + assert driver._mock + + @pytest.mark.asyncio + async def test_connect_to_mock(self, driver): + """Test that driver can connect to mock controller.""" + assert await wait_for_connection(driver) + assert driver.is_connected + + await driver.cleanup() + + @pytest.mark.asyncio + async def test_move_to(self, driver): + """Test that move_to command works.""" + assert await wait_for_connection(driver) + + await driver.move_to(10.0, 20.0) + + controller = driver._controller + assert controller is not None + + await driver.cleanup() + + @pytest.mark.asyncio + async def test_set_power(self, driver): + """Test that set_power command works.""" + assert await wait_for_connection(driver) + + test_head = Laser() + test_head.uid = "test-head-1" + test_head.tool_number = 0 + + await driver.set_power(test_head, 0.5) + + await driver.cleanup() + + @pytest.mark.asyncio + async def test_set_power_zero(self, driver): + """Test that set_power(0) works.""" + assert await wait_for_connection(driver) + + test_head = Laser() + test_head.uid = "test-head-2" + test_head.tool_number = 1 + + await driver.set_power(test_head, 0.0) + + await driver.cleanup() + + @pytest.mark.asyncio + async def test_jog(self, driver): + """Test that jog command works.""" + assert await wait_for_connection(driver) + + await driver.jog(5000, x=10.0) + + await driver.cleanup() + + @pytest.mark.asyncio + async def test_set_hold(self, driver): + """Test that set_hold stops/resumes.""" + assert await wait_for_connection(driver) + + await driver.set_hold(True) + await driver.set_hold(False) + + await driver.cleanup() + + @pytest.mark.asyncio + async def test_cancel(self, driver): + """Test that cancel works.""" + assert await wait_for_connection(driver) + + await driver.cancel() + + await driver.cleanup() + + @pytest.mark.asyncio + async def test_clear_alarm(self, driver): + """Test that clear_alarm works.""" + assert await wait_for_connection(driver) + + await driver.clear_alarm() + + await driver.cleanup() + + @pytest.mark.asyncio + async def test_select_tool_noop(self, driver): + """Test that select_tool does nothing (no-op).""" + await driver.connect() + + await driver.select_tool(1) + + await driver.cleanup() + + @pytest.mark.asyncio + async def test_read_settings_sends_signal(self, driver): + """Test that read_settings sends the settings_read signal.""" + await driver.connect() + + settings_received = [] + + def on_settings_read(sender, settings): + settings_received.append(settings) + + driver.settings_read.connect(on_settings_read) + + await driver.read_settings() + + assert len(settings_received) == 1 + assert settings_received[0] == [] + + await driver.cleanup() + + @pytest.mark.asyncio + async def test_write_setting_noop(self, driver): + """Test that write_setting does nothing (no-op).""" + await driver.connect() + + await driver.write_setting("test_key", "test_value") + + await driver.cleanup() + + @pytest.mark.asyncio + async def test_read_wcs_offsets(self, driver): + """Test that read_wcs_offsets returns machine offsets.""" + assert await wait_for_connection(driver) + + offsets = await driver.read_wcs_offsets() + + assert "MACHINE" in offsets + assert offsets["MACHINE"] == (0.0, 0.0, 0.0) + + await driver.cleanup() + + @pytest.mark.asyncio + async def test_read_parser_state_returns_none(self, driver): + """Test that read_parser_state returns None.""" + await driver.connect() + + state = await driver.read_parser_state() + + assert state is None + + await driver.cleanup() + + @pytest.mark.asyncio + async def test_run_probe_cycle_not_supported(self, driver): + """Test that run_probe_cycle indicates not supported.""" + await driver.connect() + + probe_messages = [] + + def on_probe_status_changed(sender, message): + probe_messages.append(message) + + driver.probe_status_changed.connect(on_probe_status_changed) + + result = await driver.run_probe_cycle(Axis.Z, -10, 100) + + assert result is None + assert len(probe_messages) > 0 + assert "not supported" in probe_messages[0].lower() + + await driver.cleanup() + + @pytest.mark.asyncio + async def test_run_with_machine_code(self, driver): + """Test that run method executes encoded commands.""" + doc = Doc() + ops = Ops() + ops.move_to(10.0, 20.0) + ops.line_to(30.0, 40.0) + + encoded = driver._machine.encode_ops(ops, doc) + + assert await wait_for_connection(driver) + + await driver.run(encoded, doc, ops) + + await driver.cleanup() + + @pytest.mark.asyncio + async def test_run_raw_warns_and_finishes(self, driver): + """Test that run_raw logs warning and sends job_finished signal.""" + await driver.connect() + + finished_events = [] + + def on_job_finished(sender): + finished_events.append(True) + + driver.job_finished.connect(on_job_finished) + + await driver.run_raw("G0 X10") + + assert len(finished_events) == 1 + + await driver.cleanup() + + @pytest.mark.asyncio + async def test_connection_status_signals(self, driver): + """Test that connection status signals are emitted correctly.""" + status_changes = [] + + def on_connection_status_changed(sender, status, message): + status_changes.append((status, message)) + + driver.connection_status_changed.connect(on_connection_status_changed) + + assert await wait_for_connection(driver) + + await driver.cleanup() + + assert len(status_changes) >= 2 + + def test_get_encoder_returns_correct_type(self, driver): + """Test that get_encoder returns GalvoEncoder.""" + encoder = driver.get_encoder() + assert isinstance(encoder, GalvoEncoder) + + def test_machine_space_wcs_properties(self, driver): + """Test that machine space WCS properties return correct values.""" + assert driver.machine_space_wcs == "MACHINE" + assert driver.machine_space_wcs_display_name != "" + + def test_can_home_returns_false(self, driver): + """Test that can_home returns False for galvo.""" + assert not driver.can_home() + assert not driver.can_home(Axis.X) + assert not driver.can_home(Axis.Y) + + def test_can_jog_returns_true(self, driver): + """Test that can_jog returns True.""" + assert driver.can_jog() + assert driver.can_jog(Axis.X) + assert driver.can_jog(Axis.Y) + + def test_driver_label_and_subtitle(self, driver): + """Test that driver has correct label and subtitle.""" + assert "Galvo" in driver.label + assert "EzCad2" in driver.label + assert driver.subtitle != "" + assert driver.reports_granular_progress is False + assert driver.uses_gcode is False + + def test_driver_maturity(self, driver): + """Test driver maturity level.""" + assert driver.maturity == DriverMaturity.EXPERIMENTAL + + @pytest.mark.asyncio + async def test_disconnect_when_not_connected(self, driver): + """Test that cleanup works even when not connected.""" + await driver.cleanup() + assert not driver.is_connected + + @pytest.mark.asyncio + async def test_cleanup_resets_controller(self, driver): + """Test that cleanup properly resets controller.""" + await driver.cleanup() + assert driver._controller is None + + def test_resource_uri_property(self, driver): + """Test that resource_uri returns correct format.""" + uri = driver.resource_uri + assert uri == "usb://galvo/ezcad2" + + def test_setup_vars_include_source_and_mock(self): + """Test that setup vars include source and mock options.""" + varset = GalvoDriver.get_setup_vars() + keys = [v.key for v in varset] + assert "source" in keys + assert "mock" in keys + + def test_precheck_passes_by_default(self): + """Test that precheck does not raise.""" + GalvoDriver.precheck() + + def test_supported_wcs_includes_machine(self, driver): + """Test that supported_wcs includes MACHINE.""" + wcs_list = driver.supported_wcs + assert "MACHINE" in wcs_list + + @pytest.mark.asyncio + async def test_reconnect(self, driver): + """Test driver can reconnect after disconnect.""" + assert await wait_for_connection(driver) + assert driver.is_connected + + await driver.cleanup() + assert not driver.is_connected + + driver._setup_implementation(source="fiber", mock=True) + assert await wait_for_connection(driver) + assert driver.is_connected + + await driver.cleanup() + + def test_get_laser_capabilities_diode(self, driver): + """Test that laser capabilities are empty for diode.""" + laser = Laser() + laser.laser_type = LaserType.DIODE + + result = driver.get_laser_capabilities(laser) + + assert result == () + + def test_get_laser_capabilities_co2(self, driver): + """Test that laser capabilities include PWM for CO2.""" + laser = Laser() + laser.laser_type = LaserType.CO2 + laser.pwm_frequency = 1000 + laser.max_pwm_frequency = 5000 + laser.pulse_width = 50 + laser.min_pulse_width = 5 + laser.max_pulse_width = 500 + + result = driver.get_laser_capabilities(laser) + + assert len(result) == 1 + + def test_get_laser_capabilities_fiber(self, driver): + """Test that laser capabilities include PWM for fiber.""" + laser = Laser() + laser.laser_type = LaserType.FIBER + + result = driver.get_laser_capabilities(laser) + + assert len(result) == 1 + assert result[0].name == "PWM" diff --git a/tests/machine/driver/galvo/test_galvo_encoder.py b/tests/machine/driver/galvo/test_galvo_encoder.py new file mode 100644 index 000000000..90ebe4f0e --- /dev/null +++ b/tests/machine/driver/galvo/test_galvo_encoder.py @@ -0,0 +1,320 @@ +""" +Tests for GalvoEncoder. + +Verifies conversion of Ops commands to EzCad2 binary protocol. +""" + +from unittest.mock import MagicMock + +from raygeo.ops import Ops + +from rayforge.machine.driver.galvo.galvo_encoder import GalvoEncoder +from rayforge.machine.driver.galvo.galvo_consts import GALVO_CENTER + + +class TestGalvoEncoder: + def _make_machine(self): + machine = MagicMock() + machine.active_wcs = "MACHINE" + return machine + + def _make_doc(self): + doc = MagicMock() + return doc + + def test_encode_empty_ops(self): + """Test encoding empty ops produces empty output.""" + encoder = GalvoEncoder() + ops = Ops() + machine = self._make_machine() + doc = self._make_doc() + + result = encoder.encode(ops, machine, doc) + + assert result.text == "" + assert result.driver_data.get("binary", b"") == b"" + + def test_encode_move_to(self): + """Test encoding a move (jump) command.""" + encoder = GalvoEncoder() + ops = Ops() + ops.move_to(10.0, 20.0) + machine = self._make_machine() + doc = self._make_doc() + + result = encoder.encode(ops, machine, doc) + + assert "JUMP" in result.text + + def test_encode_line_to(self): + """Test encoding a line (mark) command.""" + encoder = GalvoEncoder() + ops = Ops() + ops.move_to(0.0, 0.0) + ops.line_to(10.0, 15.0) + machine = self._make_machine() + doc = self._make_doc() + + result = encoder.encode(ops, machine, doc) + + assert "MARK" in result.text + + def test_encode_job_start_end(self): + """Test encoding job start/end commands.""" + encoder = GalvoEncoder() + ops = Ops() + ops.job_start() + ops.job_end() + machine = self._make_machine() + doc = self._make_doc() + + result = encoder.encode(ops, machine, doc) + + assert "JOB START" in result.text + assert "JOB END" in result.text + + def test_encode_set_power(self): + """Test encoding set power command.""" + encoder = GalvoEncoder() + ops = Ops() + ops.set_power(0.5) + machine = self._make_machine() + doc = self._make_doc() + + result = encoder.encode(ops, machine, doc) + + assert "POWER" in result.text + assert "50.0" in result.text + + def test_encode_set_cut_speed(self): + """Test encoding set cut speed command.""" + encoder = GalvoEncoder() + ops = Ops() + ops.set_cut_speed(100.0) + machine = self._make_machine() + doc = self._make_doc() + + result = encoder.encode(ops, machine, doc) + + assert "MARK_SPEED" in result.text + + def test_encode_set_travel_speed(self): + """Test encoding set travel speed command.""" + encoder = GalvoEncoder() + ops = Ops() + ops.set_travel_speed(2000.0) + machine = self._make_machine() + doc = self._make_doc() + + result = encoder.encode(ops, machine, doc) + + assert "JUMP_SPEED" in result.text + + def test_encode_set_frequency(self): + """Test encoding set frequency command.""" + encoder = GalvoEncoder() + ops = Ops() + ops.set_frequency(30000) + machine = self._make_machine() + doc = self._make_doc() + + result = encoder.encode(ops, machine, doc) + + assert "FREQUENCY" in result.text + + def test_encode_set_pulse_width(self): + """Test encoding set pulse width command.""" + encoder = GalvoEncoder() + ops = Ops() + ops.set_pulse_width(10.0) + machine = self._make_machine() + doc = self._make_doc() + + result = encoder.encode(ops, machine, doc) + + assert "PULSE_WIDTH" in result.text + + def test_encode_binary_data_present(self): + """Test that binary data is produced.""" + encoder = GalvoEncoder() + ops = Ops() + ops.job_start() + ops.move_to(0.0, 0.0) + ops.line_to(10.0, 15.0) + ops.job_end() + machine = self._make_machine() + doc = self._make_doc() + + result = encoder.encode(ops, machine, doc) + + assert len(result.driver_data.get("binary", b"")) > 0 + + def test_encode_binary_is_valid_packets(self): + """Test that binary data consists of valid 12-byte packets.""" + encoder = GalvoEncoder() + ops = Ops() + ops.move_to(0.0, 0.0) + ops.line_to(10.0, 15.0) + machine = self._make_machine() + doc = self._make_doc() + + result = encoder.encode(ops, machine, doc) + binary = result.driver_data.get("binary", b"") + + assert len(binary) % 12 == 0 + + def test_encode_op_map(self): + """Test that op_map is correctly built.""" + encoder = GalvoEncoder() + ops = Ops() + ops.move_to(0.0, 0.0) + ops.line_to(10.0, 15.0) + machine = self._make_machine() + doc = self._make_doc() + + result = encoder.encode(ops, machine, doc) + + assert len(result.op_map.op_to_machine_code) > 0 + assert len(result.op_map.machine_code_to_op) > 0 + + def test_co2_source_uses_power_ratio(self): + """Test CO2 source uses power ratio command.""" + encoder = GalvoEncoder(source="co2") + ops = Ops() + ops.set_power(0.5) + machine = self._make_machine() + doc = self._make_doc() + + result = encoder.encode(ops, machine, doc) + + assert "POWER" in result.text + + def test_uv_source_uses_power_ratio(self): + """Test UV source uses power ratio command.""" + encoder = GalvoEncoder(source="uv") + ops = Ops() + ops.set_power(0.5) + machine = self._make_machine() + doc = self._make_doc() + + result = encoder.encode(ops, machine, doc) + + assert "POWER" in result.text + + def test_encode_layer_start_end(self): + """Test encoding layer start/end commands.""" + encoder = GalvoEncoder() + ops = Ops() + ops.layer_start("layer-1") + ops.move_to(0.0, 0.0) + ops.layer_end("layer-1") + machine = self._make_machine() + doc = self._make_doc() + + result = encoder.encode(ops, machine, doc) + + assert "Layer" in result.text + + def test_encode_arc(self): + """Test encoding arc command linearizes to lines.""" + encoder = GalvoEncoder() + ops = Ops() + ops.move_to(0.0, 0.0) + ops.arc_to(10.0, 10.0, -5.0, 5.0, clockwise=True) + machine = self._make_machine() + doc = self._make_doc() + + result = encoder.encode(ops, machine, doc) + + assert "ARC" in result.text or "MARK" in result.text + + def test_encode_air_assist(self): + """Test encoding air assist commands.""" + encoder = GalvoEncoder() + ops = Ops() + ops.enable_air_assist(True) + machine = self._make_machine() + doc = self._make_doc() + + result = encoder.encode(ops, machine, doc) + + assert "AIR_ASSIST" in result.text + + def test_encode_multiple_commands(self): + """Test encoding a sequence of multiple commands.""" + encoder = GalvoEncoder() + ops = Ops() + ops.job_start() + ops.set_power(0.5) + ops.set_cut_speed(100.0) + ops.set_frequency(30000) + ops.move_to(0.0, 0.0) + ops.line_to(10.0, 20.0) + ops.line_to(30.0, 40.0) + ops.move_to(50.0, 50.0) + ops.job_end() + machine = self._make_machine() + doc = self._make_doc() + + result = encoder.encode(ops, machine, doc) + + lines = result.text.split("\n") + assert len(lines) > 5 + assert any("JOB START" in ln for ln in lines) + assert any("POWER" in ln for ln in lines) + assert any("MARK_SPEED" in ln for ln in lines) + assert any("JUMP" in ln for ln in lines) + assert any("MARK" in ln for ln in lines) + assert any("JOB END" in ln for ln in lines) + + def test_mm_to_galvo_center(self): + """Test that center position maps to 0x8000.""" + encoder = GalvoEncoder() + result = encoder._mm_to_galvo(0.0) + assert result == GALVO_CENTER + + def test_mm_to_galvo_positive(self): + """Test positive coordinate conversion.""" + encoder = GalvoEncoder() + result = encoder._mm_to_galvo(100.0) + assert result == 0xC000 + + def test_mm_to_galvo_negative(self): + """Test negative coordinate conversion.""" + encoder = GalvoEncoder() + result = encoder._mm_to_galvo(-100.0) + assert result == 0x4000 + + def test_mm_to_galvo_clamp(self): + """Test clamping of out-of-range coordinates.""" + encoder = GalvoEncoder() + assert encoder._mm_to_galvo(-200.0) == 0 + assert encoder._mm_to_galvo(200.0) == 0xFFFF + + def test_power_to_int(self): + """Test power conversion from normalized to percent.""" + encoder = GalvoEncoder() + assert encoder._power_to_int(0.5) == 50 + assert encoder._power_to_int(1.0) == 100 + assert encoder._power_to_int(0.0) == 0 + + def test_speed_to_int(self): + """Test speed conversion from mm/s to galvo units.""" + encoder = GalvoEncoder(galvos_per_mm=500) + expected = int(100.0 * 500 / 1000.0) + assert encoder._speed_to_int(100.0) == expected + + def test_frequency_to_period(self): + """Test frequency to q-switch period conversion.""" + encoder = GalvoEncoder() + result = encoder._frequency_to_period(30.0, base=20000.0) + expected = int(round(20000.0 / 30.0)) & 0xFFFF + assert result == expected + + def test_encoder_default_values(self): + """Test encoder default values.""" + encoder = GalvoEncoder() + assert encoder._source == "fiber" + assert encoder._galvos_per_mm == 500 + assert encoder._default_mark_speed == 100.0 + assert encoder._default_jump_speed == 2000.0 diff --git a/tests/machine/driver/galvo/test_galvo_mock_connection.py b/tests/machine/driver/galvo/test_galvo_mock_connection.py new file mode 100644 index 000000000..ecc66b87f --- /dev/null +++ b/tests/machine/driver/galvo/test_galvo_mock_connection.py @@ -0,0 +1,200 @@ +""" +Tests for MockGalvoConnection. + +Verifies the mock correctly simulates USB communication with +EzCad2/LMC controller boards. +""" + +import struct + +from rayforge.machine.driver.galvo.galvo_consts import ( + COMMAND_SIZE, + LIST_BUFFER_SIZE, + GetSerialNo, + GetVersion, + ReadPort, +) +from rayforge.machine.driver.galvo.galvo_mock_connection import ( + MockGalvoConnection, +) + + +class TestMockGalvoConnection: + def test_initial_state(self): + """Test that a new mock connection starts disconnected.""" + conn = MockGalvoConnection() + assert not conn.is_connected + assert conn.sent_singles == [] + assert conn.sent_lists == [] + + def test_open_and_close(self): + """Test open/close lifecycle.""" + conn = MockGalvoConnection() + idx = conn.open() + assert idx == 0 + assert conn.is_connected + + conn.close() + assert not conn.is_connected + + def test_write_single_command(self): + """Test writing a single command stores it.""" + conn = MockGalvoConnection() + conn.open() + + cmd = struct.pack("<6H", 0x0007, 0, 0, 0, 0, 0) + conn.write(packet=cmd) + + assert len(conn.sent_singles) == 1 + assert conn.sent_singles[0].cmd == 0x0007 + + def test_write_list_packet(self): + """Test writing a list packet stores it.""" + conn = MockGalvoConnection() + conn.open() + + packet = bytearray(LIST_BUFFER_SIZE) + for i in range(0, LIST_BUFFER_SIZE, COMMAND_SIZE): + struct.pack_into("<6H", packet, i, 0x8001, 100, 200, 0, 500, 0) + conn.write(packet=bytes(packet)) + + assert len(conn.sent_lists) == 1 + + def test_read_returns_implied_response(self): + """Test that set_implied_response controls read output.""" + conn = MockGalvoConnection() + conn.open() + + expected = struct.pack("<4H", 1, 2, 3, 4) + conn.set_implied_response(expected) + result = conn.read() + + assert result == expected + + def test_read_random_when_no_implied_response(self): + """Test that read returns random data when no response is set.""" + conn = MockGalvoConnection() + conn.open() + + result = conn.read() + assert len(result) == 8 + + def test_get_serial_number_response(self): + """Test that GetSerialNo sets implied response.""" + conn = MockGalvoConnection() + conn.open() + + cmd = struct.pack("<6H", GetSerialNo, 0, 0, 0, 0, 0) + conn.write(packet=cmd) + result = conn.read() + + assert b"MOCK" in result + + def test_get_version_response(self): + """Test that GetVersion sets implied response.""" + conn = MockGalvoConnection() + conn.open() + + cmd = struct.pack("<6H", GetVersion, 0, 0, 0, 0, 0) + conn.write(packet=cmd) + result = conn.read() + + words = struct.unpack("<4H", result) + assert words[3] != 0 + + def test_read_port_response(self): + """Test that ReadPort returns port bits.""" + conn = MockGalvoConnection() + conn.open() + + cmd = struct.pack("<6H", ReadPort, 0, 0, 0, 0, 0) + conn.write(packet=cmd) + result = conn.read() + + words = struct.unpack("<4H", result) + assert words[1] == 0 + + def test_was_list_executed_after_list_write(self): + """Test that was_list_executed tracks list writes.""" + conn = MockGalvoConnection() + conn.open() + + assert not conn.was_list_executed + + packet = bytearray(LIST_BUFFER_SIZE) + conn.write(packet=bytes(packet)) + + assert conn.was_list_executed + + def test_reset_clears_state(self): + """Test that reset() clears all recorded state.""" + conn = MockGalvoConnection() + conn.open() + + cmd = struct.pack("<6H", 0x0007, 0, 0, 0, 0, 0) + conn.write(packet=cmd) + packet = bytearray(LIST_BUFFER_SIZE) + conn.write(packet=bytes(packet)) + + assert len(conn.sent_singles) == 1 + assert len(conn.sent_lists) == 1 + + conn.reset() + + assert conn.sent_singles == [] + assert conn.sent_lists == [] + assert not conn.was_list_executed + + def test_on_send_callback_for_single(self): + """Test that on_send callback is called for single commands.""" + received = [] + + def callback(msg): + received.append(msg) + + conn = MockGalvoConnection(on_send=callback) + conn.open() + + cmd = struct.pack("<6H", 0x0007, 0, 0, 0, 0, 0) + conn.write(packet=cmd) + + assert len(received) == 1 + assert "GetVersion" in received[0] + + def test_on_send_callback_for_list(self): + """Test that on_send callback is called for list packets.""" + received = [] + + def callback(msg): + received.append(msg) + + conn = MockGalvoConnection(on_send=callback) + conn.open() + + packet = bytearray(LIST_BUFFER_SIZE) + conn.write(packet=bytes(packet)) + + assert len(received) > 0 + + def test_write_raises_for_no_packet(self): + """Test that writing None does nothing.""" + conn = MockGalvoConnection() + conn.open() + + conn.write(packet=None) + assert conn.sent_singles == [] + assert conn.sent_lists == [] + + def test_reopen_resets_state(self): + """Test that reopening clears sent commands.""" + conn = MockGalvoConnection() + conn.open() + + cmd = struct.pack("<6H", 0x0007, 0, 0, 0, 0, 0) + conn.write(packet=cmd) + + conn.close() + conn.open() + + assert conn.sent_singles == [] + assert conn.sent_lists == []