From e712d69f0e44abbd80c40d3dfb342f28dce3a69a Mon Sep 17 00:00:00 2001 From: Alex Godfrey Date: Fri, 8 May 2026 18:17:20 -0700 Subject: [PATCH 1/7] Add direct serial Agilent PlateLoc driver --- .../01_material-handling/sealers/sealers.md | 4 + docs/user_guide/agilent/index.md | 1 + .../agilent/plateloc/hello-world.md | 85 +++++ pylabrobot/agilent/__init__.py | 7 + pylabrobot/agilent/plateloc/__init__.py | 8 + pylabrobot/agilent/plateloc/plateloc.py | 327 ++++++++++++++++++ pylabrobot/agilent/plateloc/plateloc_tests.py | 172 +++++++++ 7 files changed, 604 insertions(+) create mode 100644 docs/user_guide/agilent/plateloc/hello-world.md create mode 100644 pylabrobot/agilent/plateloc/__init__.py create mode 100644 pylabrobot/agilent/plateloc/plateloc.py create mode 100644 pylabrobot/agilent/plateloc/plateloc_tests.py diff --git a/docs/user_guide/01_material-handling/sealers/sealers.md b/docs/user_guide/01_material-handling/sealers/sealers.md index f3a8a68a930..4e89a7be8ee 100644 --- a/docs/user_guide/01_material-handling/sealers/sealers.md +++ b/docs/user_guide/01_material-handling/sealers/sealers.md @@ -11,6 +11,10 @@ They prevent **evaporation**, **cross-contamination**, and **spillage**, especia PyLabRobot supports integration with various sealer machines, allowing you to programmatically seal plates as part of your automation workflows. +Supported serial sealers include: + +- Agilent PlateLoc through `pylabrobot.agilent.PlateLoc` + --- ## Types of Sealers diff --git a/docs/user_guide/agilent/index.md b/docs/user_guide/agilent/index.md index 1b5d31e58fb..affd0fb4803 100644 --- a/docs/user_guide/agilent/index.md +++ b/docs/user_guide/agilent/index.md @@ -4,5 +4,6 @@ :maxdepth: 1 biotek/index +plateloc/hello-world vspin/hello-world ``` diff --git a/docs/user_guide/agilent/plateloc/hello-world.md b/docs/user_guide/agilent/plateloc/hello-world.md new file mode 100644 index 00000000000..3784e8bb7d1 --- /dev/null +++ b/docs/user_guide/agilent/plateloc/hello-world.md @@ -0,0 +1,85 @@ +# Agilent PlateLoc + +The Agilent PlateLoc is controlled through PLR's `Sealer` capability with a direct RS-232 serial +driver. It does not require Agilent ActiveX, VWorks, or vendor server software. Install the +optional serial dependency before connecting: + +```bash +pip install "pylabrobot[serial]" +``` + +```python +from pylabrobot.agilent import PlateLoc + +plateloc = PlateLoc(name="plateloc", port="COM6") + +await plateloc.setup() +await plateloc.driver.set_sealing_temperature(175) +await plateloc.driver.set_sealing_time(0.5) +await plateloc.stop() +``` + +The device also exposes the standard sealer capability: + +```python +await plateloc.sealer.seal(temperature=175, duration=1.5) +await plateloc.sealer.open() +await plateloc.sealer.close() +``` + +`sealer.open()` and `sealer.close()` move the stage and wait for the default stage-settle delay. +`sealer.seal()` starts a sealing cycle after writing the requested temperature and time. + +## Serial command profile + +The decoded direct protocol uses `19200 8N1` and carriage-return-terminated ASCII frames with +two-letter command codes plus payloads. Temperature and time payloads use the firmware's fractional +setpoint convention: the digits after the decimal point are the integer controller value. + +| Operation | Frame | +|---|---| +| Set sealing temperature | `ST 0.{temperature_celsius:03d}\r` | +| Set sealing time | `SS 0.{seconds_x10:02d}\r` | +| Start cycle | `GO 00\r` | +| Stop cycle | `AC 00\r` | +| Move stage out | `SO 00\r` | +| Move stage in | `SI 00\r` | +| Apply seal | `AS 00\r` | +| Clear error | `CL 00\r` | +| Check cycle complete | `CC 00\r` | + +For example, `set_sealing_temperature(175)` writes `ST 0.175\r`, `set_sealing_temperature(30)` +writes `ST 0.030\r`, `set_sealing_time(0.5)` writes `SS 0.05\r`, and `set_sealing_time(1.2)` +writes `SS 0.12\r`. + +Negative acknowledgements are parsed as `NK(message)` and raised as `PlateLocError`. Some +valid firmware commands reply with single-carriage-return acknowledgements such as `SOAK\r`. The +cycle-complete command returns `True` for `CCAK\r` and `False` for `CCNK\r`. + +You can still override command codes or serial settings with `PlateLocSerialProfile` while keeping +the same PLR frontend: + +```python +from pylabrobot.agilent import PlateLoc, PlateLocSerialProfile + +profile = PlateLocSerialProfile( + baudrate=19200, + stage_move_delay=6, + commands={ + "set_sealing_temperature": "ST", + "set_sealing_time": "SS", + "start_cycle": "GO", + "move_stage_out": "SO", + "move_stage_in": "SI", + }, +) + +plateloc = PlateLoc(name="plateloc", port="COM6", profile=profile) +``` + +## Troubleshooting + +The PlateLoc RS-232 connector is not VGA and is not USB TTL. Use a USB-to-RS-232 adapter plus the +correct DB9 cable for the instrument. If the port opens but every command times out, verify the +PlateLoc is powered, the rear serial cable is seated, and the cable wiring matches the instrument +requirement. Some setups require a null-modem DB9 adapter rather than a straight-through cable. diff --git a/pylabrobot/agilent/__init__.py b/pylabrobot/agilent/__init__.py index 47564b66b6b..b401a49280b 100644 --- a/pylabrobot/agilent/__init__.py +++ b/pylabrobot/agilent/__init__.py @@ -12,4 +12,11 @@ SynergyH1, SynergyH1Backend, ) +from .plateloc import ( + PlateLoc, + PlateLocDriver, + PlateLocError, + PlateLocSealerBackend, + PlateLocSerialProfile, +) from .vspin import Access2, Access2Driver, VSpin, VSpinCentrifugeBackend, VSpinDriver diff --git a/pylabrobot/agilent/plateloc/__init__.py b/pylabrobot/agilent/plateloc/__init__.py new file mode 100644 index 00000000000..cef9dbbd61e --- /dev/null +++ b/pylabrobot/agilent/plateloc/__init__.py @@ -0,0 +1,8 @@ +from .plateloc import ( + DEFAULT_PLATELOC_COMMANDS, + PlateLoc, + PlateLocDriver, + PlateLocError, + PlateLocSealerBackend, + PlateLocSerialProfile, +) diff --git a/pylabrobot/agilent/plateloc/plateloc.py b/pylabrobot/agilent/plateloc/plateloc.py new file mode 100644 index 00000000000..ae6aa8c8970 --- /dev/null +++ b/pylabrobot/agilent/plateloc/plateloc.py @@ -0,0 +1,327 @@ +from __future__ import annotations + +import asyncio +import contextlib +import dataclasses +import logging +import re +import time +from typing import Mapping, Optional + +from pylabrobot.capabilities.capability import BackendParams +from pylabrobot.capabilities.sealing import Sealer, SealerBackend +from pylabrobot.device import Device, Driver +from pylabrobot.io.serial import Serial + +try: + import serial as _serial # noqa: F401 + + HAS_SERIAL = True +except ImportError as e: + HAS_SERIAL = False + _SERIAL_IMPORT_ERROR = e + +logger = logging.getLogger(__name__) + + +DEFAULT_PLATELOC_COMMANDS: Mapping[str, str] = { + "set_sealing_temperature": "ST", + "set_sealing_time": "SS", + "move_stage_out": "SO", + "move_stage_in": "SI", + "start_cycle": "GO", + "stop_cycle": "AC", + "apply_seal": "AS", + "clear_error": "CL", + "check_cycle_complete": "CC", +} + +_ACK_RE = re.compile(r"^\s*(?PAC|AS|CC|CL|GO|SI|SO|SS|ST)(?P[AN])K(?:\((?P.*)\))?\s*$") + + +class PlateLocError(RuntimeError): + """Raised when PlateLoc communication or protocol handling fails.""" + + +@dataclasses.dataclass(frozen=True) +class PlateLocSerialProfile: + """Serial settings and command codes for a PlateLoc controller. + + The decoded low-level protocol uses two-letter command codes followed by a payload and a + carriage return. Setpoint payloads are encoded as a decimal fraction whose fractional digits + hold the integer setpoint, for example ``ST 0.175`` for 175 C and ``SS 0.12`` for 1.2 s. + """ + + baudrate: int = 19200 + bytesize: int = 8 + parity: str = "N" + stopbits: int = 1 + timeout: float = 1 + write_timeout: float = 1 + rtscts: bool = False + dsrdtr: bool = False + xonxoff: bool = False + read_delay: float = 0.05 + ack_timeout: float = 10 + response_timeout: float = 2 + stage_move_delay: float = 6 + command_terminator: str = "\r" + response_terminator: bytes = b"\r" + commands: Mapping[str, str] = dataclasses.field( + default_factory=lambda: dict(DEFAULT_PLATELOC_COMMANDS) + ) + + def format_command(self, command: str, payload: str = "00") -> bytes: + code = self.commands.get(command) + if code is None: + raise PlateLocError(f"No PlateLoc serial command configured for {command!r}.") + return f"{code} {payload}{self.command_terminator}".encode("ascii") + + def serialize(self) -> dict: + return { + "baudrate": self.baudrate, + "bytesize": self.bytesize, + "parity": self.parity, + "stopbits": self.stopbits, + "timeout": self.timeout, + "write_timeout": self.write_timeout, + "rtscts": self.rtscts, + "dsrdtr": self.dsrdtr, + "xonxoff": self.xonxoff, + "read_delay": self.read_delay, + "ack_timeout": self.ack_timeout, + "response_timeout": self.response_timeout, + "stage_move_delay": self.stage_move_delay, + "command_terminator": self.command_terminator, + "response_terminator": self.response_terminator.decode("latin1"), + "commands": dict(self.commands), + } + + @classmethod + def deserialize(cls, data: dict) -> "PlateLocSerialProfile": + data = data.copy() + if "response_terminator" in data: + data["response_terminator"] = data["response_terminator"].encode("latin1") + return cls(**data) + + +class PlateLocDriver(Driver): + """Direct serial driver for the Agilent PlateLoc thermal microplate sealer.""" + + def __init__( + self, + port: Optional[str] = None, + vid: Optional[int] = None, + pid: Optional[int] = None, + profile: Optional[PlateLocSerialProfile | dict] = None, + timeout: float = 30, + serial_cls=Serial, + ) -> None: + super().__init__() + if serial_cls is Serial and not HAS_SERIAL: + raise RuntimeError( + "pyserial is not installed. Install with: pip install pylabrobot[serial]. " + f"Import error: {_SERIAL_IMPORT_ERROR}" + ) + if isinstance(profile, dict): + profile = PlateLocSerialProfile.deserialize(profile) + self.profile = profile or PlateLocSerialProfile() + self.timeout = timeout + self.io = serial_cls( + human_readable_device_name="Agilent PlateLoc Sealer", + port=port, + vid=vid, + pid=pid, + baudrate=self.profile.baudrate, + bytesize=self.profile.bytesize, + parity=self.profile.parity, + stopbits=self.profile.stopbits, + write_timeout=self.profile.write_timeout, + timeout=self.profile.timeout, + rtscts=self.profile.rtscts, + dsrdtr=self.profile.dsrdtr, + xonxoff=self.profile.xonxoff, + ) + + @property + def port(self) -> str: + return self.io.port + + async def setup(self, backend_params: Optional[BackendParams] = None): + await self.io.setup() + logger.info("[PlateLoc %s] connected", self.port) + + async def stop(self): + await self.io.stop() + logger.info("[PlateLoc %s] disconnected", self.port) + + @contextlib.contextmanager + def _read_timeout(self, timeout: float): + if hasattr(self.io, "temporary_timeout"): + with self.io.temporary_timeout(timeout): + yield + else: + yield + + async def send_command( + self, + command: str, + payload: str = "00", + expect_response: bool = False, + raise_on_nak: bool = True, + ) -> Optional[str]: + data = self.profile.format_command(command, payload=payload) + if hasattr(self.io, "reset_input_buffer"): + await self.io.reset_input_buffer() + await self.io.write(data) + if self.profile.read_delay > 0: + await asyncio.sleep(self.profile.read_delay) + + response = await self.read_response( + timeout=self.profile.response_timeout if expect_response else self.profile.ack_timeout, + required=expect_response, + ) + if response is not None and raise_on_nak: + self._raise_for_error(command, response) + return response + + async def read_response(self, timeout: Optional[float] = None, required: bool = True) -> Optional[str]: + deadline = time.time() + (timeout if timeout is not None else self.profile.response_timeout) + chunks = bytearray() + while time.time() < deadline: + with self._read_timeout(max(0.01, min(0.1, deadline - time.time()))): + chunk = await self.io.read(1) + if chunk: + chunks.extend(chunk) + if chunks.endswith(self.profile.response_terminator): + break + elif len(chunks) > 0: + break + + if len(chunks) == 0: + if required: + raise TimeoutError("Timeout while waiting for PlateLoc response") + return None + return bytes(chunks).decode("utf-8", errors="replace").strip() + + def _raise_for_error(self, command: str, response: str): + match = _ACK_RE.match(response) + if match is None: + return + code = match.group("code") + expected_code = self.profile.commands.get(command) + if expected_code is not None and code != expected_code: + raise PlateLocError(f"PlateLoc replied with {code!r} to {command!r}: {response!r}") + if match.group("status") == "N": + message = match.group("message") or "command rejected" + raise PlateLocError(f"PlateLoc rejected {command!r}: {message}") + + async def set_sealing_temperature(self, temperature: float): + if not (20 <= temperature <= 235): + raise ValueError("Temperature out of range. Please enter a value between 20 and 235 C.") + payload = f"0.{round(temperature):03d}" + logger.info("[PlateLoc %s] setting sealing temperature to %.1f C", self.port, temperature) + return await self.send_command("set_sealing_temperature", payload=payload) + + async def set_sealing_time(self, duration: float): + if not (0.5 <= duration <= 12.0): + raise ValueError("Duration out of range. Please enter a value between 0.5 and 12.0 s.") + payload = f"0.{round(duration * 10):02d}" + logger.info("[PlateLoc %s] setting sealing time to %.2f s", self.port, duration) + return await self.send_command("set_sealing_time", payload=payload) + + async def move_stage_out(self): + logger.info("[PlateLoc %s] moving stage out", self.port) + response = await self.send_command("move_stage_out") + if self.profile.stage_move_delay > 0: + await asyncio.sleep(self.profile.stage_move_delay) + return response + + async def move_stage_in(self): + logger.info("[PlateLoc %s] moving stage in", self.port) + response = await self.send_command("move_stage_in") + if self.profile.stage_move_delay > 0: + await asyncio.sleep(self.profile.stage_move_delay) + return response + + async def start_cycle(self): + logger.info("[PlateLoc %s] starting sealing cycle", self.port) + return await self.send_command("start_cycle") + + async def stop_cycle(self): + logger.info("[PlateLoc %s] stopping sealing cycle", self.port) + return await self.send_command("stop_cycle") + + async def apply_seal(self): + logger.info("[PlateLoc %s] applying seal", self.port) + return await self.send_command("apply_seal") + + async def clear_error(self): + logger.info("[PlateLoc %s] clearing error", self.port) + return await self.send_command("clear_error") + + async def check_cycle_complete(self) -> bool: + response = await self.send_command( + "check_cycle_complete", + expect_response=True, + raise_on_nak=False, + ) + match = _ACK_RE.match(response or "") + return match is not None and match.group("status") == "A" + + def serialize(self) -> dict: + return { + **super().serialize(), + "port": self.port, + "profile": self.profile.serialize(), + "timeout": self.timeout, + } + + +class PlateLocSealerBackend(SealerBackend): + """Translates SealerBackend operations into direct PlateLoc serial commands.""" + + def __init__(self, driver: PlateLocDriver): + self.driver = driver + + async def seal(self, temperature: int, duration: float): + await self.driver.set_sealing_temperature(temperature) + await self.driver.set_sealing_time(duration) + return await self.driver.start_cycle() + + async def open(self): + return await self.driver.move_stage_out() + + async def close(self): + return await self.driver.move_stage_in() + + +class PlateLoc(Device): + """Agilent PlateLoc thermal microplate sealer.""" + + def __init__( + self, + name: str, + port: Optional[str] = None, + vid: Optional[int] = None, + pid: Optional[int] = None, + profile: Optional[PlateLocSerialProfile | dict] = None, + timeout: float = 30, + serial_cls=Serial, + ): + self.name = name + driver = PlateLocDriver( + port=port, + vid=vid, + pid=pid, + profile=profile, + timeout=timeout, + serial_cls=serial_cls, + ) + super().__init__(driver=driver) + self.driver: PlateLocDriver = driver + self.sealer = Sealer(backend=PlateLocSealerBackend(driver)) + self._capabilities = [self.sealer] + + def serialize(self) -> dict: + return {**super().serialize(), "name": self.name} diff --git a/pylabrobot/agilent/plateloc/plateloc_tests.py b/pylabrobot/agilent/plateloc/plateloc_tests.py new file mode 100644 index 00000000000..898982503a1 --- /dev/null +++ b/pylabrobot/agilent/plateloc/plateloc_tests.py @@ -0,0 +1,172 @@ +import asyncio +import contextlib +import unittest +from collections import deque + +from pylabrobot.agilent.plateloc import ( + DEFAULT_PLATELOC_COMMANDS, + PlateLoc, + PlateLocDriver, + PlateLocError, + PlateLocSerialProfile, +) + + +class FakeSerial: + def __init__(self, **kwargs): + self.kwargs = kwargs + self._port = kwargs["port"] + self.writes = [] + self.responses = deque() + self.setup_called = False + self.stop_called = False + self.timeout = kwargs["timeout"] + self.reset_input_buffer_called = False + + @property + def port(self): + return self._port + + @contextlib.contextmanager + def temporary_timeout(self, timeout: float): + previous_timeout = self.timeout + self.timeout = timeout + try: + yield + finally: + self.timeout = previous_timeout + + async def setup(self): + self.setup_called = True + + async def stop(self): + self.stop_called = True + + async def write(self, data: bytes): + self.writes.append(data) + + async def read(self, num_bytes: int = 1) -> bytes: + if not self.responses: + await asyncio.sleep(0) + return b"" + response = self.responses[0] + chunk = response[:num_bytes] + response = response[num_bytes:] + if response: + self.responses[0] = response + else: + self.responses.popleft() + return chunk + + def queue_response(self, response: bytes): + self.responses.append(response) + + async def reset_input_buffer(self): + self.reset_input_buffer_called = True + + +class PlateLocTests(unittest.IsolatedAsyncioTestCase): + def make_driver(self, commands=None, ack_timeout=0): + profile = PlateLocSerialProfile( + response_timeout=0.01, + ack_timeout=ack_timeout, + read_delay=0, + stage_move_delay=0, + commands=commands or DEFAULT_PLATELOC_COMMANDS, + ) + return PlateLocDriver(port="COM6", profile=profile, serial_cls=FakeSerial) + + async def test_setup_uses_plr_serial_wrapper_settings(self): + driver = self.make_driver() + + await driver.setup() + + self.assertTrue(driver.io.setup_called) + self.assertEqual(driver.io.kwargs["human_readable_device_name"], "Agilent PlateLoc Sealer") + self.assertEqual(driver.io.kwargs["port"], "COM6") + self.assertEqual(driver.io.kwargs["baudrate"], 19200) + self.assertEqual(driver.io.kwargs["bytesize"], 8) + self.assertEqual(driver.io.kwargs["parity"], "N") + self.assertEqual(driver.io.kwargs["stopbits"], 1) + + await driver.stop() + self.assertTrue(driver.io.stop_called) + + async def test_temperature_and_time_writes_are_scaled_and_validated(self): + driver = self.make_driver() + await driver.setup() + + await driver.set_sealing_temperature(30) + await driver.set_sealing_time(0.5) + + self.assertEqual(driver.io.writes, [b"ST 0.030\r", b"SS 0.05\r"]) + + with self.assertRaises(ValueError): + await driver.set_sealing_temperature(19) + with self.assertRaises(ValueError): + await driver.set_sealing_time(0.4) + + async def test_negative_acknowledgement_raises_protocol_error(self): + driver = self.make_driver(ack_timeout=0.01) + await driver.setup() + driver.io.queue_response(b"STNK(Desired Temperature is Out of Range)\r\r") + + with self.assertRaisesRegex(PlateLocError, "Desired Temperature is Out of Range"): + await driver.set_sealing_temperature(30) + + self.assertEqual(driver.io.writes, [b"ST 0.030\r"]) + + async def test_required_response_reads_until_plate_loc_ack(self): + driver = self.make_driver() + await driver.setup() + driver.io.queue_response(b"CCAK\r") + + self.assertTrue(await driver.check_cycle_complete()) + self.assertEqual(driver.io.writes, [b"CC 00\r"]) + + async def test_cycle_not_complete_returns_false(self): + driver = self.make_driver() + await driver.setup() + driver.io.queue_response(b"CCNK\r") + + self.assertFalse(await driver.check_cycle_complete()) + self.assertEqual(driver.io.writes, [b"CC 00\r"]) + + async def test_custom_command_profile(self): + driver = self.make_driver( + commands={ + "set_sealing_temperature": "TP", + "set_sealing_time": "TM", + } + ) + await driver.setup() + + await driver.set_sealing_temperature(120) + await driver.set_sealing_time(1.25) + + self.assertEqual(driver.io.writes, [b"TP 0.120\r", b"TM 0.12\r"]) + + async def test_device_exposes_sealer_capability(self): + profile = PlateLocSerialProfile( + response_timeout=0.01, + ack_timeout=0, + read_delay=0, + stage_move_delay=0, + ) + device = PlateLoc(name="plateloc", port="COM6", profile=profile, serial_cls=FakeSerial) + + await device.setup() + await device.sealer.seal(120, 1.2) + await device.sealer.open() + await device.sealer.close() + await device.stop() + + self.assertEqual( + device.driver.io.writes, + [b"ST 0.120\r", b"SS 0.12\r", b"GO 00\r", b"SO 00\r", b"SI 00\r"], + ) + self.assertTrue(device.driver.io.stop_called) + + +if __name__ == "__main__": + unittest.main() From 26b7b10e66c3462e7fcbad678ee58aebd6200282 Mon Sep 17 00:00:00 2001 From: Alex Godfrey Date: Fri, 8 May 2026 18:47:46 -0700 Subject: [PATCH 2/7] Expose PlateLoc setpoints and status --- .../agilent/plateloc/hello-world.md | 19 ++++- pylabrobot/agilent/__init__.py | 1 + pylabrobot/agilent/plateloc/__init__.py | 1 + pylabrobot/agilent/plateloc/plateloc.py | 74 +++++++++++++++++-- pylabrobot/agilent/plateloc/plateloc_tests.py | 39 +++++++++- 5 files changed, 126 insertions(+), 8 deletions(-) diff --git a/docs/user_guide/agilent/plateloc/hello-world.md b/docs/user_guide/agilent/plateloc/hello-world.md index 3784e8bb7d1..247c9b18856 100644 --- a/docs/user_guide/agilent/plateloc/hello-world.md +++ b/docs/user_guide/agilent/plateloc/hello-world.md @@ -14,8 +14,9 @@ from pylabrobot.agilent import PlateLoc plateloc = PlateLoc(name="plateloc", port="COM6") await plateloc.setup() -await plateloc.driver.set_sealing_temperature(175) -await plateloc.driver.set_sealing_time(0.5) +await plateloc.set_sealing_temperature(175) +await plateloc.set_sealing_time(0.5) +status = await plateloc.request_status() await plateloc.stop() ``` @@ -30,6 +31,20 @@ await plateloc.sealer.close() `sealer.open()` and `sealer.close()` move the stage and wait for the default stage-settle delay. `sealer.seal()` starts a sealing cycle after writing the requested temperature and time. +The PlateLoc-specific frontend also exposes independent setpoint and status helpers: + +```python +await plateloc.set_sealing_temperature(160) +await plateloc.set_sealing_time(1.0) + +status = await plateloc.request_status() +print(status.target_temperature, status.sealing_time, status.cycle_complete) +``` + +`request_status()` returns the best-known PLR state plus a live cycle-complete query. The direct +serial protocol decoded here does not expose actual block temperature or actual stored time reads, +so PLR reports the last successfully written target temperature and sealing time. + ## Serial command profile The decoded direct protocol uses `19200 8N1` and carriage-return-terminated ASCII frames with diff --git a/pylabrobot/agilent/__init__.py b/pylabrobot/agilent/__init__.py index b401a49280b..efc270351e3 100644 --- a/pylabrobot/agilent/__init__.py +++ b/pylabrobot/agilent/__init__.py @@ -18,5 +18,6 @@ PlateLocError, PlateLocSealerBackend, PlateLocSerialProfile, + PlateLocStatus, ) from .vspin import Access2, Access2Driver, VSpin, VSpinCentrifugeBackend, VSpinDriver diff --git a/pylabrobot/agilent/plateloc/__init__.py b/pylabrobot/agilent/plateloc/__init__.py index cef9dbbd61e..28b11ca53e6 100644 --- a/pylabrobot/agilent/plateloc/__init__.py +++ b/pylabrobot/agilent/plateloc/__init__.py @@ -5,4 +5,5 @@ PlateLocError, PlateLocSealerBackend, PlateLocSerialProfile, + PlateLocStatus, ) diff --git a/pylabrobot/agilent/plateloc/plateloc.py b/pylabrobot/agilent/plateloc/plateloc.py index ae6aa8c8970..ece84fe0c8f 100644 --- a/pylabrobot/agilent/plateloc/plateloc.py +++ b/pylabrobot/agilent/plateloc/plateloc.py @@ -43,6 +43,20 @@ class PlateLocError(RuntimeError): """Raised when PlateLoc communication or protocol handling fails.""" +@dataclasses.dataclass(frozen=True) +class PlateLocStatus: + """Best-known PlateLoc state from direct serial control.""" + + port: str + connected: bool + target_temperature: Optional[float] + sealing_time: Optional[float] + stage_position: Optional[str] + cycle_complete: Optional[bool] + last_command: Optional[str] + last_response: Optional[str] + + @dataclasses.dataclass(frozen=True) class PlateLocSerialProfile: """Serial settings and command codes for a PlateLoc controller. @@ -127,6 +141,12 @@ def __init__( profile = PlateLocSerialProfile.deserialize(profile) self.profile = profile or PlateLocSerialProfile() self.timeout = timeout + self._connected = False + self._target_temperature: Optional[float] = None + self._sealing_time: Optional[float] = None + self._stage_position: Optional[str] = None + self._last_command: Optional[str] = None + self._last_response: Optional[str] = None self.io = serial_cls( human_readable_device_name="Agilent PlateLoc Sealer", port=port, @@ -149,10 +169,12 @@ def port(self) -> str: async def setup(self, backend_params: Optional[BackendParams] = None): await self.io.setup() + self._connected = True logger.info("[PlateLoc %s] connected", self.port) async def stop(self): await self.io.stop() + self._connected = False logger.info("[PlateLoc %s] disconnected", self.port) @contextlib.contextmanager @@ -181,6 +203,8 @@ async def send_command( timeout=self.profile.response_timeout if expect_response else self.profile.ack_timeout, required=expect_response, ) + self._last_command = command + self._last_response = response if response is not None and raise_on_nak: self._raise_for_error(command, response) return response @@ -219,22 +243,29 @@ def _raise_for_error(self, command: str, response: str): async def set_sealing_temperature(self, temperature: float): if not (20 <= temperature <= 235): raise ValueError("Temperature out of range. Please enter a value between 20 and 235 C.") - payload = f"0.{round(temperature):03d}" + target_temperature = round(temperature) + payload = f"0.{target_temperature:03d}" logger.info("[PlateLoc %s] setting sealing temperature to %.1f C", self.port, temperature) - return await self.send_command("set_sealing_temperature", payload=payload) + response = await self.send_command("set_sealing_temperature", payload=payload) + self._target_temperature = float(target_temperature) + return response async def set_sealing_time(self, duration: float): if not (0.5 <= duration <= 12.0): raise ValueError("Duration out of range. Please enter a value between 0.5 and 12.0 s.") - payload = f"0.{round(duration * 10):02d}" + sealing_time_deciseconds = round(duration * 10) + payload = f"0.{sealing_time_deciseconds:02d}" logger.info("[PlateLoc %s] setting sealing time to %.2f s", self.port, duration) - return await self.send_command("set_sealing_time", payload=payload) + response = await self.send_command("set_sealing_time", payload=payload) + self._sealing_time = sealing_time_deciseconds / 10 + return response async def move_stage_out(self): logger.info("[PlateLoc %s] moving stage out", self.port) response = await self.send_command("move_stage_out") if self.profile.stage_move_delay > 0: await asyncio.sleep(self.profile.stage_move_delay) + self._stage_position = "open" return response async def move_stage_in(self): @@ -242,6 +273,7 @@ async def move_stage_in(self): response = await self.send_command("move_stage_in") if self.profile.stage_move_delay > 0: await asyncio.sleep(self.profile.stage_move_delay) + self._stage_position = "closed" return response async def start_cycle(self): @@ -269,6 +301,22 @@ async def check_cycle_complete(self) -> bool: match = _ACK_RE.match(response or "") return match is not None and match.group("status") == "A" + def status_snapshot(self, cycle_complete: Optional[bool] = None) -> PlateLocStatus: + return PlateLocStatus( + port=self.port, + connected=self._connected, + target_temperature=self._target_temperature, + sealing_time=self._sealing_time, + stage_position=self._stage_position, + cycle_complete=cycle_complete, + last_command=self._last_command, + last_response=self._last_response, + ) + + async def request_status(self, query_cycle_complete: bool = True) -> PlateLocStatus: + cycle_complete = await self.check_cycle_complete() if query_cycle_complete else None + return self.status_snapshot(cycle_complete=cycle_complete) + def serialize(self) -> dict: return { **super().serialize(), @@ -323,5 +371,21 @@ def __init__( self.sealer = Sealer(backend=PlateLocSealerBackend(driver)) self._capabilities = [self.sealer] + async def set_sealing_temperature(self, temperature: float): + return await self.driver.set_sealing_temperature(temperature) + + async def set_sealing_time(self, duration: float): + return await self.driver.set_sealing_time(duration) + + def status_snapshot(self, cycle_complete: Optional[bool] = None) -> PlateLocStatus: + return self.driver.status_snapshot(cycle_complete=cycle_complete) + + async def request_status(self, query_cycle_complete: bool = True) -> PlateLocStatus: + return await self.driver.request_status(query_cycle_complete=query_cycle_complete) + def serialize(self) -> dict: - return {**super().serialize(), "name": self.name} + return { + **super().serialize(), + "name": self.name, + "status": dataclasses.asdict(self.status_snapshot()), + } diff --git a/pylabrobot/agilent/plateloc/plateloc_tests.py b/pylabrobot/agilent/plateloc/plateloc_tests.py index 898982503a1..7ab7fa56720 100644 --- a/pylabrobot/agilent/plateloc/plateloc_tests.py +++ b/pylabrobot/agilent/plateloc/plateloc_tests.py @@ -9,6 +9,7 @@ PlateLocDriver, PlateLocError, PlateLocSerialProfile, + PlateLocStatus, ) @@ -132,6 +133,28 @@ async def test_cycle_not_complete_returns_false(self): self.assertFalse(await driver.check_cycle_complete()) self.assertEqual(driver.io.writes, [b"CC 00\r"]) + async def test_status_snapshot_tracks_setpoints_and_live_cycle_complete(self): + driver = self.make_driver() + await driver.setup() + + await driver.set_sealing_temperature(30) + await driver.set_sealing_time(0.5) + await driver.move_stage_out() + driver.io.queue_response(b"CCAK\r") + + status = await driver.request_status() + + self.assertIsInstance(status, PlateLocStatus) + self.assertEqual(status.port, "COM6") + self.assertTrue(status.connected) + self.assertEqual(status.target_temperature, 30) + self.assertEqual(status.sealing_time, 0.5) + self.assertEqual(status.stage_position, "open") + self.assertTrue(status.cycle_complete) + self.assertEqual(status.last_command, "check_cycle_complete") + self.assertEqual(status.last_response, "CCAK") + self.assertEqual(driver.io.writes, [b"ST 0.030\r", b"SS 0.05\r", b"SO 00\r", b"CC 00\r"]) + async def test_custom_command_profile(self): driver = self.make_driver( commands={ @@ -156,15 +179,29 @@ async def test_device_exposes_sealer_capability(self): device = PlateLoc(name="plateloc", port="COM6", profile=profile, serial_cls=FakeSerial) await device.setup() + await device.set_sealing_temperature(100) + await device.set_sealing_time(0.5) await device.sealer.seal(120, 1.2) await device.sealer.open() await device.sealer.close() + status = device.status_snapshot() await device.stop() self.assertEqual( device.driver.io.writes, - [b"ST 0.120\r", b"SS 0.12\r", b"GO 00\r", b"SO 00\r", b"SI 00\r"], + [ + b"ST 0.100\r", + b"SS 0.05\r", + b"ST 0.120\r", + b"SS 0.12\r", + b"GO 00\r", + b"SO 00\r", + b"SI 00\r", + ], ) + self.assertEqual(status.target_temperature, 120) + self.assertEqual(status.sealing_time, 1.2) + self.assertEqual(status.stage_position, "closed") self.assertTrue(device.driver.io.stop_called) From d86456616969814daf374132cbd379cc9ac88013 Mon Sep 17 00:00:00 2001 From: Alex Godfrey Date: Fri, 8 May 2026 20:43:31 -0700 Subject: [PATCH 3/7] Address PlateLoc review feedback --- pylabrobot/agilent/plateloc/plateloc.py | 27 ++++++++-- pylabrobot/agilent/plateloc/plateloc_tests.py | 52 ++++++++++++++++++- 2 files changed, 74 insertions(+), 5 deletions(-) diff --git a/pylabrobot/agilent/plateloc/plateloc.py b/pylabrobot/agilent/plateloc/plateloc.py index ece84fe0c8f..2fb7ef64191 100644 --- a/pylabrobot/agilent/plateloc/plateloc.py +++ b/pylabrobot/agilent/plateloc/plateloc.py @@ -36,7 +36,7 @@ "check_cycle_complete": "CC", } -_ACK_RE = re.compile(r"^\s*(?PAC|AS|CC|CL|GO|SI|SO|SS|ST)(?P[AN])K(?:\((?P.*)\))?\s*$") +_ACK_RE = re.compile(r"^\s*(?P[A-Z0-9]{2})(?P[AN])K(?:\((?P.*)\))?\s*$") class PlateLocError(RuntimeError): @@ -79,6 +79,7 @@ class PlateLocSerialProfile: ack_timeout: float = 10 response_timeout: float = 2 stage_move_delay: float = 6 + cycle_poll_interval: float = 0.5 command_terminator: str = "\r" response_terminator: bytes = b"\r" commands: Mapping[str, str] = dataclasses.field( @@ -106,6 +107,7 @@ def serialize(self) -> dict: "ack_timeout": self.ack_timeout, "response_timeout": self.response_timeout, "stage_move_delay": self.stage_move_delay, + "cycle_poll_interval": self.cycle_poll_interval, "command_terminator": self.command_terminator, "response_terminator": self.response_terminator.decode("latin1"), "commands": dict(self.commands), @@ -299,7 +301,24 @@ async def check_cycle_complete(self) -> bool: raise_on_nak=False, ) match = _ACK_RE.match(response or "") - return match is not None and match.group("status") == "A" + if match is None: + return False + expected_code = self.profile.commands.get("check_cycle_complete") + if expected_code is not None and match.group("code") != expected_code: + raise PlateLocError( + f"PlateLoc replied with {match.group('code')!r} to 'check_cycle_complete': {response!r}" + ) + return match.group("status") == "A" + + async def wait_for_cycle_complete(self, timeout: Optional[float] = None) -> bool: + deadline = time.time() + (self.timeout if timeout is None else timeout) + while True: + if await self.check_cycle_complete(): + return True + remaining = deadline - time.time() + if remaining <= 0: + raise TimeoutError("Timeout while waiting for PlateLoc cycle to complete") + await asyncio.sleep(min(max(self.profile.cycle_poll_interval, 0), remaining)) def status_snapshot(self, cycle_complete: Optional[bool] = None) -> PlateLocStatus: return PlateLocStatus( @@ -335,7 +354,9 @@ def __init__(self, driver: PlateLocDriver): async def seal(self, temperature: int, duration: float): await self.driver.set_sealing_temperature(temperature) await self.driver.set_sealing_time(duration) - return await self.driver.start_cycle() + response = await self.driver.start_cycle() + await self.driver.wait_for_cycle_complete() + return response async def open(self): return await self.driver.move_stage_out() diff --git a/pylabrobot/agilent/plateloc/plateloc_tests.py b/pylabrobot/agilent/plateloc/plateloc_tests.py index 7ab7fa56720..5df1d5b32e9 100644 --- a/pylabrobot/agilent/plateloc/plateloc_tests.py +++ b/pylabrobot/agilent/plateloc/plateloc_tests.py @@ -67,15 +67,16 @@ async def reset_input_buffer(self): class PlateLocTests(unittest.IsolatedAsyncioTestCase): - def make_driver(self, commands=None, ack_timeout=0): + def make_driver(self, commands=None, ack_timeout=0, timeout=30): profile = PlateLocSerialProfile( response_timeout=0.01, ack_timeout=ack_timeout, read_delay=0, stage_move_delay=0, + cycle_poll_interval=0, commands=commands or DEFAULT_PLATELOC_COMMANDS, ) - return PlateLocDriver(port="COM6", profile=profile, serial_cls=FakeSerial) + return PlateLocDriver(port="COM6", profile=profile, timeout=timeout, serial_cls=FakeSerial) async def test_setup_uses_plr_serial_wrapper_settings(self): driver = self.make_driver() @@ -169,18 +170,64 @@ async def test_custom_command_profile(self): self.assertEqual(driver.io.writes, [b"TP 0.120\r", b"TM 0.12\r"]) + async def test_custom_command_acknowledgement_codes_are_parsed(self): + commands = { + **DEFAULT_PLATELOC_COMMANDS, + "set_sealing_temperature": "TP", + "check_cycle_complete": "CP", + } + driver = self.make_driver(commands=commands, ack_timeout=0.01) + await driver.setup() + driver.io.queue_response(b"TPNK(Desired Temperature is Out of Range)\r") + + with self.assertRaisesRegex(PlateLocError, "Desired Temperature is Out of Range"): + await driver.set_sealing_temperature(120) + + driver.io.queue_response(b"CPAK\r") + self.assertTrue(await driver.check_cycle_complete()) + self.assertEqual(driver.io.writes, [b"TP 0.120\r", b"CP 00\r"]) + + async def test_seal_waits_for_cycle_completion(self): + profile = PlateLocSerialProfile( + response_timeout=0.01, + ack_timeout=0, + read_delay=0, + stage_move_delay=0, + cycle_poll_interval=0, + ) + device = PlateLoc(name="plateloc", port="COM6", profile=profile, timeout=1, serial_cls=FakeSerial) + + await device.setup() + device.driver.io.queue_response(b"CCNK\r") + device.driver.io.queue_response(b"CCAK\r") + + await device.sealer.seal(120, 1.2) + + self.assertEqual( + device.driver.io.writes, + [ + b"ST 0.120\r", + b"SS 0.12\r", + b"GO 00\r", + b"CC 00\r", + b"CC 00\r", + ], + ) + async def test_device_exposes_sealer_capability(self): profile = PlateLocSerialProfile( response_timeout=0.01, ack_timeout=0, read_delay=0, stage_move_delay=0, + cycle_poll_interval=0, ) device = PlateLoc(name="plateloc", port="COM6", profile=profile, serial_cls=FakeSerial) await device.setup() await device.set_sealing_temperature(100) await device.set_sealing_time(0.5) + device.driver.io.queue_response(b"CCAK\r") await device.sealer.seal(120, 1.2) await device.sealer.open() await device.sealer.close() @@ -195,6 +242,7 @@ async def test_device_exposes_sealer_capability(self): b"ST 0.120\r", b"SS 0.12\r", b"GO 00\r", + b"CC 00\r", b"SO 00\r", b"SI 00\r", ], From b73da66b8b33a8ed562b00439fc81d5a2f6f40c0 Mon Sep 17 00:00:00 2001 From: Alex Godfrey Date: Fri, 8 May 2026 20:46:07 -0700 Subject: [PATCH 4/7] Fix PlateLoc typing --- pylabrobot/agilent/plateloc/plateloc.py | 4 ++-- pylabrobot/agilent/plateloc/plateloc_tests.py | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/pylabrobot/agilent/plateloc/plateloc.py b/pylabrobot/agilent/plateloc/plateloc.py index 2fb7ef64191..851ff6500be 100644 --- a/pylabrobot/agilent/plateloc/plateloc.py +++ b/pylabrobot/agilent/plateloc/plateloc.py @@ -6,7 +6,7 @@ import logging import re import time -from typing import Mapping, Optional +from typing import Mapping, Optional, cast from pylabrobot.capabilities.capability import BackendParams from pylabrobot.capabilities.sealing import Sealer, SealerBackend @@ -167,7 +167,7 @@ def __init__( @property def port(self) -> str: - return self.io.port + return cast(str, self.io.port) async def setup(self, backend_params: Optional[BackendParams] = None): await self.io.setup() diff --git a/pylabrobot/agilent/plateloc/plateloc_tests.py b/pylabrobot/agilent/plateloc/plateloc_tests.py index 5df1d5b32e9..367e77dfa1a 100644 --- a/pylabrobot/agilent/plateloc/plateloc_tests.py +++ b/pylabrobot/agilent/plateloc/plateloc_tests.py @@ -2,6 +2,7 @@ import contextlib import unittest from collections import deque +from typing import Deque from pylabrobot.agilent.plateloc import ( DEFAULT_PLATELOC_COMMANDS, @@ -18,7 +19,7 @@ def __init__(self, **kwargs): self.kwargs = kwargs self._port = kwargs["port"] self.writes = [] - self.responses = deque() + self.responses: Deque[bytes] = deque() self.setup_called = False self.stop_called = False self.timeout = kwargs["timeout"] From 3a6bb3d6eb7ef372e4cd69a9718e32d52796be91 Mon Sep 17 00:00:00 2001 From: Alex Godfrey Date: Wed, 13 May 2026 18:44:13 -0700 Subject: [PATCH 5/7] Harden PlateLoc serial acknowledgements --- .../01_material-handling/sealers/sealers.md | 2 +- .../agilent/plateloc/hello-world.md | 2 +- pylabrobot/agilent/plateloc/plateloc.py | 11 ++-- pylabrobot/agilent/plateloc/plateloc_tests.py | 52 +++++++++++++++++-- 4 files changed, 58 insertions(+), 9 deletions(-) diff --git a/docs/user_guide/01_material-handling/sealers/sealers.md b/docs/user_guide/01_material-handling/sealers/sealers.md index 4e89a7be8ee..840c6221e0d 100644 --- a/docs/user_guide/01_material-handling/sealers/sealers.md +++ b/docs/user_guide/01_material-handling/sealers/sealers.md @@ -3,7 +3,7 @@ ## Installation ```bash -pip install pylabrobot[serial] +pip install "pylabrobot[serial]" ``` In automated wet lab workflows, **microplate sealers** are essential for preserving sample integrity. diff --git a/docs/user_guide/agilent/plateloc/hello-world.md b/docs/user_guide/agilent/plateloc/hello-world.md index 247c9b18856..b8e3cf219b2 100644 --- a/docs/user_guide/agilent/plateloc/hello-world.md +++ b/docs/user_guide/agilent/plateloc/hello-world.md @@ -31,7 +31,7 @@ await plateloc.sealer.close() `sealer.open()` and `sealer.close()` move the stage and wait for the default stage-settle delay. `sealer.seal()` starts a sealing cycle after writing the requested temperature and time. -The PlateLoc-specific frontend also exposes independent setpoint and status helpers: +The PlateLoc device class also exposes independent setpoint and status helpers: ```python await plateloc.set_sealing_temperature(160) diff --git a/pylabrobot/agilent/plateloc/plateloc.py b/pylabrobot/agilent/plateloc/plateloc.py index 851ff6500be..92411d30ef8 100644 --- a/pylabrobot/agilent/plateloc/plateloc.py +++ b/pylabrobot/agilent/plateloc/plateloc.py @@ -203,11 +203,12 @@ async def send_command( response = await self.read_response( timeout=self.profile.response_timeout if expect_response else self.profile.ack_timeout, - required=expect_response, + required=True, ) + assert response is not None self._last_command = command self._last_response = response - if response is not None and raise_on_nak: + if raise_on_nak: self._raise_for_error(command, response) return response @@ -233,7 +234,7 @@ async def read_response(self, timeout: Optional[float] = None, required: bool = def _raise_for_error(self, command: str, response: str): match = _ACK_RE.match(response) if match is None: - return + raise PlateLocError(f"PlateLoc returned invalid response to {command!r}: {response!r}") code = match.group("code") expected_code = self.profile.commands.get(command) if expected_code is not None and code != expected_code: @@ -302,7 +303,9 @@ async def check_cycle_complete(self) -> bool: ) match = _ACK_RE.match(response or "") if match is None: - return False + raise PlateLocError( + f"PlateLoc returned invalid response to 'check_cycle_complete': {response!r}" + ) expected_code = self.profile.commands.get("check_cycle_complete") if expected_code is not None and match.group("code") != expected_code: raise PlateLocError( diff --git a/pylabrobot/agilent/plateloc/plateloc_tests.py b/pylabrobot/agilent/plateloc/plateloc_tests.py index 367e77dfa1a..ae5eb51756e 100644 --- a/pylabrobot/agilent/plateloc/plateloc_tests.py +++ b/pylabrobot/agilent/plateloc/plateloc_tests.py @@ -68,7 +68,7 @@ async def reset_input_buffer(self): class PlateLocTests(unittest.IsolatedAsyncioTestCase): - def make_driver(self, commands=None, ack_timeout=0, timeout=30): + def make_driver(self, commands=None, ack_timeout=0.01, timeout=30): profile = PlateLocSerialProfile( response_timeout=0.01, ack_timeout=ack_timeout, @@ -98,6 +98,8 @@ async def test_setup_uses_plr_serial_wrapper_settings(self): async def test_temperature_and_time_writes_are_scaled_and_validated(self): driver = self.make_driver() await driver.setup() + driver.io.queue_response(b"STAK\r") + driver.io.queue_response(b"SSAK\r") await driver.set_sealing_temperature(30) await driver.set_sealing_time(0.5) @@ -119,6 +121,25 @@ async def test_negative_acknowledgement_raises_protocol_error(self): self.assertEqual(driver.io.writes, [b"ST 0.030\r"]) + async def test_missing_acknowledgement_raises_timeout(self): + driver = self.make_driver() + await driver.setup() + + with self.assertRaisesRegex(TimeoutError, "Timeout"): + await driver.set_sealing_temperature(30) + + self.assertEqual(driver.io.writes, [b"ST 0.030\r"]) + + async def test_malformed_acknowledgement_raises_protocol_error(self): + driver = self.make_driver() + await driver.setup() + driver.io.queue_response(b"unexpected\r") + + with self.assertRaisesRegex(PlateLocError, "invalid response"): + await driver.set_sealing_temperature(30) + + self.assertEqual(driver.io.writes, [b"ST 0.030\r"]) + async def test_required_response_reads_until_plate_loc_ack(self): driver = self.make_driver() await driver.setup() @@ -135,9 +156,22 @@ async def test_cycle_not_complete_returns_false(self): self.assertFalse(await driver.check_cycle_complete()) self.assertEqual(driver.io.writes, [b"CC 00\r"]) + async def test_invalid_cycle_complete_response_raises_protocol_error(self): + driver = self.make_driver() + await driver.setup() + driver.io.queue_response(b"unexpected\r") + + with self.assertRaisesRegex(PlateLocError, "invalid response"): + await driver.check_cycle_complete() + + self.assertEqual(driver.io.writes, [b"CC 00\r"]) + async def test_status_snapshot_tracks_setpoints_and_live_cycle_complete(self): driver = self.make_driver() await driver.setup() + driver.io.queue_response(b"STAK\r") + driver.io.queue_response(b"SSAK\r") + driver.io.queue_response(b"SOAK\r") await driver.set_sealing_temperature(30) await driver.set_sealing_time(0.5) @@ -165,6 +199,8 @@ async def test_custom_command_profile(self): } ) await driver.setup() + driver.io.queue_response(b"TPAK\r") + driver.io.queue_response(b"TMAK\r") await driver.set_sealing_temperature(120) await driver.set_sealing_time(1.25) @@ -191,7 +227,7 @@ async def test_custom_command_acknowledgement_codes_are_parsed(self): async def test_seal_waits_for_cycle_completion(self): profile = PlateLocSerialProfile( response_timeout=0.01, - ack_timeout=0, + ack_timeout=0.01, read_delay=0, stage_move_delay=0, cycle_poll_interval=0, @@ -199,6 +235,9 @@ async def test_seal_waits_for_cycle_completion(self): device = PlateLoc(name="plateloc", port="COM6", profile=profile, timeout=1, serial_cls=FakeSerial) await device.setup() + device.driver.io.queue_response(b"STAK\r") + device.driver.io.queue_response(b"SSAK\r") + device.driver.io.queue_response(b"GOAK\r") device.driver.io.queue_response(b"CCNK\r") device.driver.io.queue_response(b"CCAK\r") @@ -218,7 +257,7 @@ async def test_seal_waits_for_cycle_completion(self): async def test_device_exposes_sealer_capability(self): profile = PlateLocSerialProfile( response_timeout=0.01, - ack_timeout=0, + ack_timeout=0.01, read_delay=0, stage_move_delay=0, cycle_poll_interval=0, @@ -226,11 +265,18 @@ async def test_device_exposes_sealer_capability(self): device = PlateLoc(name="plateloc", port="COM6", profile=profile, serial_cls=FakeSerial) await device.setup() + device.driver.io.queue_response(b"STAK\r") await device.set_sealing_temperature(100) + device.driver.io.queue_response(b"SSAK\r") await device.set_sealing_time(0.5) + device.driver.io.queue_response(b"STAK\r") + device.driver.io.queue_response(b"SSAK\r") + device.driver.io.queue_response(b"GOAK\r") device.driver.io.queue_response(b"CCAK\r") await device.sealer.seal(120, 1.2) + device.driver.io.queue_response(b"SOAK\r") await device.sealer.open() + device.driver.io.queue_response(b"SIAK\r") await device.sealer.close() status = device.status_snapshot() await device.stop() From e38d1c9d58f99a396bc0b2a981e542b9570bf4f6 Mon Sep 17 00:00:00 2001 From: Rick Wierenga Date: Thu, 14 May 2026 12:14:25 -0700 Subject: [PATCH 6/7] Convert PlateLoc hello-world doc to notebook Co-Authored-By: Claude Opus 4.7 (1M context) --- .../agilent/plateloc/hello-world.ipynb | 157 ++++++++++++++++++ .../agilent/plateloc/hello-world.md | 100 ----------- 2 files changed, 157 insertions(+), 100 deletions(-) create mode 100644 docs/user_guide/agilent/plateloc/hello-world.ipynb delete mode 100644 docs/user_guide/agilent/plateloc/hello-world.md diff --git a/docs/user_guide/agilent/plateloc/hello-world.ipynb b/docs/user_guide/agilent/plateloc/hello-world.ipynb new file mode 100644 index 00000000000..9e01e667532 --- /dev/null +++ b/docs/user_guide/agilent/plateloc/hello-world.ipynb @@ -0,0 +1,157 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "plateloc-intro", + "source": "# Agilent PlateLoc\n\nThe Agilent PlateLoc is controlled through PLR's `Sealer` capability with a direct RS-232 serial driver. It does not require Agilent ActiveX, VWorks, or vendor server software. Install the optional serial dependency before connecting:\n\n```bash\npip install \"pylabrobot[serial]\"\n```", + "metadata": {} + }, + { + "cell_type": "code", + "id": "plateloc-import", + "source": "from pylabrobot.agilent import PlateLoc\n\nplateloc = PlateLoc(name=\"plateloc\", port=\"COM6\")", + "metadata": {}, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "id": "plateloc-setup", + "source": "await plateloc.setup()", + "metadata": {}, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "id": "plateloc-set-temp", + "source": "await plateloc.set_sealing_temperature(175)", + "metadata": {}, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "id": "plateloc-set-time", + "source": "await plateloc.set_sealing_time(0.5)", + "metadata": {}, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "id": "plateloc-status-first", + "source": "status = await plateloc.request_status()", + "metadata": {}, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "id": "plateloc-stop-first", + "source": "await plateloc.stop()", + "metadata": {}, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "id": "plateloc-sealer-intro", + "source": "The device also exposes the standard sealer capability:", + "metadata": {} + }, + { + "cell_type": "code", + "id": "plateloc-seal", + "source": "await plateloc.sealer.seal(temperature=175, duration=1.5)", + "metadata": {}, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "id": "plateloc-open", + "source": "await plateloc.sealer.open()", + "metadata": {}, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "id": "plateloc-close", + "source": "await plateloc.sealer.close()", + "metadata": {}, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "id": "plateloc-sealer-notes", + "source": "`sealer.open()` and `sealer.close()` move the stage and wait for the default stage-settle delay. `sealer.seal()` starts a sealing cycle after writing the requested temperature and time.\n\nThe PlateLoc device class also exposes independent setpoint and status helpers:", + "metadata": {} + }, + { + "cell_type": "code", + "id": "plateloc-set-temp-160", + "source": "await plateloc.set_sealing_temperature(160)", + "metadata": {}, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "id": "plateloc-set-time-1", + "source": "await plateloc.set_sealing_time(1.0)", + "metadata": {}, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "id": "plateloc-status-print", + "source": "status = await plateloc.request_status()\nprint(status.target_temperature, status.sealing_time, status.cycle_complete)", + "metadata": {}, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "id": "plateloc-status-notes", + "source": "`request_status()` returns the best-known PLR state plus a live cycle-complete query. The direct serial protocol decoded here does not expose actual block temperature or actual stored time reads, so PLR reports the last successfully written target temperature and sealing time.", + "metadata": {} + }, + { + "cell_type": "markdown", + "id": "plateloc-protocol", + "source": "## Serial command profile\n\nThe decoded direct protocol uses `19200 8N1` and carriage-return-terminated ASCII frames with two-letter command codes plus payloads. Temperature and time payloads use the firmware's fractional setpoint convention: the digits after the decimal point are the integer controller value.\n\n| Operation | Frame |\n|---|---|\n| Set sealing temperature | `ST 0.{temperature_celsius:03d}\\r` |\n| Set sealing time | `SS 0.{seconds_x10:02d}\\r` |\n| Start cycle | `GO 00\\r` |\n| Stop cycle | `AC 00\\r` |\n| Move stage out | `SO 00\\r` |\n| Move stage in | `SI 00\\r` |\n| Apply seal | `AS 00\\r` |\n| Clear error | `CL 00\\r` |\n| Check cycle complete | `CC 00\\r` |\n\nFor example, `set_sealing_temperature(175)` writes `ST 0.175\\r`, `set_sealing_temperature(30)` writes `ST 0.030\\r`, `set_sealing_time(0.5)` writes `SS 0.05\\r`, and `set_sealing_time(1.2)` writes `SS 0.12\\r`.\n\nNegative acknowledgements are parsed as `NK(message)` and raised as `PlateLocError`. Some valid firmware commands reply with single-carriage-return acknowledgements such as `SOAK\\r`. The cycle-complete command returns `True` for `CCAK\\r` and `False` for `CCNK\\r`.\n\nYou can still override command codes or serial settings with `PlateLocSerialProfile` while keeping the same PLR frontend:", + "metadata": {} + }, + { + "cell_type": "code", + "id": "plateloc-profile", + "source": "from pylabrobot.agilent import PlateLoc, PlateLocSerialProfile\n\nprofile = PlateLocSerialProfile(\n baudrate=19200,\n stage_move_delay=6,\n commands={\n \"set_sealing_temperature\": \"ST\",\n \"set_sealing_time\": \"SS\",\n \"start_cycle\": \"GO\",\n \"move_stage_out\": \"SO\",\n \"move_stage_in\": \"SI\",\n },\n)\n\nplateloc = PlateLoc(name=\"plateloc\", port=\"COM6\", profile=profile)", + "metadata": {}, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "id": "plateloc-troubleshooting", + "source": "## Troubleshooting\n\nThe PlateLoc RS-232 connector is not VGA and is not USB TTL. Use a USB-to-RS-232 adapter plus the correct DB9 cable for the instrument. If the port opens but every command times out, verify the PlateLoc is powered, the rear serial cable is seated, and the cable wiring matches the instrument requirement. Some setups require a null-modem DB9 adapter rather than a straight-through cable.", + "metadata": {} + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "name": "python", + "version": "3.11.0" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/docs/user_guide/agilent/plateloc/hello-world.md b/docs/user_guide/agilent/plateloc/hello-world.md deleted file mode 100644 index b8e3cf219b2..00000000000 --- a/docs/user_guide/agilent/plateloc/hello-world.md +++ /dev/null @@ -1,100 +0,0 @@ -# Agilent PlateLoc - -The Agilent PlateLoc is controlled through PLR's `Sealer` capability with a direct RS-232 serial -driver. It does not require Agilent ActiveX, VWorks, or vendor server software. Install the -optional serial dependency before connecting: - -```bash -pip install "pylabrobot[serial]" -``` - -```python -from pylabrobot.agilent import PlateLoc - -plateloc = PlateLoc(name="plateloc", port="COM6") - -await plateloc.setup() -await plateloc.set_sealing_temperature(175) -await plateloc.set_sealing_time(0.5) -status = await plateloc.request_status() -await plateloc.stop() -``` - -The device also exposes the standard sealer capability: - -```python -await plateloc.sealer.seal(temperature=175, duration=1.5) -await plateloc.sealer.open() -await plateloc.sealer.close() -``` - -`sealer.open()` and `sealer.close()` move the stage and wait for the default stage-settle delay. -`sealer.seal()` starts a sealing cycle after writing the requested temperature and time. - -The PlateLoc device class also exposes independent setpoint and status helpers: - -```python -await plateloc.set_sealing_temperature(160) -await plateloc.set_sealing_time(1.0) - -status = await plateloc.request_status() -print(status.target_temperature, status.sealing_time, status.cycle_complete) -``` - -`request_status()` returns the best-known PLR state plus a live cycle-complete query. The direct -serial protocol decoded here does not expose actual block temperature or actual stored time reads, -so PLR reports the last successfully written target temperature and sealing time. - -## Serial command profile - -The decoded direct protocol uses `19200 8N1` and carriage-return-terminated ASCII frames with -two-letter command codes plus payloads. Temperature and time payloads use the firmware's fractional -setpoint convention: the digits after the decimal point are the integer controller value. - -| Operation | Frame | -|---|---| -| Set sealing temperature | `ST 0.{temperature_celsius:03d}\r` | -| Set sealing time | `SS 0.{seconds_x10:02d}\r` | -| Start cycle | `GO 00\r` | -| Stop cycle | `AC 00\r` | -| Move stage out | `SO 00\r` | -| Move stage in | `SI 00\r` | -| Apply seal | `AS 00\r` | -| Clear error | `CL 00\r` | -| Check cycle complete | `CC 00\r` | - -For example, `set_sealing_temperature(175)` writes `ST 0.175\r`, `set_sealing_temperature(30)` -writes `ST 0.030\r`, `set_sealing_time(0.5)` writes `SS 0.05\r`, and `set_sealing_time(1.2)` -writes `SS 0.12\r`. - -Negative acknowledgements are parsed as `NK(message)` and raised as `PlateLocError`. Some -valid firmware commands reply with single-carriage-return acknowledgements such as `SOAK\r`. The -cycle-complete command returns `True` for `CCAK\r` and `False` for `CCNK\r`. - -You can still override command codes or serial settings with `PlateLocSerialProfile` while keeping -the same PLR frontend: - -```python -from pylabrobot.agilent import PlateLoc, PlateLocSerialProfile - -profile = PlateLocSerialProfile( - baudrate=19200, - stage_move_delay=6, - commands={ - "set_sealing_temperature": "ST", - "set_sealing_time": "SS", - "start_cycle": "GO", - "move_stage_out": "SO", - "move_stage_in": "SI", - }, -) - -plateloc = PlateLoc(name="plateloc", port="COM6", profile=profile) -``` - -## Troubleshooting - -The PlateLoc RS-232 connector is not VGA and is not USB TTL. Use a USB-to-RS-232 adapter plus the -correct DB9 cable for the instrument. If the port opens but every command times out, verify the -PlateLoc is powered, the rear serial cable is seated, and the cable wiring matches the instrument -requirement. Some setups require a null-modem DB9 adapter rather than a straight-through cable. From e278c4cd4b1c4e869f9ab376b2ac388684f419aa Mon Sep 17 00:00:00 2001 From: Alex Godfrey Date: Wed, 20 May 2026 16:06:45 -0700 Subject: [PATCH 7/7] Address PlateLoc v1b1 review feedback --- docs/api/pylabrobot.agilent.rst | 19 ++ .../agilent/plateloc/hello-world.ipynb | 34 +- pylabrobot/agilent/__init__.py | 1 + pylabrobot/agilent/plateloc/__init__.py | 2 +- pylabrobot/agilent/plateloc/plateloc.py | 305 +++++++++--------- pylabrobot/agilent/plateloc/plateloc_tests.py | 154 ++++----- 6 files changed, 279 insertions(+), 236 deletions(-) diff --git a/docs/api/pylabrobot.agilent.rst b/docs/api/pylabrobot.agilent.rst index a4115aff613..39a52547b6b 100644 --- a/docs/api/pylabrobot.agilent.rst +++ b/docs/api/pylabrobot.agilent.rst @@ -140,3 +140,22 @@ VSpin .. autoclass:: pylabrobot.agilent.vspin.VSpinCentrifugeBackend.SpinParams :members: + + +PlateLoc +-------- + +.. currentmodule:: pylabrobot.agilent.plateloc + +.. autosummary:: + :toctree: _autosummary + :nosignatures: + :recursive: + + PlateLoc + PlateLocSealer + PlateLocSealerBackend + PlateLocDriver + PlateLocSerialProfile + PlateLocStatus + PlateLocError diff --git a/docs/user_guide/agilent/plateloc/hello-world.ipynb b/docs/user_guide/agilent/plateloc/hello-world.ipynb index 9e01e667532..246a6b5a63d 100644 --- a/docs/user_guide/agilent/plateloc/hello-world.ipynb +++ b/docs/user_guide/agilent/plateloc/hello-world.ipynb @@ -25,7 +25,7 @@ { "cell_type": "code", "id": "plateloc-set-temp", - "source": "await plateloc.set_sealing_temperature(175)", + "source": "await plateloc.sealer.set_sealing_temperature(175)", "metadata": {}, "execution_count": null, "outputs": [] @@ -33,7 +33,7 @@ { "cell_type": "code", "id": "plateloc-set-time", - "source": "await plateloc.set_sealing_time(0.5)", + "source": "await plateloc.sealer.set_sealing_time(0.5)", "metadata": {}, "execution_count": null, "outputs": [] @@ -41,15 +41,7 @@ { "cell_type": "code", "id": "plateloc-status-first", - "source": "status = await plateloc.request_status()", - "metadata": {}, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "code", - "id": "plateloc-stop-first", - "source": "await plateloc.stop()", + "source": "status = await plateloc.sealer.request_status()", "metadata": {}, "execution_count": null, "outputs": [] @@ -87,13 +79,13 @@ { "cell_type": "markdown", "id": "plateloc-sealer-notes", - "source": "`sealer.open()` and `sealer.close()` move the stage and wait for the default stage-settle delay. `sealer.seal()` starts a sealing cycle after writing the requested temperature and time.\n\nThe PlateLoc device class also exposes independent setpoint and status helpers:", + "source": "`sealer.open()` and `sealer.close()` move the stage and wait for the default stage-settle delay. `sealer.seal()` starts a sealing cycle after writing the requested temperature and time.\n\nThe PlateLoc sealer capability also exposes independent setpoint and status helpers:", "metadata": {} }, { "cell_type": "code", "id": "plateloc-set-temp-160", - "source": "await plateloc.set_sealing_temperature(160)", + "source": "await plateloc.sealer.set_sealing_temperature(160)", "metadata": {}, "execution_count": null, "outputs": [] @@ -101,7 +93,7 @@ { "cell_type": "code", "id": "plateloc-set-time-1", - "source": "await plateloc.set_sealing_time(1.0)", + "source": "await plateloc.sealer.set_sealing_time(1.0)", "metadata": {}, "execution_count": null, "outputs": [] @@ -109,7 +101,7 @@ { "cell_type": "code", "id": "plateloc-status-print", - "source": "status = await plateloc.request_status()\nprint(status.target_temperature, status.sealing_time, status.cycle_complete)", + "source": "status = await plateloc.sealer.request_status()\nprint(status.target_temperature, status.sealing_time, status.cycle_complete)", "metadata": {}, "execution_count": null, "outputs": [] @@ -120,16 +112,24 @@ "source": "`request_status()` returns the best-known PLR state plus a live cycle-complete query. The direct serial protocol decoded here does not expose actual block temperature or actual stored time reads, so PLR reports the last successfully written target temperature and sealing time.", "metadata": {} }, + { + "cell_type": "code", + "id": "plateloc-stop", + "source": "await plateloc.stop()", + "metadata": {}, + "execution_count": null, + "outputs": [] + }, { "cell_type": "markdown", "id": "plateloc-protocol", - "source": "## Serial command profile\n\nThe decoded direct protocol uses `19200 8N1` and carriage-return-terminated ASCII frames with two-letter command codes plus payloads. Temperature and time payloads use the firmware's fractional setpoint convention: the digits after the decimal point are the integer controller value.\n\n| Operation | Frame |\n|---|---|\n| Set sealing temperature | `ST 0.{temperature_celsius:03d}\\r` |\n| Set sealing time | `SS 0.{seconds_x10:02d}\\r` |\n| Start cycle | `GO 00\\r` |\n| Stop cycle | `AC 00\\r` |\n| Move stage out | `SO 00\\r` |\n| Move stage in | `SI 00\\r` |\n| Apply seal | `AS 00\\r` |\n| Clear error | `CL 00\\r` |\n| Check cycle complete | `CC 00\\r` |\n\nFor example, `set_sealing_temperature(175)` writes `ST 0.175\\r`, `set_sealing_temperature(30)` writes `ST 0.030\\r`, `set_sealing_time(0.5)` writes `SS 0.05\\r`, and `set_sealing_time(1.2)` writes `SS 0.12\\r`.\n\nNegative acknowledgements are parsed as `NK(message)` and raised as `PlateLocError`. Some valid firmware commands reply with single-carriage-return acknowledgements such as `SOAK\\r`. The cycle-complete command returns `True` for `CCAK\\r` and `False` for `CCNK\\r`.\n\nYou can still override command codes or serial settings with `PlateLocSerialProfile` while keeping the same PLR frontend:", + "source": "## Serial command profile\n\nThe decoded direct protocol uses `19200 8N1` and carriage-return-terminated ASCII frames with two-letter command codes plus payloads. Temperature and time payloads use the firmware's fractional setpoint convention: the digits after the decimal point are the integer controller value.\n\n| Operation | Frame |\n|---|---|\n| Set sealing temperature | `ST 0.{temperature_celsius:03d}\\r` |\n| Set sealing time | `SS 0.{seconds_x10:02d}\\r` |\n| Start cycle | `GO 00\\r` |\n| Stop cycle | `AC 00\\r` |\n| Move stage out | `SO 00\\r` |\n| Move stage in | `SI 00\\r` |\n| Apply seal | `AS 00\\r` |\n| Clear error | `CL 00\\r` |\n| Check cycle complete | `CC 00\\r` |\n\nFor example, `set_sealing_temperature(175)` writes `ST 0.175\\r`, `set_sealing_temperature(30)` writes `ST 0.030\\r`, `set_sealing_time(0.5)` writes `SS 0.05\\r`, and `set_sealing_time(1.2)` writes `SS 0.12\\r`.\n\nNegative acknowledgements are parsed as `NK(message)` and raised as `PlateLocError`. Some valid firmware commands reply with single-carriage-return acknowledgements such as `SOAK\\r`. The cycle-complete command returns `True` for `CCAK\\r` and `False` for `CCNK\\r`.\n\nYou can still override serial settings and timing with `PlateLocSerialProfile` while keeping the same PLR frontend:", "metadata": {} }, { "cell_type": "code", "id": "plateloc-profile", - "source": "from pylabrobot.agilent import PlateLoc, PlateLocSerialProfile\n\nprofile = PlateLocSerialProfile(\n baudrate=19200,\n stage_move_delay=6,\n commands={\n \"set_sealing_temperature\": \"ST\",\n \"set_sealing_time\": \"SS\",\n \"start_cycle\": \"GO\",\n \"move_stage_out\": \"SO\",\n \"move_stage_in\": \"SI\",\n },\n)\n\nplateloc = PlateLoc(name=\"plateloc\", port=\"COM6\", profile=profile)", + "source": "from pylabrobot.agilent import PlateLoc, PlateLocSerialProfile\n\nprofile = PlateLocSerialProfile(\n baudrate=19200,\n stage_move_delay=6,\n)\n\nplateloc = PlateLoc(name=\"plateloc\", port=\"COM6\", profile=profile)", "metadata": {}, "execution_count": null, "outputs": [] diff --git a/pylabrobot/agilent/__init__.py b/pylabrobot/agilent/__init__.py index efc270351e3..c3afcc64913 100644 --- a/pylabrobot/agilent/__init__.py +++ b/pylabrobot/agilent/__init__.py @@ -16,6 +16,7 @@ PlateLoc, PlateLocDriver, PlateLocError, + PlateLocSealer, PlateLocSealerBackend, PlateLocSerialProfile, PlateLocStatus, diff --git a/pylabrobot/agilent/plateloc/__init__.py b/pylabrobot/agilent/plateloc/__init__.py index 28b11ca53e6..dc194e38b7d 100644 --- a/pylabrobot/agilent/plateloc/__init__.py +++ b/pylabrobot/agilent/plateloc/__init__.py @@ -1,8 +1,8 @@ from .plateloc import ( - DEFAULT_PLATELOC_COMMANDS, PlateLoc, PlateLocDriver, PlateLocError, + PlateLocSealer, PlateLocSealerBackend, PlateLocSerialProfile, PlateLocStatus, diff --git a/pylabrobot/agilent/plateloc/plateloc.py b/pylabrobot/agilent/plateloc/plateloc.py index 92411d30ef8..152d01bdc8f 100644 --- a/pylabrobot/agilent/plateloc/plateloc.py +++ b/pylabrobot/agilent/plateloc/plateloc.py @@ -1,14 +1,13 @@ from __future__ import annotations import asyncio -import contextlib import dataclasses import logging import re import time -from typing import Mapping, Optional, cast +from typing import Optional, cast -from pylabrobot.capabilities.capability import BackendParams +from pylabrobot.capabilities.capability import BackendParams, need_capability_ready from pylabrobot.capabilities.sealing import Sealer, SealerBackend from pylabrobot.device import Device, Driver from pylabrobot.io.serial import Serial @@ -24,18 +23,6 @@ logger = logging.getLogger(__name__) -DEFAULT_PLATELOC_COMMANDS: Mapping[str, str] = { - "set_sealing_temperature": "ST", - "set_sealing_time": "SS", - "move_stage_out": "SO", - "move_stage_in": "SI", - "start_cycle": "GO", - "stop_cycle": "AC", - "apply_seal": "AS", - "clear_error": "CL", - "check_cycle_complete": "CC", -} - _ACK_RE = re.compile(r"^\s*(?P[A-Z0-9]{2})(?P[AN])K(?:\((?P.*)\))?\s*$") @@ -59,11 +46,11 @@ class PlateLocStatus: @dataclasses.dataclass(frozen=True) class PlateLocSerialProfile: - """Serial settings and command codes for a PlateLoc controller. + """Serial settings for a PlateLoc controller. - The decoded low-level protocol uses two-letter command codes followed by a payload and a - carriage return. Setpoint payloads are encoded as a decimal fraction whose fractional digits - hold the integer setpoint, for example ``ST 0.175`` for 175 C and ``SS 0.12`` for 1.2 s. + The decoded low-level protocol uses carriage-return-terminated ASCII frames. + Setpoint payloads are encoded as a decimal fraction whose fractional digits hold the integer + setpoint, for example ``ST 0.175`` for 175 C and ``SS 0.12`` for 1.2 s. """ baudrate: int = 19200 @@ -82,15 +69,6 @@ class PlateLocSerialProfile: cycle_poll_interval: float = 0.5 command_terminator: str = "\r" response_terminator: bytes = b"\r" - commands: Mapping[str, str] = dataclasses.field( - default_factory=lambda: dict(DEFAULT_PLATELOC_COMMANDS) - ) - - def format_command(self, command: str, payload: str = "00") -> bytes: - code = self.commands.get(command) - if code is None: - raise PlateLocError(f"No PlateLoc serial command configured for {command!r}.") - return f"{code} {payload}{self.command_terminator}".encode("ascii") def serialize(self) -> dict: return { @@ -110,7 +88,6 @@ def serialize(self) -> dict: "cycle_poll_interval": self.cycle_poll_interval, "command_terminator": self.command_terminator, "response_terminator": self.response_terminator.decode("latin1"), - "commands": dict(self.commands), } @classmethod @@ -122,7 +99,7 @@ def deserialize(cls, data: dict) -> "PlateLocSerialProfile": class PlateLocDriver(Driver): - """Direct serial driver for the Agilent PlateLoc thermal microplate sealer.""" + """Direct serial transport for the Agilent PlateLoc thermal microplate sealer.""" def __init__( self, @@ -131,10 +108,9 @@ def __init__( pid: Optional[int] = None, profile: Optional[PlateLocSerialProfile | dict] = None, timeout: float = 30, - serial_cls=Serial, ) -> None: super().__init__() - if serial_cls is Serial and not HAS_SERIAL: + if not HAS_SERIAL: raise RuntimeError( "pyserial is not installed. Install with: pip install pylabrobot[serial]. " f"Import error: {_SERIAL_IMPORT_ERROR}" @@ -144,12 +120,9 @@ def __init__( self.profile = profile or PlateLocSerialProfile() self.timeout = timeout self._connected = False - self._target_temperature: Optional[float] = None - self._sealing_time: Optional[float] = None - self._stage_position: Optional[str] = None self._last_command: Optional[str] = None self._last_response: Optional[str] = None - self.io = serial_cls( + self.io = Serial( human_readable_device_name="Agilent PlateLoc Sealer", port=port, vid=vid, @@ -169,6 +142,18 @@ def __init__( def port(self) -> str: return cast(str, self.io.port) + @property + def connected(self) -> bool: + return self._connected + + @property + def last_command(self) -> Optional[str]: + return self._last_command + + @property + def last_response(self) -> Optional[str]: + return self._last_response + async def setup(self, backend_params: Optional[BackendParams] = None): await self.io.setup() self._connected = True @@ -179,44 +164,35 @@ async def stop(self): self._connected = False logger.info("[PlateLoc %s] disconnected", self.port) - @contextlib.contextmanager - def _read_timeout(self, timeout: float): - if hasattr(self.io, "temporary_timeout"): - with self.io.temporary_timeout(timeout): - yield - else: - yield - async def send_command( self, command: str, - payload: str = "00", - expect_response: bool = False, - raise_on_nak: bool = True, + *, + timeout: Optional[float] = None, + required: bool = True, ) -> Optional[str]: - data = self.profile.format_command(command, payload=payload) - if hasattr(self.io, "reset_input_buffer"): - await self.io.reset_input_buffer() - await self.io.write(data) + """Send one literal PlateLoc serial frame and return the raw response.""" + command = command.removesuffix(self.profile.command_terminator) + await self.io.reset_input_buffer() + await self.io.write(f"{command}{self.profile.command_terminator}".encode("ascii")) + self._last_command = command + if self.profile.read_delay > 0: await asyncio.sleep(self.profile.read_delay) - response = await self.read_response( - timeout=self.profile.response_timeout if expect_response else self.profile.ack_timeout, - required=True, - ) - assert response is not None - self._last_command = command + response = await self.read_response(timeout=timeout, required=required) self._last_response = response - if raise_on_nak: - self._raise_for_error(command, response) return response - async def read_response(self, timeout: Optional[float] = None, required: bool = True) -> Optional[str]: + async def read_response( + self, + timeout: Optional[float] = None, + required: bool = True, + ) -> Optional[str]: deadline = time.time() + (timeout if timeout is not None else self.profile.response_timeout) chunks = bytearray() while time.time() < deadline: - with self._read_timeout(max(0.01, min(0.1, deadline - time.time()))): + with self.io.temporary_timeout(max(0.01, min(0.1, deadline - time.time()))): chunk = await self.io.read(1) if chunk: chunks.extend(chunk) @@ -231,25 +207,80 @@ async def read_response(self, timeout: Optional[float] = None, required: bool = return None return bytes(chunks).decode("utf-8", errors="replace").strip() - def _raise_for_error(self, command: str, response: str): + def serialize(self) -> dict: + return { + **super().serialize(), + "port": self.port, + "profile": self.profile.serialize(), + "timeout": self.timeout, + } + + +class PlateLocSealerBackend(SealerBackend): + """Translates SealerBackend operations into direct PlateLoc serial commands.""" + + _SET_TEMPERATURE = "ST" + _SET_TIME = "SS" + _MOVE_STAGE_OUT = "SO" + _MOVE_STAGE_IN = "SI" + _START_CYCLE = "GO" + _STOP_CYCLE = "AC" + _APPLY_SEAL = "AS" + _CLEAR_ERROR = "CL" + _CHECK_CYCLE_COMPLETE = "CC" + + def __init__(self, driver: PlateLocDriver): + self._driver = driver + self._target_temperature: Optional[float] = None + self._sealing_time: Optional[float] = None + self._stage_position: Optional[str] = None + + @property + def driver(self) -> PlateLocDriver: + return self._driver + + def _parse_response(self, command_code: str, response: str) -> re.Match[str]: match = _ACK_RE.match(response) if match is None: - raise PlateLocError(f"PlateLoc returned invalid response to {command!r}: {response!r}") + raise PlateLocError(f"PlateLoc returned invalid response to {command_code!r}: {response!r}") code = match.group("code") - expected_code = self.profile.commands.get(command) - if expected_code is not None and code != expected_code: - raise PlateLocError(f"PlateLoc replied with {code!r} to {command!r}: {response!r}") + if code != command_code: + raise PlateLocError(f"PlateLoc replied with {code!r} to {command_code!r}: {response!r}") + return match + + def _raise_for_error(self, command_code: str, response: str) -> None: + match = self._parse_response(command_code, response) if match.group("status") == "N": message = match.group("message") or "command rejected" - raise PlateLocError(f"PlateLoc rejected {command!r}: {message}") + raise PlateLocError(f"PlateLoc rejected {command_code!r}: {message}") + + async def _send( + self, + command: str, + *, + timeout: Optional[float] = None, + raise_on_nak: bool = True, + ) -> str: + response = await self._driver.send_command( + command, + timeout=timeout if timeout is not None else self._driver.profile.ack_timeout, + required=True, + ) + assert response is not None + if raise_on_nak: + self._raise_for_error(command[:2], response) + return response async def set_sealing_temperature(self, temperature: float): if not (20 <= temperature <= 235): raise ValueError("Temperature out of range. Please enter a value between 20 and 235 C.") target_temperature = round(temperature) - payload = f"0.{target_temperature:03d}" - logger.info("[PlateLoc %s] setting sealing temperature to %.1f C", self.port, temperature) - response = await self.send_command("set_sealing_temperature", payload=payload) + logger.info( + "[PlateLoc %s] setting sealing temperature to %.1f C", + self._driver.port, + temperature, + ) + response = await self._send(f"{self._SET_TEMPERATURE} 0.{target_temperature:03d}") self._target_temperature = float(target_temperature) return response @@ -257,115 +288,113 @@ async def set_sealing_time(self, duration: float): if not (0.5 <= duration <= 12.0): raise ValueError("Duration out of range. Please enter a value between 0.5 and 12.0 s.") sealing_time_deciseconds = round(duration * 10) - payload = f"0.{sealing_time_deciseconds:02d}" - logger.info("[PlateLoc %s] setting sealing time to %.2f s", self.port, duration) - response = await self.send_command("set_sealing_time", payload=payload) + logger.info("[PlateLoc %s] setting sealing time to %.2f s", self._driver.port, duration) + response = await self._send(f"{self._SET_TIME} 0.{sealing_time_deciseconds:02d}") self._sealing_time = sealing_time_deciseconds / 10 return response async def move_stage_out(self): - logger.info("[PlateLoc %s] moving stage out", self.port) - response = await self.send_command("move_stage_out") - if self.profile.stage_move_delay > 0: - await asyncio.sleep(self.profile.stage_move_delay) + logger.info("[PlateLoc %s] moving stage out", self._driver.port) + response = await self._send(f"{self._MOVE_STAGE_OUT} 00") + if self._driver.profile.stage_move_delay > 0: + await asyncio.sleep(self._driver.profile.stage_move_delay) self._stage_position = "open" return response async def move_stage_in(self): - logger.info("[PlateLoc %s] moving stage in", self.port) - response = await self.send_command("move_stage_in") - if self.profile.stage_move_delay > 0: - await asyncio.sleep(self.profile.stage_move_delay) + logger.info("[PlateLoc %s] moving stage in", self._driver.port) + response = await self._send(f"{self._MOVE_STAGE_IN} 00") + if self._driver.profile.stage_move_delay > 0: + await asyncio.sleep(self._driver.profile.stage_move_delay) self._stage_position = "closed" return response async def start_cycle(self): - logger.info("[PlateLoc %s] starting sealing cycle", self.port) - return await self.send_command("start_cycle") + logger.info("[PlateLoc %s] starting sealing cycle", self._driver.port) + return await self._send(f"{self._START_CYCLE} 00") async def stop_cycle(self): - logger.info("[PlateLoc %s] stopping sealing cycle", self.port) - return await self.send_command("stop_cycle") + logger.info("[PlateLoc %s] stopping sealing cycle", self._driver.port) + return await self._send(f"{self._STOP_CYCLE} 00") async def apply_seal(self): - logger.info("[PlateLoc %s] applying seal", self.port) - return await self.send_command("apply_seal") + logger.info("[PlateLoc %s] applying seal", self._driver.port) + return await self._send(f"{self._APPLY_SEAL} 00") async def clear_error(self): - logger.info("[PlateLoc %s] clearing error", self.port) - return await self.send_command("clear_error") + logger.info("[PlateLoc %s] clearing error", self._driver.port) + return await self._send(f"{self._CLEAR_ERROR} 00") async def check_cycle_complete(self) -> bool: - response = await self.send_command( - "check_cycle_complete", - expect_response=True, + response = await self._send( + f"{self._CHECK_CYCLE_COMPLETE} 00", + timeout=self._driver.profile.response_timeout, raise_on_nak=False, ) - match = _ACK_RE.match(response or "") - if match is None: - raise PlateLocError( - f"PlateLoc returned invalid response to 'check_cycle_complete': {response!r}" - ) - expected_code = self.profile.commands.get("check_cycle_complete") - if expected_code is not None and match.group("code") != expected_code: - raise PlateLocError( - f"PlateLoc replied with {match.group('code')!r} to 'check_cycle_complete': {response!r}" - ) + match = self._parse_response(self._CHECK_CYCLE_COMPLETE, response) return match.group("status") == "A" async def wait_for_cycle_complete(self, timeout: Optional[float] = None) -> bool: - deadline = time.time() + (self.timeout if timeout is None else timeout) + deadline = time.time() + (self._driver.timeout if timeout is None else timeout) while True: if await self.check_cycle_complete(): return True remaining = deadline - time.time() if remaining <= 0: raise TimeoutError("Timeout while waiting for PlateLoc cycle to complete") - await asyncio.sleep(min(max(self.profile.cycle_poll_interval, 0), remaining)) + await asyncio.sleep(min(max(self._driver.profile.cycle_poll_interval, 0), remaining)) def status_snapshot(self, cycle_complete: Optional[bool] = None) -> PlateLocStatus: return PlateLocStatus( - port=self.port, - connected=self._connected, + port=self._driver.port, + connected=self._driver.connected, target_temperature=self._target_temperature, sealing_time=self._sealing_time, stage_position=self._stage_position, cycle_complete=cycle_complete, - last_command=self._last_command, - last_response=self._last_response, + last_command=self._driver.last_command, + last_response=self._driver.last_response, ) async def request_status(self, query_cycle_complete: bool = True) -> PlateLocStatus: cycle_complete = await self.check_cycle_complete() if query_cycle_complete else None return self.status_snapshot(cycle_complete=cycle_complete) - def serialize(self) -> dict: - return { - **super().serialize(), - "port": self.port, - "profile": self.profile.serialize(), - "timeout": self.timeout, - } - - -class PlateLocSealerBackend(SealerBackend): - """Translates SealerBackend operations into direct PlateLoc serial commands.""" - - def __init__(self, driver: PlateLocDriver): - self.driver = driver - async def seal(self, temperature: int, duration: float): - await self.driver.set_sealing_temperature(temperature) - await self.driver.set_sealing_time(duration) - response = await self.driver.start_cycle() - await self.driver.wait_for_cycle_complete() + await self.set_sealing_temperature(temperature) + await self.set_sealing_time(duration) + response = await self.start_cycle() + await self.wait_for_cycle_complete() return response async def open(self): - return await self.driver.move_stage_out() + return await self.move_stage_out() async def close(self): - return await self.driver.move_stage_in() + return await self.move_stage_in() + + +class PlateLocSealer(Sealer): + """PlateLoc-specific sealing capability.""" + + def __init__(self, backend: PlateLocSealerBackend): + super().__init__(backend=backend) + self.backend: PlateLocSealerBackend = backend + + @need_capability_ready + async def set_sealing_temperature(self, temperature: float): + return await self.backend.set_sealing_temperature(temperature) + + @need_capability_ready + async def set_sealing_time(self, duration: float): + return await self.backend.set_sealing_time(duration) + + @need_capability_ready + async def request_status(self, query_cycle_complete: bool = True) -> PlateLocStatus: + return await self.backend.request_status(query_cycle_complete=query_cycle_complete) + + def status_snapshot(self, cycle_complete: Optional[bool] = None) -> PlateLocStatus: + return self.backend.status_snapshot(cycle_complete=cycle_complete) class PlateLoc(Device): @@ -379,7 +408,6 @@ def __init__( pid: Optional[int] = None, profile: Optional[PlateLocSerialProfile | dict] = None, timeout: float = 30, - serial_cls=Serial, ): self.name = name driver = PlateLocDriver( @@ -388,28 +416,15 @@ def __init__( pid=pid, profile=profile, timeout=timeout, - serial_cls=serial_cls, ) super().__init__(driver=driver) self.driver: PlateLocDriver = driver - self.sealer = Sealer(backend=PlateLocSealerBackend(driver)) + self.sealer: PlateLocSealer = PlateLocSealer(backend=PlateLocSealerBackend(driver)) self._capabilities = [self.sealer] - async def set_sealing_temperature(self, temperature: float): - return await self.driver.set_sealing_temperature(temperature) - - async def set_sealing_time(self, duration: float): - return await self.driver.set_sealing_time(duration) - - def status_snapshot(self, cycle_complete: Optional[bool] = None) -> PlateLocStatus: - return self.driver.status_snapshot(cycle_complete=cycle_complete) - - async def request_status(self, query_cycle_complete: bool = True) -> PlateLocStatus: - return await self.driver.request_status(query_cycle_complete=query_cycle_complete) - def serialize(self) -> dict: return { **super().serialize(), "name": self.name, - "status": dataclasses.asdict(self.status_snapshot()), + "status": dataclasses.asdict(self.sealer.status_snapshot()), } diff --git a/pylabrobot/agilent/plateloc/plateloc_tests.py b/pylabrobot/agilent/plateloc/plateloc_tests.py index ae5eb51756e..fc8c6b84999 100644 --- a/pylabrobot/agilent/plateloc/plateloc_tests.py +++ b/pylabrobot/agilent/plateloc/plateloc_tests.py @@ -3,12 +3,15 @@ import unittest from collections import deque from typing import Deque +from unittest.mock import patch +import pylabrobot.agilent.plateloc.plateloc as plateloc_module from pylabrobot.agilent.plateloc import ( - DEFAULT_PLATELOC_COMMANDS, PlateLoc, PlateLocDriver, PlateLocError, + PlateLocSealer, + PlateLocSealerBackend, PlateLocSerialProfile, PlateLocStatus, ) @@ -68,16 +71,38 @@ async def reset_input_buffer(self): class PlateLocTests(unittest.IsolatedAsyncioTestCase): - def make_driver(self, commands=None, ack_timeout=0.01, timeout=30): + @contextlib.contextmanager + def patch_serial(self): + with ( + patch.object(plateloc_module, "HAS_SERIAL", True), + patch.object(plateloc_module, "Serial", FakeSerial), + ): + yield + + def make_driver(self, ack_timeout=0.01, timeout=30): profile = PlateLocSerialProfile( response_timeout=0.01, ack_timeout=ack_timeout, read_delay=0, stage_move_delay=0, cycle_poll_interval=0, - commands=commands or DEFAULT_PLATELOC_COMMANDS, ) - return PlateLocDriver(port="COM6", profile=profile, timeout=timeout, serial_cls=FakeSerial) + with self.patch_serial(): + return PlateLocDriver(port="COM6", profile=profile, timeout=timeout) + + def make_device(self, timeout=30): + profile = PlateLocSerialProfile( + response_timeout=0.01, + ack_timeout=0.01, + read_delay=0, + stage_move_delay=0, + cycle_poll_interval=0, + ) + with self.patch_serial(): + return PlateLoc(name="plateloc", port="COM6", profile=profile, timeout=timeout) + + def backend(self, device: PlateLoc) -> PlateLocSealerBackend: + return device.sealer.backend async def test_setup_uses_plr_serial_wrapper_settings(self): driver = self.make_driver() @@ -95,90 +120,111 @@ async def test_setup_uses_plr_serial_wrapper_settings(self): await driver.stop() self.assertTrue(driver.io.stop_called) + async def test_driver_sends_literal_serial_frame(self): + driver = self.make_driver() + await driver.setup() + driver.io.queue_response(b"STAK\r") + + response = await driver.send_command("ST 0.030", timeout=0.01) + + self.assertEqual(response, "STAK") + self.assertEqual(driver.io.writes, [b"ST 0.030\r"]) + self.assertTrue(driver.io.reset_input_buffer_called) + self.assertEqual(driver.last_command, "ST 0.030") + self.assertEqual(driver.last_response, "STAK") + async def test_temperature_and_time_writes_are_scaled_and_validated(self): driver = self.make_driver() + backend = PlateLocSealerBackend(driver) await driver.setup() driver.io.queue_response(b"STAK\r") driver.io.queue_response(b"SSAK\r") - await driver.set_sealing_temperature(30) - await driver.set_sealing_time(0.5) + await backend.set_sealing_temperature(30) + await backend.set_sealing_time(0.5) self.assertEqual(driver.io.writes, [b"ST 0.030\r", b"SS 0.05\r"]) with self.assertRaises(ValueError): - await driver.set_sealing_temperature(19) + await backend.set_sealing_temperature(19) with self.assertRaises(ValueError): - await driver.set_sealing_time(0.4) + await backend.set_sealing_time(0.4) async def test_negative_acknowledgement_raises_protocol_error(self): - driver = self.make_driver(ack_timeout=0.01) + driver = self.make_driver() + backend = PlateLocSealerBackend(driver) await driver.setup() driver.io.queue_response(b"STNK(Desired Temperature is Out of Range)\r\r") with self.assertRaisesRegex(PlateLocError, "Desired Temperature is Out of Range"): - await driver.set_sealing_temperature(30) + await backend.set_sealing_temperature(30) self.assertEqual(driver.io.writes, [b"ST 0.030\r"]) async def test_missing_acknowledgement_raises_timeout(self): driver = self.make_driver() + backend = PlateLocSealerBackend(driver) await driver.setup() with self.assertRaisesRegex(TimeoutError, "Timeout"): - await driver.set_sealing_temperature(30) + await backend.set_sealing_temperature(30) self.assertEqual(driver.io.writes, [b"ST 0.030\r"]) async def test_malformed_acknowledgement_raises_protocol_error(self): driver = self.make_driver() + backend = PlateLocSealerBackend(driver) await driver.setup() driver.io.queue_response(b"unexpected\r") with self.assertRaisesRegex(PlateLocError, "invalid response"): - await driver.set_sealing_temperature(30) + await backend.set_sealing_temperature(30) self.assertEqual(driver.io.writes, [b"ST 0.030\r"]) async def test_required_response_reads_until_plate_loc_ack(self): driver = self.make_driver() + backend = PlateLocSealerBackend(driver) await driver.setup() driver.io.queue_response(b"CCAK\r") - self.assertTrue(await driver.check_cycle_complete()) + self.assertTrue(await backend.check_cycle_complete()) self.assertEqual(driver.io.writes, [b"CC 00\r"]) async def test_cycle_not_complete_returns_false(self): driver = self.make_driver() + backend = PlateLocSealerBackend(driver) await driver.setup() driver.io.queue_response(b"CCNK\r") - self.assertFalse(await driver.check_cycle_complete()) + self.assertFalse(await backend.check_cycle_complete()) self.assertEqual(driver.io.writes, [b"CC 00\r"]) async def test_invalid_cycle_complete_response_raises_protocol_error(self): driver = self.make_driver() + backend = PlateLocSealerBackend(driver) await driver.setup() driver.io.queue_response(b"unexpected\r") with self.assertRaisesRegex(PlateLocError, "invalid response"): - await driver.check_cycle_complete() + await backend.check_cycle_complete() self.assertEqual(driver.io.writes, [b"CC 00\r"]) async def test_status_snapshot_tracks_setpoints_and_live_cycle_complete(self): driver = self.make_driver() + backend = PlateLocSealerBackend(driver) await driver.setup() driver.io.queue_response(b"STAK\r") driver.io.queue_response(b"SSAK\r") driver.io.queue_response(b"SOAK\r") - await driver.set_sealing_temperature(30) - await driver.set_sealing_time(0.5) - await driver.move_stage_out() + await backend.set_sealing_temperature(30) + await backend.set_sealing_time(0.5) + await backend.move_stage_out() driver.io.queue_response(b"CCAK\r") - status = await driver.request_status() + status = await backend.request_status() self.assertIsInstance(status, PlateLocStatus) self.assertEqual(status.port, "COM6") @@ -187,52 +233,13 @@ async def test_status_snapshot_tracks_setpoints_and_live_cycle_complete(self): self.assertEqual(status.sealing_time, 0.5) self.assertEqual(status.stage_position, "open") self.assertTrue(status.cycle_complete) - self.assertEqual(status.last_command, "check_cycle_complete") + self.assertEqual(status.last_command, "CC 00") self.assertEqual(status.last_response, "CCAK") self.assertEqual(driver.io.writes, [b"ST 0.030\r", b"SS 0.05\r", b"SO 00\r", b"CC 00\r"]) - async def test_custom_command_profile(self): - driver = self.make_driver( - commands={ - "set_sealing_temperature": "TP", - "set_sealing_time": "TM", - } - ) - await driver.setup() - driver.io.queue_response(b"TPAK\r") - driver.io.queue_response(b"TMAK\r") - - await driver.set_sealing_temperature(120) - await driver.set_sealing_time(1.25) - - self.assertEqual(driver.io.writes, [b"TP 0.120\r", b"TM 0.12\r"]) - - async def test_custom_command_acknowledgement_codes_are_parsed(self): - commands = { - **DEFAULT_PLATELOC_COMMANDS, - "set_sealing_temperature": "TP", - "check_cycle_complete": "CP", - } - driver = self.make_driver(commands=commands, ack_timeout=0.01) - await driver.setup() - driver.io.queue_response(b"TPNK(Desired Temperature is Out of Range)\r") - - with self.assertRaisesRegex(PlateLocError, "Desired Temperature is Out of Range"): - await driver.set_sealing_temperature(120) - - driver.io.queue_response(b"CPAK\r") - self.assertTrue(await driver.check_cycle_complete()) - self.assertEqual(driver.io.writes, [b"TP 0.120\r", b"CP 00\r"]) - async def test_seal_waits_for_cycle_completion(self): - profile = PlateLocSerialProfile( - response_timeout=0.01, - ack_timeout=0.01, - read_delay=0, - stage_move_delay=0, - cycle_poll_interval=0, - ) - device = PlateLoc(name="plateloc", port="COM6", profile=profile, timeout=1, serial_cls=FakeSerial) + device = self.make_device(timeout=1) + backend = self.backend(device) await device.setup() device.driver.io.queue_response(b"STAK\r") @@ -253,22 +260,17 @@ async def test_seal_waits_for_cycle_completion(self): b"CC 00\r", ], ) + self.assertEqual(backend.status_snapshot().target_temperature, 120) + self.assertEqual(backend.status_snapshot().sealing_time, 1.2) - async def test_device_exposes_sealer_capability(self): - profile = PlateLocSerialProfile( - response_timeout=0.01, - ack_timeout=0.01, - read_delay=0, - stage_move_delay=0, - cycle_poll_interval=0, - ) - device = PlateLoc(name="plateloc", port="COM6", profile=profile, serial_cls=FakeSerial) + async def test_device_exposes_plate_loc_sealer_capability(self): + device = self.make_device() await device.setup() device.driver.io.queue_response(b"STAK\r") - await device.set_sealing_temperature(100) + await device.sealer.set_sealing_temperature(100) device.driver.io.queue_response(b"SSAK\r") - await device.set_sealing_time(0.5) + await device.sealer.set_sealing_time(0.5) device.driver.io.queue_response(b"STAK\r") device.driver.io.queue_response(b"SSAK\r") device.driver.io.queue_response(b"GOAK\r") @@ -278,9 +280,13 @@ async def test_device_exposes_sealer_capability(self): await device.sealer.open() device.driver.io.queue_response(b"SIAK\r") await device.sealer.close() - status = device.status_snapshot() + device.driver.io.queue_response(b"CCAK\r") + status = await device.sealer.request_status() await device.stop() + self.assertIsInstance(device.sealer, PlateLocSealer) + self.assertFalse(hasattr(device, "set_sealing_temperature")) + self.assertFalse(hasattr(device, "set_sealing_time")) self.assertEqual( device.driver.io.writes, [ @@ -292,11 +298,13 @@ async def test_device_exposes_sealer_capability(self): b"CC 00\r", b"SO 00\r", b"SI 00\r", + b"CC 00\r", ], ) self.assertEqual(status.target_temperature, 120) self.assertEqual(status.sealing_time, 1.2) self.assertEqual(status.stage_position, "closed") + self.assertTrue(status.cycle_complete) self.assertTrue(device.driver.io.stop_called)