Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion config/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,29 @@ mmdirectory: "C:/Program Files/Micro-Manager-1.4"
switchbot:
name: room_light
address: "EC:6F:04:06:5B:23"
timeout: 20.0
timeout: 20.0

# ACUITYnano Precision Thermal Controller (Peltier/TEC, 0.0–99.9 °C). When this
# block is present the device layer registers a `temperature` device and the
# Devices tab shows a setpoint control; plans can also block on it via
# `bps.mv(temperature, 20.0)`. Remove/comment the block to skip registration.
#
# backend: mqtt talks to the controller over the vendor's MQTT bridge
# (acuitynano_precision_thermalizer_api). With no broker/port/user/password
# keys it uses the vendor package's embedded HiveMQ Cloud defaults — set them
# here only to point at a different broker. The vendor package must be
# installed on the device-layer machine (not on PyPI).
temperature:
name: temperature
backend: serial # serial | mqtt | mock — USB serial is the working
# link on this machine (MQTT cloud is firewalled)
com_port: "COM8"
baud_rate: 115200
stabilize_timeout: 600 # seconds to wait for "[ SYSTEM LOCKED ]" on a blocking set
feedback_peltier: false # true = control off the peltier sensor instead of water
# MQTT alternative (vendor HiveMQ Cloud defaults; needs outbound TLS :8883):
# backend: mqtt
# broker: "your-broker.example.com" # optional — overrides the embedded default
# port: 8883
# user: "username"
# password: "secret"
9 changes: 9 additions & 0 deletions gently/hardware/dispim/device_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def create_devices_from_mmcore(core: pymmcore.CMMCore,
- lightsheet_snap: DiSPIMLightSheetSnap
- scanner: DiSPIMScanner
- piezo: DiSPIMPiezo
- fdrive: DiSPIMFDrive (SPIM head Z / F axis)

Example
-------
Expand Down Expand Up @@ -67,6 +68,7 @@ def create_devices_from_mmcore(core: pymmcore.CMMCore,
'camera_name': 'HamCam1',
'scanner_name': 'Scanner:AB:33',
'piezo_name': 'PiezoStage:P:34',
'fdrive_name': 'ZStage:V:37',
'bottom_camera_name': 'Bottom PCO',
'led_name': 'LED:X:31'
}
Expand Down Expand Up @@ -127,6 +129,13 @@ def create_devices_from_mmcore(core: pymmcore.CMMCore,
except Exception as e:
logger.warning("Could not create XY stage: %s", e)

try:
from .devices import DiSPIMFDrive
devices['fdrive'] = DiSPIMFDrive(name=cfg['fdrive_name'], core=core)
logger.info("Created F-drive (SPIM head): %s", cfg['fdrive_name'])
except Exception as e:
logger.warning("Could not create F-drive (SPIM head): %s", e)

try:
if cfg.get('led_name'):
from .devices import DiSPIMLED
Expand Down
19 changes: 19 additions & 0 deletions gently/hardware/dispim/device_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,11 @@ def __init__(
'focus_sweep_plan',
'calibrate_piezo_galvo_plan',
'multi_embryo_calibration_workflow',
# Slow F-drive traverse (25000 -> ~50 um) + fine focus scan + the
# XY view-registration loop all hold MMCore for a while.
'spim_head_focus_descent_plan',
'register_views_xy_plan',
'spim_head_focus_and_align_plan',
})

async def initialize(self):
Expand Down Expand Up @@ -448,6 +453,20 @@ def _load_plans(self):
except ImportError:
logger.info("Main acquisition plans not available")

# SPIM head focus + dual-view registration plans
try:
from .plans.acquisition import (
spim_head_focus_descent_plan,
register_views_xy_plan,
spim_head_focus_and_align_plan,
)
self.plans['spim_head_focus_descent_plan'] = spim_head_focus_descent_plan
self.plans['register_views_xy_plan'] = register_views_xy_plan
self.plans['spim_head_focus_and_align_plan'] = spim_head_focus_and_align_plan
logger.info("Loaded SPIM head focus plans")
except ImportError:
logger.info("SPIM head focus plans not available")

def _resolve_device_args(self, kwargs: Dict[str, Any]) -> Dict[str, Any]:
"""Replace device name strings with actual device objects"""
resolved = {}
Expand Down
64 changes: 54 additions & 10 deletions gently/hardware/dispim/devices/piezo.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,36 +15,80 @@
logger = logging.getLogger(__name__)


# =========================================================================
# SPIM-HEAD F-DRIVE HARDWARE SAFETY LIMITS — absolute MMCore micrometres.
#
# Layer-1 software fence for the SPIM head: the ASI Tiger "ZStage:V:37"
# axis, which the ASIdiSPIM plugin labels "SPIM Head Height" (the F axis).
# This is the drive that LOWERS the objectives into the dish to hunt for
# embryos ("Start Hunting") and RAISES them clear for sample loading
# ("Load Sample"). Every F-drive move planned by any layer above (Bluesky
# plans, agent orchestrators, UI tools) is bounded here. These are NOT
# constructor kwargs and DiSPIMFDrive exposes no setter — no layer above
# can widen them.
#
# F_DRIVE_MIN_UM collision-critical FLOOR. Smaller F drives the head
# DOWN toward the sample/holder; this hard stop keeps the
# objectives off the dish.
# F_DRIVE_MAX_UM fully-raised "Load Sample" ceiling.
#
# Update only after physically verifying head travel on the rig (drive the
# F axis to each extreme by hand, confirm no collision, read the absolute
# MMCore position) and then editing the constants below.
#
# SCOPE: this is a SOFTWARE-ONLY fence. Unlike the XY stage (see
# devices/stage.py, which also pushes ASI Tiger firmware soft-limits) we do
# NOT write these to the controller, so a physical joystick move can still
# drive the head past these bounds — they bind code-issued moves only.
# =========================================================================
F_DRIVE_MIN_UM: float = 30.0
F_DRIVE_MAX_UM: float = 25000.0


class DiSPIMFDrive:
"""
DiSPIM F-drive (SPIM Head motor) - works with bps.mv(fdrive, position)

ASI Tiger V:37 axis - controls F-axis module for lowering objectives
Device-agnostic: any plan that moves a positioner will work with this device
ASI Tiger "ZStage:V:37" axis — the ASIdiSPIM "SPIM Head Height" / F
axis that lowers the objectives to hunt for embryos and raises them to
load a sample. Device-agnostic: any plan that moves a positioner works.

Hard travel bounds are the module-level F_DRIVE_MIN_UM / F_DRIVE_MAX_UM
constants. They are not constructor kwargs and cannot be widened from
above — see the safety-limit note above this class.
"""

def __init__(self, name: str, core: pymmcore.CMMCore,
limits: Tuple[float, float] = (20.0, 25000.0)):
move_timeout_s: float = 120.0):
self.name = name
self.core = core
self.parent = None # Required for Bluesky
self._limits = limits
# Full-travel F moves (e.g. 25000 -> 5000 um: "Load Sample" -> approach)
# are slow; the per-move Status timeout must comfortably exceed the
# longest traverse or bps.mv would error mid-move while the stage is
# still travelling. Configurable for unusually slow controllers.
self._move_timeout_s = float(move_timeout_s)
self.tolerance = 0.1 # µm

@property
def limits(self):
return self._limits
def limits(self) -> Tuple[float, float]:
"""Read-only view of the hardware safety limits (module constants)."""
return (F_DRIVE_MIN_UM, F_DRIVE_MAX_UM)

def set(self, position):
"""Move F-drive to position - called by bps.mv()"""
position = float(position)
position = round(position, 2) # Round to 0.01 μm precision

# Safety check
if not (self._limits[0] <= position <= self._limits[1]):
raise ValueError(f"Position {position} outside limits {self._limits}")
# Hardware safety check — pinned to the module-level F_DRIVE_*_UM
# constants; nothing above this layer can widen them.
if not (F_DRIVE_MIN_UM <= position <= F_DRIVE_MAX_UM):
raise ValueError(
f"F-drive position {position} outside hardware limits "
f"[{F_DRIVE_MIN_UM}, {F_DRIVE_MAX_UM}]"
)

status = Status(obj=self, timeout=30)
status = Status(obj=self, timeout=self._move_timeout_s)

def wait():
try:
Expand Down
90 changes: 90 additions & 0 deletions gently/hardware/dispim/devices/test_temp_usb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import serial
import time
import threading

class AcuityNanoPrecisionThermalizerSerial:
def __init__(self, com_port, baud_rate=115200):
self.telemetry = {
"target": 20.0,
"water": 20.0,
"peltier": 20.0,
"state": "DISCONNECTED",
"errors": "0"
}
self.running = True
self.ser = serial.Serial(com_port, baud_rate, timeout=0.1)
time.sleep(2)

self.thread = threading.Thread(target=self._read_loop, daemon=True)
self.thread.start()

def _read_loop(self):
while self.running and self.ser.is_open:
try:
if self.ser.in_waiting:
line = self.ser.readline().decode('utf-8', errors='ignore').strip()
if "=" in line:
key, val = line.split("=", 1)
if key == "TARGET": self.telemetry["target"] = float(val)
elif key == "WATER": self.telemetry["water"] = float(val)
elif key == "ACTUAL": self.telemetry["peltier"] = float(val)
elif key == "STATE": self.telemetry["state"] = val
elif key == "ERRORS": self.telemetry["errors"] = val
except Exception:
pass
time.sleep(0.01)

def close(self):
self.running = False
if self.ser.is_open:
self.ser.close()

def set_temperature(self, target_celsius):
if 0.0 <= target_celsius <= 99.9:
cmd = f"TEMP={target_celsius}\n"
self.ser.write(cmd.encode('utf-8'))
else:
raise ValueError("Target must be between 0.0 and 99.9 C")

def enable_tec(self, enable=True):
val = "1" if enable else "0"
cmd = f"ENABLE={val}\n"
self.ser.write(cmd.encode('utf-8'))

def set_feedback_sensor(self, use_peltier=False):
val = "1" if use_peltier else "0"
cmd = f"SENSOR={val}\n"
self.ser.write(cmd.encode('utf-8'))

def get_water_temp(self):
return self.telemetry["water"]

def get_system_state(self):
return self.telemetry["state"]

def wait_for_target(self, timeout_seconds=300):
start = time.time()
while time.time() - start < timeout_seconds:
if "[ SYSTEM LOCKED ]" in self.telemetry["state"]:
return True
time.sleep(0.5)
return False

if __name__ == "__main__":
import time

print("Connecting to ACUITYnano...")
acuity = AcuityNanoPrecisionThermalizerSerial("COM8")

print("Commanding 37.0 C...")
acuity.set_temperature(37.0)
acuity.enable_tec(True)

print("Waiting for thermal stabilization...")
if acuity.wait_for_target(timeout_seconds=600):
print(f"System locked at {acuity.get_water_temp()} C!")
# Trigger external camera or syringe pump here
else:
print("Timeout reached before system stabilized.")

acuity.close()
99 changes: 99 additions & 0 deletions gently/hardware/dispim/devices/test_temperature_controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
"""
ACUITYnano Third-Party Integration SDK
Provides a clean, object-oriented API for external software automation.
"""
import paho.mqtt.client as mqtt
import time
import threading

class AcuityNanoPrecisionThermalizerAPI:
def __init__(self, broker="d0246aa97d194c9da52a19e6f46063eb.s1.eu.hivemq.cloud", port=8883, user="acuitynano", password="Bg984V!@wfhBrkp"):
self.prefix = "acuitynano_hhmi_shroff_diSPIM_001"
self.telemetry = {
"target": 20.0,
"water": 20.0,
"peltier": 20.0,
"state": "DISCONNECTED",
"errors": "0"
}

try:
self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
except AttributeError:
self.client = mqtt.Client()

self.client.username_pw_set(user, password)
self.client.tls_set()
self.client.on_connect = self._on_connect
self.client.on_message = self._on_message

self.thread = threading.Thread(target=self._start_loop, daemon=True)
self.thread.start()
time.sleep(2)

def _start_loop(self):
self.client.connect(broker, port, 60)
self.client.loop_forever()

def _on_connect(self, client, userdata, flags, rc, properties=None):
self.client.subscribe(f"{self.prefix}/telemetry/#")

def _on_message(self, client, userdata, msg):
topic = msg.topic.split("/")[-1]
payload = msg.payload.decode('utf-8')

if topic == "target": self.telemetry["target"] = float(payload)
elif topic == "water": self.telemetry["water"] = float(payload)
elif topic == "actual": self.telemetry["peltier"] = float(payload)
elif topic == "state": self.telemetry["state"] = payload
elif topic == "errors": self.telemetry["errors"] = payload

def set_temperature(self, target_celsius):
if 0.0 <= target_celsius <= 99.9:
self.client.publish(f"{self.prefix}/cmd/temp", str(target_celsius))
else:
raise ValueError("Target must be between 0.0 and 99.9 C")

def enable_tec(self, enable=True):
val = "1" if enable else "0"
self.client.publish(f"{self.prefix}/cmd/enable", val)

def set_feedback_sensor(self, use_peltier=False):
val = "1" if use_peltier else "0"
self.client.publish(f"{self.prefix}/cmd/sensor", val)

def get_water_temp(self):
return self.telemetry["water"]

def get_peltier_temp(self):
return self.telemetry["peltier"]

def get_system_state(self):
return self.telemetry["state"]

def wait_for_target(self, timeout_seconds=300):
start = time.time()
while time.time() - start < timeout_seconds:
if "[ SYSTEM LOCKED ]" in self.telemetry["state"]:
return True
time.sleep(0.5)
return False



if __name__ == "__main__":

import time
from acuitynano_precision_thermalizer_api import AcuityNanoPrecisionThermalizerAPI

acuity = AcuityNanoPrecisionThermalizerAPI()
print("Commanding ACUITYnano to 37.0 C...")
acuity.set_temperature(30.0)
acuity.enable_tec(True)

print("Waiting for thermal stabilization...")
if acuity.wait_for_target(timeout_seconds=600):
print(f"System locked at {acuity.get_water_temp()} C!")
# Trigger image acquisition here
else:
print("Timeout reached.")
Loading