diff --git a/src/dodal/beamlines/i05.py b/src/dodal/beamlines/i05.py index c0a3d525923..f03e69e6101 100644 --- a/src/dodal/beamlines/i05.py +++ b/src/dodal/beamlines/i05.py @@ -4,7 +4,10 @@ from dodal.devices.beamlines.i05 import I05Goniometer from dodal.devices.beamlines.i05_shared import LensMode, M4M5Mirror, PassEnergy from dodal.devices.common_mirror import XYZSwitchingMirror -from dodal.devices.electron_analyser.mbs import MbsDetector +from dodal.devices.electron_analyser.mbs import ( + EntranceSlitInformationDevice, + MbsDetector, +) from dodal.devices.hutch_shutter import HutchShutter from dodal.devices.pgm import PlaneGratingMonochromator from dodal.devices.temperture_controller import Lakeshore336 @@ -51,10 +54,24 @@ def sa() -> I05Goniometer: @devices.factory -def analyser(pgm: PlaneGratingMonochromator) -> MbsDetector[LensMode, PassEnergy]: +def analyser_slits() -> EntranceSlitInformationDevice: + return EntranceSlitInformationDevice(f"{PREFIX.beamline_prefix}-EA-SLITS-01:POS") + + +@devices.factory +def analyser( + pgm: PlaneGratingMonochromator, analyser_slits: EntranceSlitInformationDevice +) -> MbsDetector[LensMode, PassEnergy]: + config_sigs = ( + analyser_slits.direction, + analyser_slits.size, + analyser_slits.shape, + analyser_slits.setting, + ) return MbsDetector[LensMode, PassEnergy]( prefix=f"{PREFIX.beamline_prefix}-EA-DET-02:CAM:", lens_mode_type=LensMode, pass_energy_type=PassEnergy, energy_source=pgm.energy.user_readback, + config_sigs=config_sigs, ) diff --git a/src/dodal/beamlines/i05_1.py b/src/dodal/beamlines/i05_1.py index 62e62d68c56..75b834843be 100644 --- a/src/dodal/beamlines/i05_1.py +++ b/src/dodal/beamlines/i05_1.py @@ -4,7 +4,10 @@ from dodal.devices.beamlines.i05_1 import XYZAzimuthPolarDefocusStage from dodal.devices.beamlines.i05_shared import LensMode, Mj7j8Mirror, PassEnergy from dodal.devices.common_mirror import XYZPiezoSwitchingMirror -from dodal.devices.electron_analyser.mbs import MbsDetector +from dodal.devices.electron_analyser.mbs import ( + EntranceSlitInformationDevice, + MbsDetector, +) from dodal.devices.hutch_shutter import HutchShutter from dodal.devices.pgm import PlaneGratingMonochromator from dodal.log import set_beamline as set_log_beamline @@ -39,11 +42,26 @@ def sm() -> XYZAzimuthPolarDefocusStage: return XYZAzimuthPolarDefocusStage(prefix=f"{PREFIX.beamline_prefix}-EA-SM-01:") +# Note: Currently fails. Requires https://jira.diamond.ac.uk/browse/I05-764 @devices.factory -def analyser(pgm: PlaneGratingMonochromator) -> MbsDetector[LensMode, PassEnergy]: +def analyser_slits() -> EntranceSlitInformationDevice: + return EntranceSlitInformationDevice(f"{PREFIX.beamline_prefix}-EA-SLITS-01:POS") + + +@devices.factory +def analyser( + pgm: PlaneGratingMonochromator, analyser_slits: EntranceSlitInformationDevice +) -> MbsDetector[LensMode, PassEnergy]: + config_sigs = ( + analyser_slits.direction, + analyser_slits.size, + analyser_slits.shape, + analyser_slits.setting, + ) return MbsDetector[LensMode, PassEnergy]( prefix=f"{PREFIX.beamline_prefix}-EA-DET-04:CAM:", lens_mode_type=LensMode, pass_energy_type=PassEnergy, energy_source=pgm.energy.user_readback, + config_sigs=config_sigs, ) diff --git a/src/dodal/devices/electron_analyser/mbs/__init__.py b/src/dodal/devices/electron_analyser/mbs/__init__.py index befe299bcca..9a70f9801b1 100644 --- a/src/dodal/devices/electron_analyser/mbs/__init__.py +++ b/src/dodal/devices/electron_analyser/mbs/__init__.py @@ -1,9 +1,17 @@ +from .mbs_analyser_slits import ( + EntranceSlitInformation, + EntranceSlitInformationDevice, + SlitPosition, +) from .mbs_detector import MbsDetector from .mbs_driver_io import MbsAnalyserDriverIO from .mbs_enums import AcquisitionMode from .mbs_region import MbsRegion, MbsSequence __all__ = [ + "EntranceSlitInformationDevice", + "EntranceSlitInformation", + "SlitPosition", "MbsDetector", "MbsAnalyserDriverIO", "AcquisitionMode", diff --git a/src/dodal/devices/electron_analyser/mbs/mbs_analyser_slits.py b/src/dodal/devices/electron_analyser/mbs/mbs_analyser_slits.py new file mode 100644 index 00000000000..3306ce11f04 --- /dev/null +++ b/src/dodal/devices/electron_analyser/mbs/mbs_analyser_slits.py @@ -0,0 +1,80 @@ +from bluesky.protocols import Reading +from ophyd_async.core import ( + DEFAULT_TIMEOUT, + AsyncStatus, + DeviceMock, + StandardReadable, + StrictEnum, + soft_signal_r_and_setter, +) +from ophyd_async.epics.core import epics_signal_rw +from pydantic import BaseModel + + +class SlitPosition(StrictEnum): + P100_0_1_CURVED = "100 0.1 curved" + P200_0_1_STRAIGHT = "200 0.1 straight" + P300_0_2_CURVED = "300 0.2 curved" + P400_0_2_STRAIGHT = "400 0.2 straight" + P500_0_2_STRAIGHT = "500 0.2 straight" + P600_0_3_STRAIGHT = "600 0.3 straight" + P700_0_5_STRAIGHT = "700 0.5 straight" + P800_0_8_STRAIGHT = "800 0.8 straight" + P850_3_HOLE = "850 3 hole" + P900_1_5_STRAIGHT = "900 1.5 straight" + + +class EntranceSlitInformation(BaseModel): + direction: str = "vertical" + setting: int = 100 + size: float = 0.1 + shape: str = "curved" + + @classmethod + def from_slit_positions(cls, pos: SlitPosition): + setting, size, shape = str(pos).split() + return cls(setting=int(setting), size=float(size), shape=shape) + + def to_slit_position(self) -> SlitPosition: + return SlitPosition(f"{self.setting} {self.size:g} {self.shape}") + + +class EntranceSlitInformationDevice(StandardReadable): + """Device that connects to epics signal containing slit information from an enum + value. This is synced with soft signals as individual signals which can be added as + config_signals to give to detectors to save as nicely formatted data. + """ + + def __init__(self, pv: str, name: str = ""): + self.slit_pos = epics_signal_rw(SlitPosition, pv) + # Formatted slit info as individual soft signals for metadata + with self.add_children_as_readables(): + self.direction, self._direction_w = soft_signal_r_and_setter(str) + self.setting, self._setting_w = soft_signal_r_and_setter(int) + self.size, self._size_w = soft_signal_r_and_setter(float) + self.shape, self._shape_w = soft_signal_r_and_setter(str) + super().__init__(name) + + @AsyncStatus.wrap + async def set(self, value: SlitPosition): + await self.slit_pos.set(value) + + async def connect( + self, + mock: bool | DeviceMock = False, + timeout: float = DEFAULT_TIMEOUT, + force_reconnect: bool = False, + ) -> None: + await super().connect(mock, timeout, force_reconnect) + + def _sync_soft_signals_with_epics( + value: dict[str, Reading[SlitPosition]], + ) -> None: + val = value[self.slit_pos.name]["value"] + new_slit_info = EntranceSlitInformation.from_slit_positions(val) + self._direction_w(new_slit_info.direction) + self._setting_w(new_slit_info.setting) + self._size_w(new_slit_info.size) + self._shape_w(new_slit_info.shape) + + self.slit_pos.subscribe(_sync_soft_signals_with_epics) diff --git a/src/dodal/devices/electron_analyser/mbs/mbs_detector.py b/src/dodal/devices/electron_analyser/mbs/mbs_detector.py index e2bce556205..852ebc20c46 100644 --- a/src/dodal/devices/electron_analyser/mbs/mbs_detector.py +++ b/src/dodal/devices/electron_analyser/mbs/mbs_detector.py @@ -1,3 +1,4 @@ +from collections.abc import Sequence from typing import Generic from ophyd_async.core import SignalR, soft_signal_rw @@ -32,6 +33,7 @@ def __init__( energy_source: SignalR[float], shutter: GenericFastShutter | None = None, source_selector: SourceSelector | None = None, + config_sigs: Sequence[SignalR] = (), name: str = "", ): # Make attribute of class so connect applies to driver and populates parent. @@ -49,6 +51,7 @@ def __init__( ) trigger_logic = ElectronAnalayserTriggerLogic(self.driver, set()) config_sigs = ( + *config_sigs, self.driver.region_name, self.driver.energy_mode, self.driver.acquisition_mode, diff --git a/tests/devices/electron_analyser/mbs/test_mbs_analyser_slits.py b/tests/devices/electron_analyser/mbs/test_mbs_analyser_slits.py new file mode 100644 index 00000000000..072a00c5566 --- /dev/null +++ b/tests/devices/electron_analyser/mbs/test_mbs_analyser_slits.py @@ -0,0 +1,69 @@ +import pytest +from ophyd_async.core import init_devices +from ophyd_async.testing import assert_reading, partial_reading + +from dodal.devices.electron_analyser.mbs import ( + EntranceSlitInformation, + EntranceSlitInformationDevice, + SlitPosition, +) + + +@pytest.mark.parametrize("slit_pos", [pos.value for pos in SlitPosition]) +def test_entrance_slit_info_to_slit_position(slit_pos: SlitPosition): + slit_info = EntranceSlitInformation.from_slit_positions(slit_pos) + assert slit_info.to_slit_position() == slit_pos + + +def test_entrance_slit_info_from_slit_position(): + slit_info = EntranceSlitInformation.from_slit_positions(SlitPosition.P850_3_HOLE) + assert slit_info.setting == 850 + assert slit_info.size == 3.0 + assert slit_info.shape == "hole" + assert slit_info.direction == "vertical" + + slit_info = EntranceSlitInformation.from_slit_positions( + SlitPosition.P300_0_2_CURVED + ) + assert slit_info.setting == 300 + assert slit_info.size == 0.2 + assert slit_info.shape == "curved" + assert slit_info.direction == "vertical" + + +@pytest.fixture +def slit_info_device() -> EntranceSlitInformationDevice: + with init_devices(mock=True): + slit_info_device = EntranceSlitInformationDevice("TEST:") + return slit_info_device + + +@pytest.mark.parametrize("slit_pos", [pos.value for pos in SlitPosition]) +async def test_slit_info_device_soft_signals_sync_with_epics( + slit_info_device: EntranceSlitInformationDevice, slit_pos: SlitPosition +) -> None: + await slit_info_device.set(slit_pos) + + slit_info = EntranceSlitInformation.from_slit_positions(slit_pos) + assert await slit_info_device.setting.get_value() == slit_info.setting + assert await slit_info_device.shape.get_value() == slit_info.shape + assert await slit_info_device.size.get_value() == slit_info.size + assert await slit_info_device.direction.get_value() == slit_info.direction + + +@pytest.mark.parametrize("slit_pos", [pos.value for pos in SlitPosition]) +async def test_slit_info_device_read_and_soft_signals_sync_with_epics( + slit_info_device: EntranceSlitInformationDevice, slit_pos: SlitPosition +) -> None: + await slit_info_device.set(slit_pos) + slit_info = EntranceSlitInformation.from_slit_positions(slit_pos) + + await assert_reading( + slit_info_device, + { + "slit_info_device-size": partial_reading(slit_info.size), + "slit_info_device-shape": partial_reading(slit_info.shape), + "slit_info_device-setting": partial_reading(slit_info.setting), + "slit_info_device-direction": partial_reading(slit_info.direction), + }, + )