diff --git a/docs/mqtt-thermostat-digital-twin.md b/docs/mqtt-thermostat-digital-twin.md new file mode 100644 index 00000000..cc405582 --- /dev/null +++ b/docs/mqtt-thermostat-digital-twin.md @@ -0,0 +1,59 @@ +# MQTT Thermostat Digital Twin + +`tests/test_temperature_controller.py` includes an opt-in integration check for +the ACUITYnano MQTT thermostat digital twin. It is skipped by default so normal +CI and local development do not require broker credentials or the vendor MQTT +SDK. + +## Configuration + +Set `GENTLY_MQTT_THERMOSTAT_CONFIG` to either a JSON object or the path to a +JSON file. The config is passed to `create_temperature_controller()` with +`backend` defaulting to `mqtt`. + +Example JSON: + +```json +{ + "backend": "mqtt", + "name": "temperature", + "broker": "mqtt.example.org", + "port": 8883, + "user": "gently", + "password": "replace-me" +} +``` + +The vendor package may provide embedded broker defaults. In that case the config +can be as small as: + +```json +{"backend": "mqtt", "name": "temperature"} +``` + +## Environment Variables + +| Variable | Default | Purpose | +| --- | --- | --- | +| `GENTLY_MQTT_THERMOSTAT_CONFIG` | unset | JSON object or path to JSON config. Required to run. | +| `GENTLY_MQTT_THERMOSTAT_TARGET_C` | `21.5` | Commanded setpoint for the test. | +| `GENTLY_MQTT_THERMOSTAT_TIMEOUT_S` | `10` | Seconds to wait for lock or convergence. | +| `GENTLY_MQTT_THERMOSTAT_TOLERANCE_C` | `0.25` | Temperature tolerance for convergence. | + +## Run + +```shell +pytest tests/test_temperature_controller.py -q +``` + +Without `GENTLY_MQTT_THERMOSTAT_CONFIG`, the digital-twin test is skipped. +With the config set, the test: + +1. Creates the MQTT-backed temperature controller. +2. Enables the controller. +3. Commands a non-blocking setpoint. +4. Verifies immediate setpoint readback. +5. Polls until the twin reports lock or measured temperature convergence. + +This checks the runtime path used by `/api/temperature/set` without requiring +live microscope hardware. diff --git a/gently/app/tools/acquisition_tools.py b/gently/app/tools/acquisition_tools.py index 0f429982..f5ed90a6 100644 --- a/gently/app/tools/acquisition_tools.py +++ b/gently/app/tools/acquisition_tools.py @@ -105,10 +105,10 @@ async def acquire_volume( if agent.store and agent.session_id: try: from pathlib import Path as _Path - pos = embryo.stage_position or {} agent.store.register_embryo( agent.session_id, embryo_id, - position_x=pos.get('x'), position_y=pos.get('y'), + position_coarse=embryo.position_coarse or {}, + position_fine=embryo.position_fine or {}, calibration=embryo.calibration, role=embryo.role, ) diff --git a/gently/hardware/dispim/device_layer.py b/gently/hardware/dispim/device_layer.py index 3b50aded..7b6b6862 100644 --- a/gently/hardware/dispim/device_layer.py +++ b/gently/hardware/dispim/device_layer.py @@ -1457,6 +1457,7 @@ async def handle_set_temperature(self, request): 'success': True, 'target_c': target, 'waited': False, 'message': f'commanded {target} C (ramping)', 'temperature_c': r.get(temp.name, {}).get('value'), + 'setpoint_c': r.get(f'{temp.name}_setpoint', {}).get('value'), 'state': r.get(f'{temp.name}_state', {}).get('value'), }) diff --git a/gently/hardware/temperature.py b/gently/hardware/temperature.py index 526d576d..a9f97780 100644 --- a/gently/hardware/temperature.py +++ b/gently/hardware/temperature.py @@ -106,6 +106,7 @@ def worker(): with self._lock: try: self._dev.set_temperature(target) # vendor also validates range + self._setpoint = target self._dev.enable_tec(True) locked = self._dev.wait_for_target(timeout_seconds=self.stabilize_timeout) except Exception as exc: @@ -130,7 +131,12 @@ def enable(self, on: bool = True): def setpoint(self, target_c): """Command the setpoint without blocking for stabilization.""" - self._dev.set_temperature(float(target_c)) + target = float(target_c) + if not (TEMP_MIN_C <= target <= TEMP_MAX_C): + raise ValueError(f"target {target} C outside [{TEMP_MIN_C}, {TEMP_MAX_C}]") + with self._lock: + self._dev.set_temperature(target) + self._setpoint = target # -- Bluesky readable protocol ------------------------------------------- def read(self): diff --git a/gently/harness/conversation.py b/gently/harness/conversation.py index e5de53d9..bfd09684 100644 --- a/gently/harness/conversation.py +++ b/gently/harness/conversation.py @@ -17,6 +17,47 @@ logger = logging.getLogger(__name__) +_TEXT_TOOL_CALL_RE = re.compile(r"\s*(.*?)\s*", re.DOTALL | re.IGNORECASE) + + +def _extract_text_tool_calls(text: str) -> tuple[str, List[Dict[str, Any]]]: + """Extract JSON tool calls embedded in text fallback tags. + + Some model/test harness paths may emit a tool request as text instead of + structured ``tool_use`` blocks. Keep parsing permissive, but only return + well-formed objects that name a tool. + """ + if not text: + return text, [] + + calls: List[Dict[str, Any]] = [] + + def _remove_or_collect(match: re.Match) -> str: + try: + payload = json.loads(match.group(1).strip()) + except (TypeError, json.JSONDecodeError): + return "" + if not isinstance(payload, dict): + return "" + name = payload.get("name") + if not name: + return "" + tool_input = payload.get("input") + if tool_input is None: + tool_input = payload.get("arguments", {}) + if tool_input is None: + tool_input = {} + calls.append({ + "name": name, + "input": tool_input, + "id": payload.get("id"), + }) + return "" + + cleaned = _TEXT_TOOL_CALL_RE.sub(_remove_or_collect, text) + return cleaned, calls + + def _extend_tool_calls(out: List[Dict[str, Any]], content_blocks) -> None: """Append every tool_use block in content_blocks to out. diff --git a/gently/harness/session/manager.py b/gently/harness/session/manager.py index 47ea38d3..9c2b8dd4 100644 --- a/gently/harness/session/manager.py +++ b/gently/harness/session/manager.py @@ -82,13 +82,18 @@ def _resume_session(self, session_id: str, experiment): embryo_states = experiment_data.get('embryos', {}) for embryo_id, embryo_data in embryo_states.items(): - pos = embryo_data.get('stage_position', {}) + position_coarse = embryo_data.get('position_coarse') + position_fine = embryo_data.get('position_fine') + if position_coarse is None and position_fine is None: + position_coarse = embryo_data.get('stage_position', {}) experiment.add_embryo( embryo_id=embryo_id, - position=pos, + position=position_coarse or {}, + position_fine=position_fine or {}, calibration=embryo_data.get('calibration', {}), user_label=embryo_data.get('user_label'), uid=embryo_data.get('uid'), + role=embryo_data.get('role', 'test'), ) embryo = experiment.embryos[embryo_id] embryo.nickname = embryo_data.get('nickname') @@ -107,12 +112,22 @@ def _resume_session(self, session_id: str, experiment): store_embryos = self.store.list_embryos(session_id) for e in store_embryos: eid = e['embryo_id'] - if eid not in experiment.embryos: + calibration = e.get('calibration') or {} + if isinstance(calibration, str): + calibration = json.loads(calibration) if calibration else {} + if eid in experiment.embryos: + embryo = experiment.embryos[eid] + if e.get('position_coarse') is not None: + embryo.position_coarse = e.get('position_coarse') or {} + if e.get('position_fine') is not None: + embryo.position_fine = e.get('position_fine') or {} + else: experiment.add_embryo( embryo_id=eid, position=e.get('position_coarse') or {}, position_fine=e.get('position_fine') or {}, - calibration=json.loads(e['calibration']) if e.get('calibration') else {}, + calibration=calibration, + role=e.get('role', 'test'), ) self.store.touch_session(session_id) @@ -171,14 +186,14 @@ def auto_save(self, experiment, conversation_history, system_prompt): def _sync_embryos_to_db(self, experiment): """Sync in-memory embryo state (positions, calibration) to the DB.""" for embryo_id, embryo in experiment.embryos.items(): - pos = embryo.stage_position or {} self.store.register_embryo( self._session_id, embryo_id, embryo_uid=getattr(embryo, 'uid', None), nickname=getattr(embryo, 'user_label', None), - position_x=pos.get('x'), - position_y=pos.get('y'), + position_coarse=getattr(embryo, 'position_coarse', None) or {}, + position_fine=getattr(embryo, 'position_fine', None) or {}, calibration=embryo.calibration, + role=getattr(embryo, 'role', None), ) def list_sessions(self) -> List[Dict]: diff --git a/gently/mesh/mesh_service.py b/gently/mesh/mesh_service.py index edac242c..c5cbe5b1 100644 --- a/gently/mesh/mesh_service.py +++ b/gently/mesh/mesh_service.py @@ -212,7 +212,7 @@ def _on_peer_discovered(self, data: dict, sender_ip: str, verified: bool = False # Only fetch status from trusted peers if trusted: - asyncio.ensure_future(self._fetch_and_update_peer(peer)) + self._schedule_status_fetch(peer) def _on_peer_heartbeat(self, instance_id: str, sender_ip: str, verified: bool = False): """Called on subsequent heartbeats from a known peer.""" @@ -228,7 +228,7 @@ def _on_nudge_received(self, peer_id: str, sender_ip: str): if peer: peer.last_seen = time.time() peer.ip_address = sender_ip - asyncio.ensure_future(self._fetch_and_update_peer(peer)) + self._schedule_status_fetch(peer) logger.debug(f"Mesh: nudge from {peer.hostname} ({peer_id[:8]}), refetching") def _on_local_status_changed(self, event): @@ -311,6 +311,20 @@ async def _fetch_and_update_peer(self, peer: PeerInfo): "hostname": peer.hostname, }) + def _schedule_status_fetch(self, peer: PeerInfo) -> None: + """Schedule a best-effort peer status fetch when the service is running.""" + if not self._peer_client: + return + try: + loop = asyncio.get_running_loop() + except RuntimeError: + logger.debug( + "Mesh: skipping status fetch for %s because no event loop is running", + peer.instance_id[:8], + ) + return + loop.create_task(self._fetch_and_update_peer(peer)) + # ------------------------------------------------------------------ # Pairing integration # ------------------------------------------------------------------ @@ -336,7 +350,7 @@ def mark_peer_trusted(self, instance_id: str): if cert_fp: peer.tls_enabled = True # Kick off an immediate status fetch now that we trust them - asyncio.ensure_future(self._fetch_and_update_peer(peer)) + self._schedule_status_fetch(peer) logger.info(f"Mesh: peer {peer.hostname} ({instance_id[:8]}) now trusted") # ------------------------------------------------------------------ diff --git a/tests/test_campaign_coordination.py b/tests/test_campaign_coordination.py index 18ee49e2..49737fc4 100644 --- a/tests/test_campaign_coordination.py +++ b/tests/test_campaign_coordination.py @@ -15,6 +15,14 @@ import urllib.request import urllib.error +if "pytest" in sys.modules and os.environ.get("GENTLY_RUN_LIVE_CAMPAIGN_TESTS") != "1": + import pytest + + pytest.skip( + "live campaign coordination script requires a running Gently server", + allow_module_level=True, + ) + BASE = os.environ.get("GENTLY_URL", "http://localhost:8080") FAKE_PEER = "test-peer-001" FAKE_HOST = "test-machine" diff --git a/tests/test_dispim_device_safety.py b/tests/test_dispim_device_safety.py index 40fab85a..9ca67e86 100644 --- a/tests/test_dispim_device_safety.py +++ b/tests/test_dispim_device_safety.py @@ -27,7 +27,7 @@ def __init__(self): self._configs = {} # group -> current_config self._available_configs = {} # group -> [configs] self._exposure = 10.0 - self._camera_device = None + self._camera_device = "" self._focus_device = None self._circular_buffer = [] self._sequence_running = False @@ -83,6 +83,9 @@ def waitForConfig(self, group, config): def setCameraDevice(self, name): self._camera_device = name + def getCameraDevice(self): + return self._camera_device + def setFocusDevice(self, name): self._focus_device = name @@ -226,9 +229,9 @@ def make_z_stage(core=None, limits=(50.0, 250.0)): return DiSPIMZstage("ZStage", core or make_core(), limits=limits) -def make_xy_stage(core=None, x_limits=(2000.0, 4000.0), y_limits=(-1000.0, 1000.0)): +def make_xy_stage(core=None): from gently.hardware.dispim.devices.stage import DiSPIMXYStage - return DiSPIMXYStage("XYStage", core or make_core(), x_limits=x_limits, y_limits=y_limits) + return DiSPIMXYStage("XYStage", core or make_core()) def make_piezo(core=None, limits=(-200.0, 200.0)): @@ -318,38 +321,40 @@ class TestXYStageBounds: def test_valid_xy_position(self): stage = make_xy_stage() - status = stage.set([3000.0, 0.0]) + x = (stage.x_limits[0] + stage.x_limits[1]) / 2.0 + y = (stage.y_limits[0] + stage.y_limits[1]) / 2.0 + status = stage.set([x, y]) status.wait(timeout=2) - assert stage.core._xy_position == (3000.0, 0.0) + assert stage.core._xy_position == (x, y) def test_x_below_lower_limit(self): stage = make_xy_stage() - status = stage.set([1999.0, 0.0]) - with pytest.raises(ValueError, match="outside limits"): + status = stage.set([stage.x_limits[0] - 1.0, 0.0]) + with pytest.raises(ValueError, match="outside hardware limits"): status.wait(timeout=2) def test_x_above_upper_limit(self): stage = make_xy_stage() - status = stage.set([4001.0, 0.0]) - with pytest.raises(ValueError, match="outside limits"): + status = stage.set([stage.x_limits[1] + 1.0, 0.0]) + with pytest.raises(ValueError, match="outside hardware limits"): status.wait(timeout=2) def test_y_below_lower_limit(self): stage = make_xy_stage() - status = stage.set([3000.0, -1001.0]) - with pytest.raises(ValueError, match="outside limits"): + status = stage.set([0.0, stage.y_limits[0] - 1.0]) + with pytest.raises(ValueError, match="outside hardware limits"): status.wait(timeout=2) def test_y_above_upper_limit(self): stage = make_xy_stage() - status = stage.set([3000.0, 1001.0]) - with pytest.raises(ValueError, match="outside limits"): + status = stage.set([0.0, stage.y_limits[1] + 1.0]) + with pytest.raises(ValueError, match="outside hardware limits"): status.wait(timeout=2) def test_core_not_called_on_invalid_x(self): core = make_core() stage = make_xy_stage(core=core) - stage.set([0.0, 0.0]) # x=0 is below x_limits[0]=2000 + stage.set([stage.x_limits[0] - 1.0, 0.0]) time.sleep(0.1) assert not any(c[0] == 'setXYPosition' for c in core.call_log) diff --git a/tests/test_session_manager.py b/tests/test_session_manager.py index 02c4f169..f6c15913 100644 --- a/tests/test_session_manager.py +++ b/tests/test_session_manager.py @@ -6,6 +6,7 @@ from unittest.mock import MagicMock, patch from gently.harness.session.manager import SessionManager +from gently.harness.state import ExperimentState # =========================================================================== @@ -159,3 +160,45 @@ def test_auto_save_silent_on_error(self): mgr.store.save_session_snapshot.side_effect = Exception("DB error") # Should not raise mgr.auto_save(MagicMock(), [], "prompt") + + def test_sync_embryos_preserves_coarse_and_fine_positions(self): + mgr = self._make_manager() + mgr._session_id = "s1" + experiment = ExperimentState() + experiment.add_embryo( + "e1", + position={"x": 1.0, "y": 2.0}, + position_fine={"x": 1.5, "y": 2.5}, + ) + + mgr._sync_embryos_to_db(experiment) + + kwargs = mgr.store.register_embryo.call_args.kwargs + assert kwargs["position_coarse"] == {"x": 1.0, "y": 2.0} + assert kwargs["position_fine"] == {"x": 1.5, "y": 2.5} + + def test_resume_snapshot_preserves_coarse_and_fine_positions(self): + mgr = self._make_manager() + mgr.store.get_session.return_value = {"session_id": "s1"} + mgr.store.load_session_snapshot.return_value = { + "conversation_history": [], + "experiment_data": { + "embryos": { + "e1": { + "position_coarse": {"x": 1.0, "y": 2.0}, + "position_fine": {"x": 1.5, "y": 2.5}, + "calibration": {}, + "role": "test", + } + } + }, + } + mgr.store.list_embryos.return_value = [] + experiment = ExperimentState() + + ok, _ = mgr._resume_session("s1", experiment) + + assert ok is True + embryo = experiment.embryos["e1"] + assert embryo.position_coarse == {"x": 1.0, "y": 2.0} + assert embryo.position_fine == {"x": 1.5, "y": 2.5} diff --git a/tests/test_temperature_controller.py b/tests/test_temperature_controller.py new file mode 100644 index 00000000..fe22064d --- /dev/null +++ b/tests/test_temperature_controller.py @@ -0,0 +1,92 @@ +import json +import os +import time +from pathlib import Path + +import pytest + +from gently.hardware.temperature import ( + TemperatureController, + _MockBackend, + create_temperature_controller, +) + + +def test_nonblocking_setpoint_updates_reported_setpoint(): + dev = TemperatureController(_MockBackend(), name="temperature") + + dev.enable(True) + dev.setpoint(21.5) + readback = dev.read() + + assert readback["temperature_setpoint"]["value"] == 21.5 + assert readback["temperature"]["value"] == 21.5 + + +def test_nonblocking_setpoint_validates_range(): + dev = TemperatureController(_MockBackend(), name="temperature") + + with pytest.raises(ValueError, match="outside"): + dev.setpoint(120.0) + + +def _mqtt_digital_twin_config(): + raw = os.getenv("GENTLY_MQTT_THERMOSTAT_CONFIG") + if not raw: + pytest.skip( + "set GENTLY_MQTT_THERMOSTAT_CONFIG to a JSON config or config path " + "to run the MQTT thermostat digital-twin test" + ) + + candidate = Path(raw) + if candidate.exists(): + cfg = json.loads(candidate.read_text(encoding="utf-8")) + else: + cfg = json.loads(raw) + cfg.setdefault("backend", "mqtt") + cfg.setdefault("name", "temperature") + return cfg + + +def test_mqtt_digital_twin_reports_commanded_setpoint(): + """Opt-in check for Roland's MQTT thermostat digital twin. + + The test exercises the same non-blocking command path used by + `/api/temperature/set`, but only runs when broker/config details are + supplied explicitly. + """ + cfg = _mqtt_digital_twin_config() + target = float(os.getenv("GENTLY_MQTT_THERMOSTAT_TARGET_C", "21.5")) + timeout_s = float(os.getenv("GENTLY_MQTT_THERMOSTAT_TIMEOUT_S", "10")) + tolerance_c = float(os.getenv("GENTLY_MQTT_THERMOSTAT_TOLERANCE_C", "0.25")) + + try: + dev = create_temperature_controller(cfg) + except ImportError as exc: + pytest.skip(f"MQTT thermostat SDK is unavailable: {exc}") + + try: + dev.enable(True) + dev.setpoint(target) + readback = dev.read() + + assert readback["temperature_setpoint"]["value"] == pytest.approx(target) + + deadline = time.monotonic() + timeout_s + last = readback + while time.monotonic() < deadline: + temp = last["temperature"]["value"] + state = str(last["temperature_state"]["value"]) + if temp is not None and abs(float(temp) - target) <= tolerance_c: + return + if "LOCKED" in state.upper(): + return + time.sleep(0.5) + last = dev.read() + + pytest.fail( + "MQTT thermostat digital twin did not converge or report lock " + f"within {timeout_s:.1f}s; last read={last!r}" + ) + finally: + dev.close()