diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index aad432a35ab..6d0f0e72819 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -2737,6 +2737,195 @@ async def probe_liquid_volumes( for container, height in zip(containers, liquid_heights) ] + async def _run_ztouch_on_channel_batch( + self, + batch: ChannelBatch, + tip_lengths: List[float], + z_cavity_bottom: List[float], + z_top: List[float], + channel_speed: float, + channel_acceleration: float, + channel_speed_upwards: float, + detection_limiter_in_PWM: int, + push_down_force_in_PWM: int, + inter_channel_start_delay: float, + n_replicates: int, + ) -> Dict[int, List[float]]: + """Per-batch ztouch probing. Override to substitute simulated sensing. + + Returns absolute tip-bottom Z heights (mm, deck frame) keyed by job index + (``batch.indices[i]``) so duplicate channels across batches don't collide. + + Channel starts within a batch are staggered by ``inter_channel_start_delay`` + seconds (the first channel starts at t=0, the next at t=delay, ...) to + decouple per-channel force detection from cumulative carriage load. + """ + batch_lowest_immers = [z_cavity_bottom[i] for i in batch.indices] + batch_start_pos = [z_top[i] + self.SEARCH_START_CLEARANCE_MM for i in batch.indices] + batch_tip_lens = [tip_lengths[i] for i in batch.indices] + + async def _delayed(delay, factory): + if delay > 0: + await asyncio.sleep(delay) + return await factory() + + measurements: Dict[int, List[float]] = {orig_idx: [] for orig_idx in batch.indices} + + for _ in range(n_replicates): + # post_detection_dist=0 and tip_len passed explicitly so the inner call + # issues no C0 commands (move_channel_z, request_tip_len_on_channel) that + # would serialize the gather. Channel raising is handled by the caller's + # min_traverse_height_during_command / z_position_at_end_of_command. + results = await asyncio.gather( + *[ + _delayed( + local_idx * inter_channel_start_delay, + lambda ch=ch, tlen=tlen, lip=lip, sps=sps: self.ztouch_probe_z_height_using_channel( + channel_idx=ch, + tip_len=tlen, + lowest_immers_pos=lip, + start_pos_search=sps, + channel_speed=channel_speed, + channel_acceleration=channel_acceleration, + channel_speed_upwards=channel_speed_upwards, + detection_limiter_in_PWM=detection_limiter_in_PWM, + push_down_force_in_PWM=push_down_force_in_PWM, + post_detection_dist=0, + move_channels_to_safe_pos_after=False, + ), + ) + for local_idx, (ch, tlen, lip, sps) in enumerate( + zip(batch.channels, batch_tip_lens, batch_lowest_immers, batch_start_pos) + ) + ] + ) + + for local_idx, height in enumerate(results): + orig_idx = batch.indices[local_idx] + measurements[orig_idx].append(float(height)) + + return measurements + + async def channels_probe_z_using_ztouch( + self, + containers: List[Container], + use_channels: Optional[List[int]] = None, + resource_offsets: Optional[List[Coordinate]] = None, + channel_speed: float = 10.0, + channel_acceleration: float = 800.0, + channel_speed_upwards: float = 125.0, + detection_limiter_in_PWM: int = 1, + push_down_force_in_PWM: int = 0, + inter_channel_start_delay: float = 0.3, + n_replicates: int = 1, + min_traverse_height_at_beginning_of_command: Optional[float] = None, + min_traverse_height_during_command: Optional[float] = None, + z_position_at_end_of_command: Optional[float] = None, + x_grouping_tolerance: Optional[float] = None, + ) -> List[float]: + """Probe absolute Z-heights at the X/Y of each resource using ztouch (force-sensed) probing. + + Drives each channel down at the resource's X/Y until the configured PWM current + limit triggers detection. Works on any deck surface reachable at that X/Y - + well bottoms, plate tops, lids, adapters - the firmware reports the Z of the + first surface the tip touches. + + Channels within a batch are started with a small ``inter_channel_start_delay`` + stagger so their contact-force transients do not superpose on the shared + carriage. Set to 0.0 for a fully parallel descent. + + Uses ``plan_batches`` for X/Y partitioning with per-batch resource spread + (respecting no-go zones), then ``execute_batched`` to iterate batches with + Z safety. + + Args: + containers: List of Resource objects whose X/Y defines each probe target. + One per channel. + use_channels: Channel indices to use (0-indexed). + resource_offsets: Optional XYZ offsets from resource centers. + channel_speed: Z-axis search speed in mm/s. + channel_acceleration: Z-drive acceleration in mm/s**2. + channel_speed_upwards: Retraction speed in mm/s. + detection_limiter_in_PWM: PWM current limit for force-trigger detection (0-125). + push_down_force_in_PWM: PWM value for sustained push-down force (0-125, 0 = off). + inter_channel_start_delay: Seconds between successive channel starts within a + batch. 0.0 = fully parallel (no stagger). + n_replicates: Number of measurements per channel. + min_traverse_height_at_beginning_of_command: Absolute Z (mm) before the first batch. + min_traverse_height_during_command: Absolute Z (mm) between batches. + z_position_at_end_of_command: Absolute Z (mm) after probing. None = full Z safety. + x_grouping_tolerance: Resources within this X distance (mm) share a batch. + + Returns: + Absolute deck-frame Z (mm) of the surface detected for each input. + + Raises: + ValueError: If ``n_replicates`` < 1 or ``inter_channel_start_delay`` < 0. + RuntimeError: If any specified channel lacks a tip. + """ + if n_replicates < 1: + raise ValueError(f"n_replicates must be >= 1, got {n_replicates}.") + if inter_channel_start_delay < 0: + raise ValueError(f"inter_channel_start_delay must be >= 0, got {inter_channel_start_delay}.") + + z_cavity_bottom = [ + r.get_location_wrt(self.deck, "c", "c", "cavity_bottom").z for r in containers + ] + z_top = [r.get_location_wrt(self.deck, "c", "c", "t").z for r in containers] + + try: + use_channels, tip_lengths, batches = await self._prepare_batched( + containers=containers, + use_channels=use_channels, + resource_offsets=resource_offsets, + x_grouping_tolerance=x_grouping_tolerance, + min_traverse_height_at_beginning_of_command=min_traverse_height_at_beginning_of_command, + ) + + batch_results = await self.execute_batched( + func=lambda b: self._run_ztouch_on_channel_batch( + batch=b, + tip_lengths=tip_lengths, + z_cavity_bottom=z_cavity_bottom, + z_top=z_top, + channel_speed=channel_speed, + channel_acceleration=channel_acceleration, + channel_speed_upwards=channel_speed_upwards, + detection_limiter_in_PWM=detection_limiter_in_PWM, + push_down_force_in_PWM=push_down_force_in_PWM, + inter_channel_start_delay=inter_channel_start_delay, + n_replicates=n_replicates, + ), + batches=batches, + min_traverse_height_during_command=min_traverse_height_during_command, + ) + + absolute_heights_measurements: Dict[int, List[float]] = {} + for batch_measurements in batch_results: + for orig_idx, heights in batch_measurements.items(): + absolute_heights_measurements.setdefault(orig_idx, []).extend(heights) + + # Round to 0.01 mm (firmware z-drive quantum) so users don't see float noise. + absolute_z: List[float] = [ + round( + sum(absolute_heights_measurements[idx]) / len(absolute_heights_measurements[idx]), + 2, + ) + for idx in range(len(use_channels)) + ] + + if z_position_at_end_of_command is not None: + await self.position_channels_in_z_direction( + {ch: z_position_at_end_of_command for ch in use_channels} + ) + else: + await self.move_all_channels_in_z_safety() + + return absolute_z + except BaseException: + await self.move_all_channels_in_z_safety() + raise + # # # Granular channel control methods # # # DISPENSING_DRIVE_VOL_LIMIT_BOTTOM = -45 # vol TODO: confirm with others diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py index f75933643ea..05aec0b1899 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py @@ -405,3 +405,21 @@ async def _run_lld_on_channel_batch( absolute_height = z_cavity_bottom[orig_idx] + container.compute_height_from_volume(volume) measurements[orig_idx] = [absolute_height] * n_replicates return measurements + + async def _run_ztouch_on_channel_batch( + self, + batch, + tip_lengths: List[float], + z_cavity_bottom: List[float], + z_top: List[float], + channel_speed: float, + channel_acceleration: float, + channel_speed_upwards: float, + detection_limiter_in_PWM: int, + push_down_force_in_PWM: int, + inter_channel_start_delay: float, + n_replicates: int, + ) -> Dict[int, List[float]]: + """Simulate ztouch by returning each target's cavity-bottom Z (the surface a + descending tip would hit first in an empty well). Absolute, not relative.""" + return {orig_idx: [z_cavity_bottom[orig_idx]] * n_replicates for orig_idx in batch.indices}