diff --git a/docs/api/pylabrobot.agilent.rst b/docs/api/pylabrobot.agilent.rst index a4115aff613..39a52547b6b 100644 --- a/docs/api/pylabrobot.agilent.rst +++ b/docs/api/pylabrobot.agilent.rst @@ -140,3 +140,22 @@ VSpin .. autoclass:: pylabrobot.agilent.vspin.VSpinCentrifugeBackend.SpinParams :members: + + +PlateLoc +-------- + +.. currentmodule:: pylabrobot.agilent.plateloc + +.. autosummary:: + :toctree: _autosummary + :nosignatures: + :recursive: + + PlateLoc + PlateLocSealer + PlateLocSealerBackend + PlateLocDriver + PlateLocSerialProfile + PlateLocStatus + PlateLocError diff --git a/docs/user_guide/01_material-handling/sealers/sealers.md b/docs/user_guide/01_material-handling/sealers/sealers.md index f3a8a68a930..840c6221e0d 100644 --- a/docs/user_guide/01_material-handling/sealers/sealers.md +++ b/docs/user_guide/01_material-handling/sealers/sealers.md @@ -3,7 +3,7 @@ ## Installation ```bash -pip install pylabrobot[serial] +pip install "pylabrobot[serial]" ``` In automated wet lab workflows, **microplate sealers** are essential for preserving sample integrity. @@ -11,6 +11,10 @@ They prevent **evaporation**, **cross-contamination**, and **spillage**, especia PyLabRobot supports integration with various sealer machines, allowing you to programmatically seal plates as part of your automation workflows. +Supported serial sealers include: + +- Agilent PlateLoc through `pylabrobot.agilent.PlateLoc` + --- ## Types of Sealers diff --git a/docs/user_guide/agilent/index.md b/docs/user_guide/agilent/index.md index 1b5d31e58fb..affd0fb4803 100644 --- a/docs/user_guide/agilent/index.md +++ b/docs/user_guide/agilent/index.md @@ -4,5 +4,6 @@ :maxdepth: 1 biotek/index +plateloc/hello-world vspin/hello-world ``` diff --git a/docs/user_guide/agilent/plateloc/hello-world.ipynb b/docs/user_guide/agilent/plateloc/hello-world.ipynb new file mode 100644 index 00000000000..246a6b5a63d --- /dev/null +++ b/docs/user_guide/agilent/plateloc/hello-world.ipynb @@ -0,0 +1,157 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "plateloc-intro", + "source": "# Agilent PlateLoc\n\nThe Agilent PlateLoc is controlled through PLR's `Sealer` capability with a direct RS-232 serial driver. It does not require Agilent ActiveX, VWorks, or vendor server software. Install the optional serial dependency before connecting:\n\n```bash\npip install \"pylabrobot[serial]\"\n```", + "metadata": {} + }, + { + "cell_type": "code", + "id": "plateloc-import", + "source": "from pylabrobot.agilent import PlateLoc\n\nplateloc = PlateLoc(name=\"plateloc\", port=\"COM6\")", + "metadata": {}, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "id": "plateloc-setup", + "source": "await plateloc.setup()", + "metadata": {}, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "id": "plateloc-set-temp", + "source": "await plateloc.sealer.set_sealing_temperature(175)", + "metadata": {}, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "id": "plateloc-set-time", + "source": "await plateloc.sealer.set_sealing_time(0.5)", + "metadata": {}, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "id": "plateloc-status-first", + "source": "status = await plateloc.sealer.request_status()", + "metadata": {}, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "id": "plateloc-sealer-intro", + "source": "The device also exposes the standard sealer capability:", + "metadata": {} + }, + { + "cell_type": "code", + "id": "plateloc-seal", + "source": "await plateloc.sealer.seal(temperature=175, duration=1.5)", + "metadata": {}, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "id": "plateloc-open", + "source": "await plateloc.sealer.open()", + "metadata": {}, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "id": "plateloc-close", + "source": "await plateloc.sealer.close()", + "metadata": {}, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "id": "plateloc-sealer-notes", + "source": "`sealer.open()` and `sealer.close()` move the stage and wait for the default stage-settle delay. `sealer.seal()` starts a sealing cycle after writing the requested temperature and time.\n\nThe PlateLoc sealer capability also exposes independent setpoint and status helpers:", + "metadata": {} + }, + { + "cell_type": "code", + "id": "plateloc-set-temp-160", + "source": "await plateloc.sealer.set_sealing_temperature(160)", + "metadata": {}, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "id": "plateloc-set-time-1", + "source": "await plateloc.sealer.set_sealing_time(1.0)", + "metadata": {}, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "id": "plateloc-status-print", + "source": "status = await plateloc.sealer.request_status()\nprint(status.target_temperature, status.sealing_time, status.cycle_complete)", + "metadata": {}, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "id": "plateloc-status-notes", + "source": "`request_status()` returns the best-known PLR state plus a live cycle-complete query. The direct serial protocol decoded here does not expose actual block temperature or actual stored time reads, so PLR reports the last successfully written target temperature and sealing time.", + "metadata": {} + }, + { + "cell_type": "code", + "id": "plateloc-stop", + "source": "await plateloc.stop()", + "metadata": {}, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "id": "plateloc-protocol", + "source": "## Serial command profile\n\nThe decoded direct protocol uses `19200 8N1` and carriage-return-terminated ASCII frames with two-letter command codes plus payloads. Temperature and time payloads use the firmware's fractional setpoint convention: the digits after the decimal point are the integer controller value.\n\n| Operation | Frame |\n|---|---|\n| Set sealing temperature | `ST 0.{temperature_celsius:03d}\\r` |\n| Set sealing time | `SS 0.{seconds_x10:02d}\\r` |\n| Start cycle | `GO 00\\r` |\n| Stop cycle | `AC 00\\r` |\n| Move stage out | `SO 00\\r` |\n| Move stage in | `SI 00\\r` |\n| Apply seal | `AS 00\\r` |\n| Clear error | `CL 00\\r` |\n| Check cycle complete | `CC 00\\r` |\n\nFor example, `set_sealing_temperature(175)` writes `ST 0.175\\r`, `set_sealing_temperature(30)` writes `ST 0.030\\r`, `set_sealing_time(0.5)` writes `SS 0.05\\r`, and `set_sealing_time(1.2)` writes `SS 0.12\\r`.\n\nNegative acknowledgements are parsed as `NK(message)` and raised as `PlateLocError`. Some valid firmware commands reply with single-carriage-return acknowledgements such as `SOAK\\r`. The cycle-complete command returns `True` for `CCAK\\r` and `False` for `CCNK\\r`.\n\nYou can still override serial settings and timing with `PlateLocSerialProfile` while keeping the same PLR frontend:", + "metadata": {} + }, + { + "cell_type": "code", + "id": "plateloc-profile", + "source": "from pylabrobot.agilent import PlateLoc, PlateLocSerialProfile\n\nprofile = PlateLocSerialProfile(\n baudrate=19200,\n stage_move_delay=6,\n)\n\nplateloc = PlateLoc(name=\"plateloc\", port=\"COM6\", profile=profile)", + "metadata": {}, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "id": "plateloc-troubleshooting", + "source": "## Troubleshooting\n\nThe PlateLoc RS-232 connector is not VGA and is not USB TTL. Use a USB-to-RS-232 adapter plus the correct DB9 cable for the instrument. If the port opens but every command times out, verify the PlateLoc is powered, the rear serial cable is seated, and the cable wiring matches the instrument requirement. Some setups require a null-modem DB9 adapter rather than a straight-through cable.", + "metadata": {} + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "name": "python", + "version": "3.11.0" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/pylabrobot/agilent/__init__.py b/pylabrobot/agilent/__init__.py index 47564b66b6b..c3afcc64913 100644 --- a/pylabrobot/agilent/__init__.py +++ b/pylabrobot/agilent/__init__.py @@ -12,4 +12,13 @@ SynergyH1, SynergyH1Backend, ) +from .plateloc import ( + PlateLoc, + PlateLocDriver, + PlateLocError, + PlateLocSealer, + PlateLocSealerBackend, + PlateLocSerialProfile, + PlateLocStatus, +) from .vspin import Access2, Access2Driver, VSpin, VSpinCentrifugeBackend, VSpinDriver diff --git a/pylabrobot/agilent/plateloc/__init__.py b/pylabrobot/agilent/plateloc/__init__.py new file mode 100644 index 00000000000..dc194e38b7d --- /dev/null +++ b/pylabrobot/agilent/plateloc/__init__.py @@ -0,0 +1,9 @@ +from .plateloc import ( + PlateLoc, + PlateLocDriver, + PlateLocError, + PlateLocSealer, + PlateLocSealerBackend, + PlateLocSerialProfile, + PlateLocStatus, +) diff --git a/pylabrobot/agilent/plateloc/plateloc.py b/pylabrobot/agilent/plateloc/plateloc.py new file mode 100644 index 00000000000..152d01bdc8f --- /dev/null +++ b/pylabrobot/agilent/plateloc/plateloc.py @@ -0,0 +1,430 @@ +from __future__ import annotations + +import asyncio +import dataclasses +import logging +import re +import time +from typing import Optional, cast + +from pylabrobot.capabilities.capability import BackendParams, need_capability_ready +from pylabrobot.capabilities.sealing import Sealer, SealerBackend +from pylabrobot.device import Device, Driver +from pylabrobot.io.serial import Serial + +try: + import serial as _serial # noqa: F401 + + HAS_SERIAL = True +except ImportError as e: + HAS_SERIAL = False + _SERIAL_IMPORT_ERROR = e + +logger = logging.getLogger(__name__) + + +_ACK_RE = re.compile(r"^\s*(?P[A-Z0-9]{2})(?P[AN])K(?:\((?P.*)\))?\s*$") + + +class PlateLocError(RuntimeError): + """Raised when PlateLoc communication or protocol handling fails.""" + + +@dataclasses.dataclass(frozen=True) +class PlateLocStatus: + """Best-known PlateLoc state from direct serial control.""" + + port: str + connected: bool + target_temperature: Optional[float] + sealing_time: Optional[float] + stage_position: Optional[str] + cycle_complete: Optional[bool] + last_command: Optional[str] + last_response: Optional[str] + + +@dataclasses.dataclass(frozen=True) +class PlateLocSerialProfile: + """Serial settings for a PlateLoc controller. + + The decoded low-level protocol uses carriage-return-terminated ASCII frames. + Setpoint payloads are encoded as a decimal fraction whose fractional digits hold the integer + setpoint, for example ``ST 0.175`` for 175 C and ``SS 0.12`` for 1.2 s. + """ + + baudrate: int = 19200 + bytesize: int = 8 + parity: str = "N" + stopbits: int = 1 + timeout: float = 1 + write_timeout: float = 1 + rtscts: bool = False + dsrdtr: bool = False + xonxoff: bool = False + read_delay: float = 0.05 + ack_timeout: float = 10 + response_timeout: float = 2 + stage_move_delay: float = 6 + cycle_poll_interval: float = 0.5 + command_terminator: str = "\r" + response_terminator: bytes = b"\r" + + def serialize(self) -> dict: + return { + "baudrate": self.baudrate, + "bytesize": self.bytesize, + "parity": self.parity, + "stopbits": self.stopbits, + "timeout": self.timeout, + "write_timeout": self.write_timeout, + "rtscts": self.rtscts, + "dsrdtr": self.dsrdtr, + "xonxoff": self.xonxoff, + "read_delay": self.read_delay, + "ack_timeout": self.ack_timeout, + "response_timeout": self.response_timeout, + "stage_move_delay": self.stage_move_delay, + "cycle_poll_interval": self.cycle_poll_interval, + "command_terminator": self.command_terminator, + "response_terminator": self.response_terminator.decode("latin1"), + } + + @classmethod + def deserialize(cls, data: dict) -> "PlateLocSerialProfile": + data = data.copy() + if "response_terminator" in data: + data["response_terminator"] = data["response_terminator"].encode("latin1") + return cls(**data) + + +class PlateLocDriver(Driver): + """Direct serial transport for the Agilent PlateLoc thermal microplate sealer.""" + + def __init__( + self, + port: Optional[str] = None, + vid: Optional[int] = None, + pid: Optional[int] = None, + profile: Optional[PlateLocSerialProfile | dict] = None, + timeout: float = 30, + ) -> None: + super().__init__() + if not HAS_SERIAL: + raise RuntimeError( + "pyserial is not installed. Install with: pip install pylabrobot[serial]. " + f"Import error: {_SERIAL_IMPORT_ERROR}" + ) + if isinstance(profile, dict): + profile = PlateLocSerialProfile.deserialize(profile) + self.profile = profile or PlateLocSerialProfile() + self.timeout = timeout + self._connected = False + self._last_command: Optional[str] = None + self._last_response: Optional[str] = None + self.io = Serial( + human_readable_device_name="Agilent PlateLoc Sealer", + port=port, + vid=vid, + pid=pid, + baudrate=self.profile.baudrate, + bytesize=self.profile.bytesize, + parity=self.profile.parity, + stopbits=self.profile.stopbits, + write_timeout=self.profile.write_timeout, + timeout=self.profile.timeout, + rtscts=self.profile.rtscts, + dsrdtr=self.profile.dsrdtr, + xonxoff=self.profile.xonxoff, + ) + + @property + def port(self) -> str: + return cast(str, self.io.port) + + @property + def connected(self) -> bool: + return self._connected + + @property + def last_command(self) -> Optional[str]: + return self._last_command + + @property + def last_response(self) -> Optional[str]: + return self._last_response + + async def setup(self, backend_params: Optional[BackendParams] = None): + await self.io.setup() + self._connected = True + logger.info("[PlateLoc %s] connected", self.port) + + async def stop(self): + await self.io.stop() + self._connected = False + logger.info("[PlateLoc %s] disconnected", self.port) + + async def send_command( + self, + command: str, + *, + timeout: Optional[float] = None, + required: bool = True, + ) -> Optional[str]: + """Send one literal PlateLoc serial frame and return the raw response.""" + command = command.removesuffix(self.profile.command_terminator) + await self.io.reset_input_buffer() + await self.io.write(f"{command}{self.profile.command_terminator}".encode("ascii")) + self._last_command = command + + if self.profile.read_delay > 0: + await asyncio.sleep(self.profile.read_delay) + + response = await self.read_response(timeout=timeout, required=required) + self._last_response = response + return response + + async def read_response( + self, + timeout: Optional[float] = None, + required: bool = True, + ) -> Optional[str]: + deadline = time.time() + (timeout if timeout is not None else self.profile.response_timeout) + chunks = bytearray() + while time.time() < deadline: + with self.io.temporary_timeout(max(0.01, min(0.1, deadline - time.time()))): + chunk = await self.io.read(1) + if chunk: + chunks.extend(chunk) + if chunks.endswith(self.profile.response_terminator): + break + elif len(chunks) > 0: + break + + if len(chunks) == 0: + if required: + raise TimeoutError("Timeout while waiting for PlateLoc response") + return None + return bytes(chunks).decode("utf-8", errors="replace").strip() + + def serialize(self) -> dict: + return { + **super().serialize(), + "port": self.port, + "profile": self.profile.serialize(), + "timeout": self.timeout, + } + + +class PlateLocSealerBackend(SealerBackend): + """Translates SealerBackend operations into direct PlateLoc serial commands.""" + + _SET_TEMPERATURE = "ST" + _SET_TIME = "SS" + _MOVE_STAGE_OUT = "SO" + _MOVE_STAGE_IN = "SI" + _START_CYCLE = "GO" + _STOP_CYCLE = "AC" + _APPLY_SEAL = "AS" + _CLEAR_ERROR = "CL" + _CHECK_CYCLE_COMPLETE = "CC" + + def __init__(self, driver: PlateLocDriver): + self._driver = driver + self._target_temperature: Optional[float] = None + self._sealing_time: Optional[float] = None + self._stage_position: Optional[str] = None + + @property + def driver(self) -> PlateLocDriver: + return self._driver + + def _parse_response(self, command_code: str, response: str) -> re.Match[str]: + match = _ACK_RE.match(response) + if match is None: + raise PlateLocError(f"PlateLoc returned invalid response to {command_code!r}: {response!r}") + code = match.group("code") + if code != command_code: + raise PlateLocError(f"PlateLoc replied with {code!r} to {command_code!r}: {response!r}") + return match + + def _raise_for_error(self, command_code: str, response: str) -> None: + match = self._parse_response(command_code, response) + if match.group("status") == "N": + message = match.group("message") or "command rejected" + raise PlateLocError(f"PlateLoc rejected {command_code!r}: {message}") + + async def _send( + self, + command: str, + *, + timeout: Optional[float] = None, + raise_on_nak: bool = True, + ) -> str: + response = await self._driver.send_command( + command, + timeout=timeout if timeout is not None else self._driver.profile.ack_timeout, + required=True, + ) + assert response is not None + if raise_on_nak: + self._raise_for_error(command[:2], response) + return response + + async def set_sealing_temperature(self, temperature: float): + if not (20 <= temperature <= 235): + raise ValueError("Temperature out of range. Please enter a value between 20 and 235 C.") + target_temperature = round(temperature) + logger.info( + "[PlateLoc %s] setting sealing temperature to %.1f C", + self._driver.port, + temperature, + ) + response = await self._send(f"{self._SET_TEMPERATURE} 0.{target_temperature:03d}") + self._target_temperature = float(target_temperature) + return response + + async def set_sealing_time(self, duration: float): + if not (0.5 <= duration <= 12.0): + raise ValueError("Duration out of range. Please enter a value between 0.5 and 12.0 s.") + sealing_time_deciseconds = round(duration * 10) + logger.info("[PlateLoc %s] setting sealing time to %.2f s", self._driver.port, duration) + response = await self._send(f"{self._SET_TIME} 0.{sealing_time_deciseconds:02d}") + self._sealing_time = sealing_time_deciseconds / 10 + return response + + async def move_stage_out(self): + logger.info("[PlateLoc %s] moving stage out", self._driver.port) + response = await self._send(f"{self._MOVE_STAGE_OUT} 00") + if self._driver.profile.stage_move_delay > 0: + await asyncio.sleep(self._driver.profile.stage_move_delay) + self._stage_position = "open" + return response + + async def move_stage_in(self): + logger.info("[PlateLoc %s] moving stage in", self._driver.port) + response = await self._send(f"{self._MOVE_STAGE_IN} 00") + if self._driver.profile.stage_move_delay > 0: + await asyncio.sleep(self._driver.profile.stage_move_delay) + self._stage_position = "closed" + return response + + async def start_cycle(self): + logger.info("[PlateLoc %s] starting sealing cycle", self._driver.port) + return await self._send(f"{self._START_CYCLE} 00") + + async def stop_cycle(self): + logger.info("[PlateLoc %s] stopping sealing cycle", self._driver.port) + return await self._send(f"{self._STOP_CYCLE} 00") + + async def apply_seal(self): + logger.info("[PlateLoc %s] applying seal", self._driver.port) + return await self._send(f"{self._APPLY_SEAL} 00") + + async def clear_error(self): + logger.info("[PlateLoc %s] clearing error", self._driver.port) + return await self._send(f"{self._CLEAR_ERROR} 00") + + async def check_cycle_complete(self) -> bool: + response = await self._send( + f"{self._CHECK_CYCLE_COMPLETE} 00", + timeout=self._driver.profile.response_timeout, + raise_on_nak=False, + ) + match = self._parse_response(self._CHECK_CYCLE_COMPLETE, response) + return match.group("status") == "A" + + async def wait_for_cycle_complete(self, timeout: Optional[float] = None) -> bool: + deadline = time.time() + (self._driver.timeout if timeout is None else timeout) + while True: + if await self.check_cycle_complete(): + return True + remaining = deadline - time.time() + if remaining <= 0: + raise TimeoutError("Timeout while waiting for PlateLoc cycle to complete") + await asyncio.sleep(min(max(self._driver.profile.cycle_poll_interval, 0), remaining)) + + def status_snapshot(self, cycle_complete: Optional[bool] = None) -> PlateLocStatus: + return PlateLocStatus( + port=self._driver.port, + connected=self._driver.connected, + target_temperature=self._target_temperature, + sealing_time=self._sealing_time, + stage_position=self._stage_position, + cycle_complete=cycle_complete, + last_command=self._driver.last_command, + last_response=self._driver.last_response, + ) + + async def request_status(self, query_cycle_complete: bool = True) -> PlateLocStatus: + cycle_complete = await self.check_cycle_complete() if query_cycle_complete else None + return self.status_snapshot(cycle_complete=cycle_complete) + + async def seal(self, temperature: int, duration: float): + await self.set_sealing_temperature(temperature) + await self.set_sealing_time(duration) + response = await self.start_cycle() + await self.wait_for_cycle_complete() + return response + + async def open(self): + return await self.move_stage_out() + + async def close(self): + return await self.move_stage_in() + + +class PlateLocSealer(Sealer): + """PlateLoc-specific sealing capability.""" + + def __init__(self, backend: PlateLocSealerBackend): + super().__init__(backend=backend) + self.backend: PlateLocSealerBackend = backend + + @need_capability_ready + async def set_sealing_temperature(self, temperature: float): + return await self.backend.set_sealing_temperature(temperature) + + @need_capability_ready + async def set_sealing_time(self, duration: float): + return await self.backend.set_sealing_time(duration) + + @need_capability_ready + async def request_status(self, query_cycle_complete: bool = True) -> PlateLocStatus: + return await self.backend.request_status(query_cycle_complete=query_cycle_complete) + + def status_snapshot(self, cycle_complete: Optional[bool] = None) -> PlateLocStatus: + return self.backend.status_snapshot(cycle_complete=cycle_complete) + + +class PlateLoc(Device): + """Agilent PlateLoc thermal microplate sealer.""" + + def __init__( + self, + name: str, + port: Optional[str] = None, + vid: Optional[int] = None, + pid: Optional[int] = None, + profile: Optional[PlateLocSerialProfile | dict] = None, + timeout: float = 30, + ): + self.name = name + driver = PlateLocDriver( + port=port, + vid=vid, + pid=pid, + profile=profile, + timeout=timeout, + ) + super().__init__(driver=driver) + self.driver: PlateLocDriver = driver + self.sealer: PlateLocSealer = PlateLocSealer(backend=PlateLocSealerBackend(driver)) + self._capabilities = [self.sealer] + + def serialize(self) -> dict: + return { + **super().serialize(), + "name": self.name, + "status": dataclasses.asdict(self.sealer.status_snapshot()), + } diff --git a/pylabrobot/agilent/plateloc/plateloc_tests.py b/pylabrobot/agilent/plateloc/plateloc_tests.py new file mode 100644 index 00000000000..fc8c6b84999 --- /dev/null +++ b/pylabrobot/agilent/plateloc/plateloc_tests.py @@ -0,0 +1,312 @@ +import asyncio +import contextlib +import unittest +from collections import deque +from typing import Deque +from unittest.mock import patch + +import pylabrobot.agilent.plateloc.plateloc as plateloc_module +from pylabrobot.agilent.plateloc import ( + PlateLoc, + PlateLocDriver, + PlateLocError, + PlateLocSealer, + PlateLocSealerBackend, + PlateLocSerialProfile, + PlateLocStatus, +) + + +class FakeSerial: + def __init__(self, **kwargs): + self.kwargs = kwargs + self._port = kwargs["port"] + self.writes = [] + self.responses: Deque[bytes] = deque() + self.setup_called = False + self.stop_called = False + self.timeout = kwargs["timeout"] + self.reset_input_buffer_called = False + + @property + def port(self): + return self._port + + @contextlib.contextmanager + def temporary_timeout(self, timeout: float): + previous_timeout = self.timeout + self.timeout = timeout + try: + yield + finally: + self.timeout = previous_timeout + + async def setup(self): + self.setup_called = True + + async def stop(self): + self.stop_called = True + + async def write(self, data: bytes): + self.writes.append(data) + + async def read(self, num_bytes: int = 1) -> bytes: + if not self.responses: + await asyncio.sleep(0) + return b"" + response = self.responses[0] + chunk = response[:num_bytes] + response = response[num_bytes:] + if response: + self.responses[0] = response + else: + self.responses.popleft() + return chunk + + def queue_response(self, response: bytes): + self.responses.append(response) + + async def reset_input_buffer(self): + self.reset_input_buffer_called = True + + +class PlateLocTests(unittest.IsolatedAsyncioTestCase): + @contextlib.contextmanager + def patch_serial(self): + with ( + patch.object(plateloc_module, "HAS_SERIAL", True), + patch.object(plateloc_module, "Serial", FakeSerial), + ): + yield + + def make_driver(self, ack_timeout=0.01, timeout=30): + profile = PlateLocSerialProfile( + response_timeout=0.01, + ack_timeout=ack_timeout, + read_delay=0, + stage_move_delay=0, + cycle_poll_interval=0, + ) + with self.patch_serial(): + return PlateLocDriver(port="COM6", profile=profile, timeout=timeout) + + def make_device(self, timeout=30): + profile = PlateLocSerialProfile( + response_timeout=0.01, + ack_timeout=0.01, + read_delay=0, + stage_move_delay=0, + cycle_poll_interval=0, + ) + with self.patch_serial(): + return PlateLoc(name="plateloc", port="COM6", profile=profile, timeout=timeout) + + def backend(self, device: PlateLoc) -> PlateLocSealerBackend: + return device.sealer.backend + + async def test_setup_uses_plr_serial_wrapper_settings(self): + driver = self.make_driver() + + await driver.setup() + + self.assertTrue(driver.io.setup_called) + self.assertEqual(driver.io.kwargs["human_readable_device_name"], "Agilent PlateLoc Sealer") + self.assertEqual(driver.io.kwargs["port"], "COM6") + self.assertEqual(driver.io.kwargs["baudrate"], 19200) + self.assertEqual(driver.io.kwargs["bytesize"], 8) + self.assertEqual(driver.io.kwargs["parity"], "N") + self.assertEqual(driver.io.kwargs["stopbits"], 1) + + await driver.stop() + self.assertTrue(driver.io.stop_called) + + async def test_driver_sends_literal_serial_frame(self): + driver = self.make_driver() + await driver.setup() + driver.io.queue_response(b"STAK\r") + + response = await driver.send_command("ST 0.030", timeout=0.01) + + self.assertEqual(response, "STAK") + self.assertEqual(driver.io.writes, [b"ST 0.030\r"]) + self.assertTrue(driver.io.reset_input_buffer_called) + self.assertEqual(driver.last_command, "ST 0.030") + self.assertEqual(driver.last_response, "STAK") + + async def test_temperature_and_time_writes_are_scaled_and_validated(self): + driver = self.make_driver() + backend = PlateLocSealerBackend(driver) + await driver.setup() + driver.io.queue_response(b"STAK\r") + driver.io.queue_response(b"SSAK\r") + + await backend.set_sealing_temperature(30) + await backend.set_sealing_time(0.5) + + self.assertEqual(driver.io.writes, [b"ST 0.030\r", b"SS 0.05\r"]) + + with self.assertRaises(ValueError): + await backend.set_sealing_temperature(19) + with self.assertRaises(ValueError): + await backend.set_sealing_time(0.4) + + async def test_negative_acknowledgement_raises_protocol_error(self): + driver = self.make_driver() + backend = PlateLocSealerBackend(driver) + await driver.setup() + driver.io.queue_response(b"STNK(Desired Temperature is Out of Range)\r\r") + + with self.assertRaisesRegex(PlateLocError, "Desired Temperature is Out of Range"): + await backend.set_sealing_temperature(30) + + self.assertEqual(driver.io.writes, [b"ST 0.030\r"]) + + async def test_missing_acknowledgement_raises_timeout(self): + driver = self.make_driver() + backend = PlateLocSealerBackend(driver) + await driver.setup() + + with self.assertRaisesRegex(TimeoutError, "Timeout"): + await backend.set_sealing_temperature(30) + + self.assertEqual(driver.io.writes, [b"ST 0.030\r"]) + + async def test_malformed_acknowledgement_raises_protocol_error(self): + driver = self.make_driver() + backend = PlateLocSealerBackend(driver) + await driver.setup() + driver.io.queue_response(b"unexpected\r") + + with self.assertRaisesRegex(PlateLocError, "invalid response"): + await backend.set_sealing_temperature(30) + + self.assertEqual(driver.io.writes, [b"ST 0.030\r"]) + + async def test_required_response_reads_until_plate_loc_ack(self): + driver = self.make_driver() + backend = PlateLocSealerBackend(driver) + await driver.setup() + driver.io.queue_response(b"CCAK\r") + + self.assertTrue(await backend.check_cycle_complete()) + self.assertEqual(driver.io.writes, [b"CC 00\r"]) + + async def test_cycle_not_complete_returns_false(self): + driver = self.make_driver() + backend = PlateLocSealerBackend(driver) + await driver.setup() + driver.io.queue_response(b"CCNK\r") + + self.assertFalse(await backend.check_cycle_complete()) + self.assertEqual(driver.io.writes, [b"CC 00\r"]) + + async def test_invalid_cycle_complete_response_raises_protocol_error(self): + driver = self.make_driver() + backend = PlateLocSealerBackend(driver) + await driver.setup() + driver.io.queue_response(b"unexpected\r") + + with self.assertRaisesRegex(PlateLocError, "invalid response"): + await backend.check_cycle_complete() + + self.assertEqual(driver.io.writes, [b"CC 00\r"]) + + async def test_status_snapshot_tracks_setpoints_and_live_cycle_complete(self): + driver = self.make_driver() + backend = PlateLocSealerBackend(driver) + await driver.setup() + driver.io.queue_response(b"STAK\r") + driver.io.queue_response(b"SSAK\r") + driver.io.queue_response(b"SOAK\r") + + await backend.set_sealing_temperature(30) + await backend.set_sealing_time(0.5) + await backend.move_stage_out() + driver.io.queue_response(b"CCAK\r") + + status = await backend.request_status() + + self.assertIsInstance(status, PlateLocStatus) + self.assertEqual(status.port, "COM6") + self.assertTrue(status.connected) + self.assertEqual(status.target_temperature, 30) + self.assertEqual(status.sealing_time, 0.5) + self.assertEqual(status.stage_position, "open") + self.assertTrue(status.cycle_complete) + self.assertEqual(status.last_command, "CC 00") + self.assertEqual(status.last_response, "CCAK") + self.assertEqual(driver.io.writes, [b"ST 0.030\r", b"SS 0.05\r", b"SO 00\r", b"CC 00\r"]) + + async def test_seal_waits_for_cycle_completion(self): + device = self.make_device(timeout=1) + backend = self.backend(device) + + await device.setup() + device.driver.io.queue_response(b"STAK\r") + device.driver.io.queue_response(b"SSAK\r") + device.driver.io.queue_response(b"GOAK\r") + device.driver.io.queue_response(b"CCNK\r") + device.driver.io.queue_response(b"CCAK\r") + + await device.sealer.seal(120, 1.2) + + self.assertEqual( + device.driver.io.writes, + [ + b"ST 0.120\r", + b"SS 0.12\r", + b"GO 00\r", + b"CC 00\r", + b"CC 00\r", + ], + ) + self.assertEqual(backend.status_snapshot().target_temperature, 120) + self.assertEqual(backend.status_snapshot().sealing_time, 1.2) + + async def test_device_exposes_plate_loc_sealer_capability(self): + device = self.make_device() + + await device.setup() + device.driver.io.queue_response(b"STAK\r") + await device.sealer.set_sealing_temperature(100) + device.driver.io.queue_response(b"SSAK\r") + await device.sealer.set_sealing_time(0.5) + device.driver.io.queue_response(b"STAK\r") + device.driver.io.queue_response(b"SSAK\r") + device.driver.io.queue_response(b"GOAK\r") + device.driver.io.queue_response(b"CCAK\r") + await device.sealer.seal(120, 1.2) + device.driver.io.queue_response(b"SOAK\r") + await device.sealer.open() + device.driver.io.queue_response(b"SIAK\r") + await device.sealer.close() + device.driver.io.queue_response(b"CCAK\r") + status = await device.sealer.request_status() + await device.stop() + + self.assertIsInstance(device.sealer, PlateLocSealer) + self.assertFalse(hasattr(device, "set_sealing_temperature")) + self.assertFalse(hasattr(device, "set_sealing_time")) + self.assertEqual( + device.driver.io.writes, + [ + b"ST 0.100\r", + b"SS 0.05\r", + b"ST 0.120\r", + b"SS 0.12\r", + b"GO 00\r", + b"CC 00\r", + b"SO 00\r", + b"SI 00\r", + b"CC 00\r", + ], + ) + self.assertEqual(status.target_temperature, 120) + self.assertEqual(status.sealing_time, 1.2) + self.assertEqual(status.stage_position, "closed") + self.assertTrue(status.cycle_complete) + self.assertTrue(device.driver.io.stop_called) + + +if __name__ == "__main__": + unittest.main()