Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 189 additions & 0 deletions pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -2737,6 +2737,195 @@ async def probe_liquid_volumes(
for container, height in zip(containers, liquid_heights)
]

async def _run_ztouch_on_channel_batch(
self,
batch: ChannelBatch,
tip_lengths: List[float],
z_cavity_bottom: List[float],
z_top: List[float],
channel_speed: float,
channel_acceleration: float,
channel_speed_upwards: float,
detection_limiter_in_PWM: int,
push_down_force_in_PWM: int,
inter_channel_start_delay: float,
n_replicates: int,
) -> Dict[int, List[float]]:
"""Per-batch ztouch probing. Override to substitute simulated sensing.

Returns absolute tip-bottom Z heights (mm, deck frame) keyed by job index
(``batch.indices[i]``) so duplicate channels across batches don't collide.

Channel starts within a batch are staggered by ``inter_channel_start_delay``
seconds (the first channel starts at t=0, the next at t=delay, ...) to
decouple per-channel force detection from cumulative carriage load.
"""
batch_lowest_immers = [z_cavity_bottom[i] for i in batch.indices]
batch_start_pos = [z_top[i] + self.SEARCH_START_CLEARANCE_MM for i in batch.indices]
batch_tip_lens = [tip_lengths[i] for i in batch.indices]

async def _delayed(delay, factory):
if delay > 0:
await asyncio.sleep(delay)
return await factory()

measurements: Dict[int, List[float]] = {orig_idx: [] for orig_idx in batch.indices}

for _ in range(n_replicates):
# post_detection_dist=0 and tip_len passed explicitly so the inner call
# issues no C0 commands (move_channel_z, request_tip_len_on_channel) that
# would serialize the gather. Channel raising is handled by the caller's
# min_traverse_height_during_command / z_position_at_end_of_command.
results = await asyncio.gather(
*[
_delayed(
local_idx * inter_channel_start_delay,
lambda ch=ch, tlen=tlen, lip=lip, sps=sps: self.ztouch_probe_z_height_using_channel(
channel_idx=ch,
tip_len=tlen,
lowest_immers_pos=lip,
start_pos_search=sps,
channel_speed=channel_speed,
channel_acceleration=channel_acceleration,
channel_speed_upwards=channel_speed_upwards,
detection_limiter_in_PWM=detection_limiter_in_PWM,
push_down_force_in_PWM=push_down_force_in_PWM,
post_detection_dist=0,
move_channels_to_safe_pos_after=False,
),
)
for local_idx, (ch, tlen, lip, sps) in enumerate(
zip(batch.channels, batch_tip_lens, batch_lowest_immers, batch_start_pos)
)
]
)

for local_idx, height in enumerate(results):
orig_idx = batch.indices[local_idx]
measurements[orig_idx].append(float(height))

return measurements

async def channels_probe_z_using_ztouch(
self,
containers: List[Container],
use_channels: Optional[List[int]] = None,
resource_offsets: Optional[List[Coordinate]] = None,
channel_speed: float = 10.0,
channel_acceleration: float = 800.0,
channel_speed_upwards: float = 125.0,
detection_limiter_in_PWM: int = 1,
push_down_force_in_PWM: int = 0,
inter_channel_start_delay: float = 0.3,
n_replicates: int = 1,
min_traverse_height_at_beginning_of_command: Optional[float] = None,
min_traverse_height_during_command: Optional[float] = None,
z_position_at_end_of_command: Optional[float] = None,
x_grouping_tolerance: Optional[float] = None,
) -> List[float]:
"""Probe absolute Z-heights at the X/Y of each resource using ztouch (force-sensed) probing.

Drives each channel down at the resource's X/Y until the configured PWM current
limit triggers detection. Works on any deck surface reachable at that X/Y -
well bottoms, plate tops, lids, adapters - the firmware reports the Z of the
first surface the tip touches.

Channels within a batch are started with a small ``inter_channel_start_delay``
stagger so their contact-force transients do not superpose on the shared
carriage. Set to 0.0 for a fully parallel descent.

Uses ``plan_batches`` for X/Y partitioning with per-batch resource spread
(respecting no-go zones), then ``execute_batched`` to iterate batches with
Z safety.

Args:
containers: List of Resource objects whose X/Y defines each probe target.
One per channel.
use_channels: Channel indices to use (0-indexed).
resource_offsets: Optional XYZ offsets from resource centers.
channel_speed: Z-axis search speed in mm/s.
channel_acceleration: Z-drive acceleration in mm/s**2.
channel_speed_upwards: Retraction speed in mm/s.
detection_limiter_in_PWM: PWM current limit for force-trigger detection (0-125).
push_down_force_in_PWM: PWM value for sustained push-down force (0-125, 0 = off).
inter_channel_start_delay: Seconds between successive channel starts within a
batch. 0.0 = fully parallel (no stagger).
n_replicates: Number of measurements per channel.
min_traverse_height_at_beginning_of_command: Absolute Z (mm) before the first batch.
min_traverse_height_during_command: Absolute Z (mm) between batches.
z_position_at_end_of_command: Absolute Z (mm) after probing. None = full Z safety.
x_grouping_tolerance: Resources within this X distance (mm) share a batch.

Returns:
Absolute deck-frame Z (mm) of the surface detected for each input.

Raises:
ValueError: If ``n_replicates`` < 1 or ``inter_channel_start_delay`` < 0.
RuntimeError: If any specified channel lacks a tip.
"""
if n_replicates < 1:
raise ValueError(f"n_replicates must be >= 1, got {n_replicates}.")
if inter_channel_start_delay < 0:
raise ValueError(f"inter_channel_start_delay must be >= 0, got {inter_channel_start_delay}.")

z_cavity_bottom = [
r.get_location_wrt(self.deck, "c", "c", "cavity_bottom").z for r in containers
]
z_top = [r.get_location_wrt(self.deck, "c", "c", "t").z for r in containers]

try:
use_channels, tip_lengths, batches = await self._prepare_batched(
containers=containers,
use_channels=use_channels,
resource_offsets=resource_offsets,
x_grouping_tolerance=x_grouping_tolerance,
min_traverse_height_at_beginning_of_command=min_traverse_height_at_beginning_of_command,
)

batch_results = await self.execute_batched(
func=lambda b: self._run_ztouch_on_channel_batch(
batch=b,
tip_lengths=tip_lengths,
z_cavity_bottom=z_cavity_bottom,
z_top=z_top,
channel_speed=channel_speed,
channel_acceleration=channel_acceleration,
channel_speed_upwards=channel_speed_upwards,
detection_limiter_in_PWM=detection_limiter_in_PWM,
push_down_force_in_PWM=push_down_force_in_PWM,
inter_channel_start_delay=inter_channel_start_delay,
n_replicates=n_replicates,
),
batches=batches,
min_traverse_height_during_command=min_traverse_height_during_command,
)

absolute_heights_measurements: Dict[int, List[float]] = {}
for batch_measurements in batch_results:
for orig_idx, heights in batch_measurements.items():
absolute_heights_measurements.setdefault(orig_idx, []).extend(heights)

# Round to 0.01 mm (firmware z-drive quantum) so users don't see float noise.
absolute_z: List[float] = [
round(
sum(absolute_heights_measurements[idx]) / len(absolute_heights_measurements[idx]),
2,
)
for idx in range(len(use_channels))
]

if z_position_at_end_of_command is not None:
await self.position_channels_in_z_direction(
{ch: z_position_at_end_of_command for ch in use_channels}
)
else:
await self.move_all_channels_in_z_safety()

return absolute_z
except BaseException:
await self.move_all_channels_in_z_safety()
raise

# # # Granular channel control methods # # #

DISPENSING_DRIVE_VOL_LIMIT_BOTTOM = -45 # vol TODO: confirm with others
Expand Down
18 changes: 18 additions & 0 deletions pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,3 +405,21 @@ async def _run_lld_on_channel_batch(
absolute_height = z_cavity_bottom[orig_idx] + container.compute_height_from_volume(volume)
measurements[orig_idx] = [absolute_height] * n_replicates
return measurements

async def _run_ztouch_on_channel_batch(
self,
batch,
tip_lengths: List[float],
z_cavity_bottom: List[float],
z_top: List[float],
channel_speed: float,
channel_acceleration: float,
channel_speed_upwards: float,
detection_limiter_in_PWM: int,
push_down_force_in_PWM: int,
inter_channel_start_delay: float,
n_replicates: int,
) -> Dict[int, List[float]]:
"""Simulate ztouch by returning each target's cavity-bottom Z (the surface a
descending tip would hit first in an empty well). Absolute, not relative."""
return {orig_idx: [z_cavity_bottom[orig_idx]] * n_replicates for orig_idx in batch.indices}
Loading