Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions docs/mqtt-thermostat-digital-twin.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# MQTT Thermostat Digital Twin

`tests/test_temperature_controller.py` includes an opt-in integration check for
the ACUITYnano MQTT thermostat digital twin. It is skipped by default so normal
CI and local development do not require broker credentials or the vendor MQTT
SDK.

## Configuration

Set `GENTLY_MQTT_THERMOSTAT_CONFIG` to either a JSON object or the path to a
JSON file. The config is passed to `create_temperature_controller()` with
`backend` defaulting to `mqtt`.

Example JSON:

```json
{
"backend": "mqtt",
"name": "temperature",
"broker": "mqtt.example.org",
"port": 8883,
"user": "gently",
"password": "replace-me"
}
```

The vendor package may provide embedded broker defaults. In that case the config
can be as small as:

```json
{"backend": "mqtt", "name": "temperature"}
```

## Environment Variables

| Variable | Default | Purpose |
| --- | --- | --- |
| `GENTLY_MQTT_THERMOSTAT_CONFIG` | unset | JSON object or path to JSON config. Required to run. |
| `GENTLY_MQTT_THERMOSTAT_TARGET_C` | `21.5` | Commanded setpoint for the test. |
| `GENTLY_MQTT_THERMOSTAT_TIMEOUT_S` | `10` | Seconds to wait for lock or convergence. |
| `GENTLY_MQTT_THERMOSTAT_TOLERANCE_C` | `0.25` | Temperature tolerance for convergence. |

## Run

```shell
pytest tests/test_temperature_controller.py -q
```

Without `GENTLY_MQTT_THERMOSTAT_CONFIG`, the digital-twin test is skipped.
With the config set, the test:

1. Creates the MQTT-backed temperature controller.
2. Enables the controller.
3. Commands a non-blocking setpoint.
4. Verifies immediate setpoint readback.
5. Polls until the twin reports lock or measured temperature convergence.

This checks the runtime path used by `/api/temperature/set` without requiring
live microscope hardware.
4 changes: 2 additions & 2 deletions gently/app/tools/acquisition_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,10 @@ async def acquire_volume(
if agent.store and agent.session_id:
try:
from pathlib import Path as _Path
pos = embryo.stage_position or {}
agent.store.register_embryo(
agent.session_id, embryo_id,
position_x=pos.get('x'), position_y=pos.get('y'),
position_coarse=embryo.position_coarse or {},
position_fine=embryo.position_fine or {},
calibration=embryo.calibration,
role=embryo.role,
)
Expand Down
1 change: 1 addition & 0 deletions gently/hardware/dispim/device_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1457,6 +1457,7 @@ async def handle_set_temperature(self, request):
'success': True, 'target_c': target, 'waited': False,
'message': f'commanded {target} C (ramping)',
'temperature_c': r.get(temp.name, {}).get('value'),
'setpoint_c': r.get(f'{temp.name}_setpoint', {}).get('value'),
'state': r.get(f'{temp.name}_state', {}).get('value'),
})

Expand Down
8 changes: 7 additions & 1 deletion gently/hardware/temperature.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ def worker():
with self._lock:
try:
self._dev.set_temperature(target) # vendor also validates range
self._setpoint = target
self._dev.enable_tec(True)
locked = self._dev.wait_for_target(timeout_seconds=self.stabilize_timeout)
except Exception as exc:
Expand All @@ -130,7 +131,12 @@ def enable(self, on: bool = True):

def setpoint(self, target_c):
"""Command the setpoint without blocking for stabilization."""
self._dev.set_temperature(float(target_c))
target = float(target_c)
if not (TEMP_MIN_C <= target <= TEMP_MAX_C):
raise ValueError(f"target {target} C outside [{TEMP_MIN_C}, {TEMP_MAX_C}]")
with self._lock:
self._dev.set_temperature(target)
self._setpoint = target

# -- Bluesky readable protocol -------------------------------------------
def read(self):
Expand Down
41 changes: 41 additions & 0 deletions gently/harness/conversation.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,47 @@
logger = logging.getLogger(__name__)


_TEXT_TOOL_CALL_RE = re.compile(r"<tool_call>\s*(.*?)\s*</tool_call>", re.DOTALL | re.IGNORECASE)


def _extract_text_tool_calls(text: str) -> tuple[str, List[Dict[str, Any]]]:
"""Extract JSON tool calls embedded in text fallback tags.

Some model/test harness paths may emit a tool request as text instead of
structured ``tool_use`` blocks. Keep parsing permissive, but only return
well-formed objects that name a tool.
"""
if not text:
return text, []

calls: List[Dict[str, Any]] = []

def _remove_or_collect(match: re.Match) -> str:
try:
payload = json.loads(match.group(1).strip())
except (TypeError, json.JSONDecodeError):
return ""
if not isinstance(payload, dict):
return ""
name = payload.get("name")
if not name:
return ""
tool_input = payload.get("input")
if tool_input is None:
tool_input = payload.get("arguments", {})
if tool_input is None:
tool_input = {}
calls.append({
"name": name,
"input": tool_input,
"id": payload.get("id"),
})
return ""

cleaned = _TEXT_TOOL_CALL_RE.sub(_remove_or_collect, text)
return cleaned, calls


def _extend_tool_calls(out: List[Dict[str, Any]], content_blocks) -> None:
"""Append every tool_use block in content_blocks to out.

Expand Down
29 changes: 22 additions & 7 deletions gently/harness/session/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,18 @@ def _resume_session(self, session_id: str, experiment):
embryo_states = experiment_data.get('embryos', {})

for embryo_id, embryo_data in embryo_states.items():
pos = embryo_data.get('stage_position', {})
position_coarse = embryo_data.get('position_coarse')
position_fine = embryo_data.get('position_fine')
if position_coarse is None and position_fine is None:
position_coarse = embryo_data.get('stage_position', {})
experiment.add_embryo(
embryo_id=embryo_id,
position=pos,
position=position_coarse or {},
position_fine=position_fine or {},
calibration=embryo_data.get('calibration', {}),
user_label=embryo_data.get('user_label'),
uid=embryo_data.get('uid'),
role=embryo_data.get('role', 'test'),
)
embryo = experiment.embryos[embryo_id]
embryo.nickname = embryo_data.get('nickname')
Expand All @@ -107,12 +112,22 @@ def _resume_session(self, session_id: str, experiment):
store_embryos = self.store.list_embryos(session_id)
for e in store_embryos:
eid = e['embryo_id']
if eid not in experiment.embryos:
calibration = e.get('calibration') or {}
if isinstance(calibration, str):
calibration = json.loads(calibration) if calibration else {}
if eid in experiment.embryos:
embryo = experiment.embryos[eid]
if e.get('position_coarse') is not None:
embryo.position_coarse = e.get('position_coarse') or {}
if e.get('position_fine') is not None:
embryo.position_fine = e.get('position_fine') or {}
else:
experiment.add_embryo(
embryo_id=eid,
position=e.get('position_coarse') or {},
position_fine=e.get('position_fine') or {},
calibration=json.loads(e['calibration']) if e.get('calibration') else {},
calibration=calibration,
role=e.get('role', 'test'),
)

self.store.touch_session(session_id)
Expand Down Expand Up @@ -171,14 +186,14 @@ def auto_save(self, experiment, conversation_history, system_prompt):
def _sync_embryos_to_db(self, experiment):
"""Sync in-memory embryo state (positions, calibration) to the DB."""
for embryo_id, embryo in experiment.embryos.items():
pos = embryo.stage_position or {}
self.store.register_embryo(
self._session_id, embryo_id,
embryo_uid=getattr(embryo, 'uid', None),
nickname=getattr(embryo, 'user_label', None),
position_x=pos.get('x'),
position_y=pos.get('y'),
position_coarse=getattr(embryo, 'position_coarse', None) or {},
position_fine=getattr(embryo, 'position_fine', None) or {},
calibration=embryo.calibration,
role=getattr(embryo, 'role', None),
)

def list_sessions(self) -> List[Dict]:
Expand Down
20 changes: 17 additions & 3 deletions gently/mesh/mesh_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ def _on_peer_discovered(self, data: dict, sender_ip: str, verified: bool = False

# Only fetch status from trusted peers
if trusted:
asyncio.ensure_future(self._fetch_and_update_peer(peer))
self._schedule_status_fetch(peer)

def _on_peer_heartbeat(self, instance_id: str, sender_ip: str, verified: bool = False):
"""Called on subsequent heartbeats from a known peer."""
Expand All @@ -228,7 +228,7 @@ def _on_nudge_received(self, peer_id: str, sender_ip: str):
if peer:
peer.last_seen = time.time()
peer.ip_address = sender_ip
asyncio.ensure_future(self._fetch_and_update_peer(peer))
self._schedule_status_fetch(peer)
logger.debug(f"Mesh: nudge from {peer.hostname} ({peer_id[:8]}), refetching")

def _on_local_status_changed(self, event):
Expand Down Expand Up @@ -311,6 +311,20 @@ async def _fetch_and_update_peer(self, peer: PeerInfo):
"hostname": peer.hostname,
})

def _schedule_status_fetch(self, peer: PeerInfo) -> None:
"""Schedule a best-effort peer status fetch when the service is running."""
if not self._peer_client:
return
try:
loop = asyncio.get_running_loop()
except RuntimeError:
logger.debug(
"Mesh: skipping status fetch for %s because no event loop is running",
peer.instance_id[:8],
)
return
loop.create_task(self._fetch_and_update_peer(peer))

# ------------------------------------------------------------------
# Pairing integration
# ------------------------------------------------------------------
Expand All @@ -336,7 +350,7 @@ def mark_peer_trusted(self, instance_id: str):
if cert_fp:
peer.tls_enabled = True
# Kick off an immediate status fetch now that we trust them
asyncio.ensure_future(self._fetch_and_update_peer(peer))
self._schedule_status_fetch(peer)
logger.info(f"Mesh: peer {peer.hostname} ({instance_id[:8]}) now trusted")

# ------------------------------------------------------------------
Expand Down
8 changes: 8 additions & 0 deletions tests/test_campaign_coordination.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@
import urllib.request
import urllib.error

if "pytest" in sys.modules and os.environ.get("GENTLY_RUN_LIVE_CAMPAIGN_TESTS") != "1":
import pytest

pytest.skip(
"live campaign coordination script requires a running Gently server",
allow_module_level=True,
)

BASE = os.environ.get("GENTLY_URL", "http://localhost:8080")
FAKE_PEER = "test-peer-001"
FAKE_HOST = "test-machine"
Expand Down
33 changes: 19 additions & 14 deletions tests/test_dispim_device_safety.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def __init__(self):
self._configs = {} # group -> current_config
self._available_configs = {} # group -> [configs]
self._exposure = 10.0
self._camera_device = None
self._camera_device = ""
self._focus_device = None
self._circular_buffer = []
self._sequence_running = False
Expand Down Expand Up @@ -83,6 +83,9 @@ def waitForConfig(self, group, config):
def setCameraDevice(self, name):
self._camera_device = name

def getCameraDevice(self):
return self._camera_device

def setFocusDevice(self, name):
self._focus_device = name

Expand Down Expand Up @@ -226,9 +229,9 @@ def make_z_stage(core=None, limits=(50.0, 250.0)):
return DiSPIMZstage("ZStage", core or make_core(), limits=limits)


def make_xy_stage(core=None, x_limits=(2000.0, 4000.0), y_limits=(-1000.0, 1000.0)):
def make_xy_stage(core=None):
from gently.hardware.dispim.devices.stage import DiSPIMXYStage
return DiSPIMXYStage("XYStage", core or make_core(), x_limits=x_limits, y_limits=y_limits)
return DiSPIMXYStage("XYStage", core or make_core())


def make_piezo(core=None, limits=(-200.0, 200.0)):
Expand Down Expand Up @@ -318,38 +321,40 @@ class TestXYStageBounds:

def test_valid_xy_position(self):
stage = make_xy_stage()
status = stage.set([3000.0, 0.0])
x = (stage.x_limits[0] + stage.x_limits[1]) / 2.0
y = (stage.y_limits[0] + stage.y_limits[1]) / 2.0
status = stage.set([x, y])
status.wait(timeout=2)
assert stage.core._xy_position == (3000.0, 0.0)
assert stage.core._xy_position == (x, y)

def test_x_below_lower_limit(self):
stage = make_xy_stage()
status = stage.set([1999.0, 0.0])
with pytest.raises(ValueError, match="outside limits"):
status = stage.set([stage.x_limits[0] - 1.0, 0.0])
with pytest.raises(ValueError, match="outside hardware limits"):
status.wait(timeout=2)

def test_x_above_upper_limit(self):
stage = make_xy_stage()
status = stage.set([4001.0, 0.0])
with pytest.raises(ValueError, match="outside limits"):
status = stage.set([stage.x_limits[1] + 1.0, 0.0])
with pytest.raises(ValueError, match="outside hardware limits"):
status.wait(timeout=2)

def test_y_below_lower_limit(self):
stage = make_xy_stage()
status = stage.set([3000.0, -1001.0])
with pytest.raises(ValueError, match="outside limits"):
status = stage.set([0.0, stage.y_limits[0] - 1.0])
with pytest.raises(ValueError, match="outside hardware limits"):
status.wait(timeout=2)

def test_y_above_upper_limit(self):
stage = make_xy_stage()
status = stage.set([3000.0, 1001.0])
with pytest.raises(ValueError, match="outside limits"):
status = stage.set([0.0, stage.y_limits[1] + 1.0])
with pytest.raises(ValueError, match="outside hardware limits"):
status.wait(timeout=2)

def test_core_not_called_on_invalid_x(self):
core = make_core()
stage = make_xy_stage(core=core)
stage.set([0.0, 0.0]) # x=0 is below x_limits[0]=2000
stage.set([stage.x_limits[0] - 1.0, 0.0])
time.sleep(0.1)
assert not any(c[0] == 'setXYPosition' for c in core.call_log)

Expand Down
Loading