diff --git a/pylabrobot/brooks/confirmed_firmware_versions.py b/pylabrobot/brooks/confirmed_firmware_versions.py new file mode 100644 index 00000000000..c50a03b0d03 --- /dev/null +++ b/pylabrobot/brooks/confirmed_firmware_versions.py @@ -0,0 +1,102 @@ +"""PreciseFlex software stacks confirmed with this driver. + +This tracks the digital setup only - the GPL firmware, the TCS app, and the loaded +TCS modules - not the physical configuration (links, rail, gripper), which is +discovered into ``PreciseFlexConfiguration``. At setup the driver checks the +discovered stack here: + +- ``SUPPORTED_ROBOT_TYPES`` gates the client-side kinematics. This driver's FK/IK + is the PreciseFlex 400 two-link SCARA geometry, so other models would get + silently-wrong joint targets and are flagged. +- ``CONFIRMED_FIRMWARE_VERSIONS`` is the list of full (model, GPL, TCS, module-set) + software stacks validated against this driver; an unlisted one logs a warning + asking for a report so it can be added. + +(Which modules the driver *requires* lives in ``tcs_modules.py``, a separate concern.) + +Version strings keep the name and version but drop the trailing build date, which +varies by build without changing behaviour. +""" + +import re +from dataclasses import dataclass + +# robot_type (DataID 116) -> model name, for the models whose kinematics this driver implements. +SUPPORTED_ROBOT_TYPES = { + 12: "PreciseFlex 400", +} + + +@dataclass(frozen=True) +class ConfirmedFirmware: + """A full software stack validated against this driver (build dates stripped).""" + + robot_type: int + gpl_version: str # e.g. "GPL 5.1D4" + tcs_version: str # e.g. "TCP Command Server 3.0D4" + modules: tuple # one "name version" per loaded TCS module + + +CONFIRMED_FIRMWARE_VERSIONS = [ + ConfirmedFirmware( + robot_type=12, + gpl_version="GPL 5.1D4", + tcs_version="TCP Command Server 3.0D4", + modules=( + "IntelliGuide 1.0", + "Load-Save Module 3.0B2", + "PARobot Auto Center Module 3.0D3", + "PARobot Module 3.0D4", + "SSGrip Module 3.0D4", + ), + ), +] + +# Trailing build date (e.g. "10-25-2024" or "Apr 25 2025") and anything after it. +_DATE = re.compile(r",?\s*(\d{1,2}[-/]\d{1,2}[-/]\d{2,4}|[A-Za-z]{3,9}\.?\s+\d{1,2},?\s+\d{4}).*$") + + +def strip_build_date(version: str) -> str: + """Reduce a version string to its name + version, dropping the build date.""" + return _DATE.sub("", version).strip().rstrip(",").strip() + + +def is_supported_model(robot_type: int) -> bool: + """Whether this driver's kinematics cover the given robot_type (DataID 116).""" + return robot_type in SUPPORTED_ROBOT_TYPES + + +def is_confirmed(robot_type: int, gpl_version: str, tcs_version: str, modules: tuple) -> bool: + """Whether the full (model, GPL, TCS, module-set) combination has been validated.""" + target = ( + robot_type, + strip_build_date(gpl_version), + strip_build_date(tcs_version), + tuple(sorted(strip_build_date(m) for m in modules)), + ) + return any( + ( + c.robot_type, + strip_build_date(c.gpl_version), + strip_build_date(c.tcs_version), + tuple(sorted(strip_build_date(m) for m in c.modules)), + ) + == target + for c in CONFIRMED_FIRMWARE_VERSIONS + ) + + +def suggest_entry(robot_type: int, gpl_version: str, tcs_version: str, modules: tuple) -> str: + """Format a discovered stack as a ``ConfirmedFirmware(...)`` literal to paste into + ``CONFIRMED_FIRMWARE_VERSIONS`` (build dates stripped, modules sorted).""" + module_lines = ",\n ".join(f'"{m}"' for m in sorted(strip_build_date(m) for m in modules)) + return ( + " ConfirmedFirmware(\n" + f" robot_type={robot_type},\n" + f' gpl_version="{strip_build_date(gpl_version)}",\n' + f' tcs_version="{strip_build_date(tcs_version)}",\n' + " modules=(\n" + f" {module_lines},\n" + " ),\n" + " )," + ) diff --git a/pylabrobot/brooks/data_ids.py b/pylabrobot/brooks/data_ids.py new file mode 100644 index 00000000000..0322b4ec21a --- /dev/null +++ b/pylabrobot/brooks/data_ids.py @@ -0,0 +1,50 @@ +"""PreciseFlex controller parameter-database identifiers (DataIDs). + +The Guidance controller exposes a large numbered parameter database, read with +the TCS ``pd `` command (wrapped by ``PreciseFlexArmBackend.request_parameter``). +Naming the IDs here keeps configuration reads self-describing rather than bare +numbers, and gives a single place to add more as the database is mapped. +""" + +from enum import IntEnum + + +class DataID(IntEnum): + """Controller parameter-database items, read via ``request_parameter`` (``pd``). + + Each item's reply shape is noted below: a single value, descriptive text, or an + array with one element per axis (parse with ``_parse_per_axis``). + """ + + # Identity / state - single value or descriptive text. + MANUFACTURER = 100 + CONTROLLER_MODEL = 101 + HARDWARE_VERSION = 102 + GPL_VERSION = 103 + CONTROLLER_SERIAL = 110 + ROBOT_TYPE = 116 + POWER_STATE = 234 + ROBOT_HOMED = 2800 # 1 = all axes homed; 0 = not homed (commanded motion blocked) + NUM_AXES = 2000 + ROBOT_NAME = 2002 + AXIS_MASK = 2003 + EXTRA_AXES = 2004 + # Per-axis arrays - one value per joint (e.g. speed differs J1..J5). + REFERENCE_SPEED = 2700 + REFERENCE_ACCEL = 2702 + HARD_LIMIT_MAX = 16075 + HARD_LIMIT_MIN = 16076 + SOFT_LIMIT_MAX = 16077 + SOFT_LIMIT_MIN = 16078 + # Kinematic geometry. LINK_LENGTHS is per-axis (l1 at the shoulder, l2 at the + # elbow); TOOL_OFFSET is a wrist-frame (x, y, z) transform, z = wrist->TCP. + LINK_LENGTHS = 16050 + TOOL_OFFSET = 16051 + # Cartesian reference - (translation, rotation, ...), not per-joint. + REFERENCE_CARTESIAN_SPEED = 2701 + REFERENCE_CARTESIAN_ACCEL = 2703 + # Global motion caps - one percentage applied to the whole profile (all joints + # the same); per-joint maxima come from REFERENCE_* x these. + MAX_SPEED_PERCENT = 2704 + MAX_ACCEL_PERCENT = 2705 + MAX_DECEL_PERCENT = 2706 diff --git a/pylabrobot/brooks/precise_flex.py b/pylabrobot/brooks/precise_flex.py index 759b831f43d..63b75ffdb25 100644 --- a/pylabrobot/brooks/precise_flex.py +++ b/pylabrobot/brooks/precise_flex.py @@ -8,6 +8,14 @@ from typing import Dict, List, Literal, Optional from pylabrobot.brooks.error_codes import ERROR_CODES +from pylabrobot.brooks.data_ids import DataID +from pylabrobot.brooks.confirmed_firmware_versions import ( + SUPPORTED_ROBOT_TYPES, + is_confirmed, + is_supported_model, + suggest_entry, +) +from pylabrobot.brooks.tcs_modules import missing_required_modules from pylabrobot.brooks import kinematics from pylabrobot.capabilities.arms.backend import ( CanFreedrive, @@ -50,6 +58,102 @@ class PreciseFlexCartesianPose(CartesianPose): wrist: Optional[Wrist] = None +@dataclass(frozen=True) +class WorkingVolume: + """Reachable tool-tip envelope: an annulus about the shoulder, over a Z range (mm).""" + + inner: float + outer: float + zmin: float + zmax: float + + +@dataclass(frozen=True) +class PreciseFlexConfiguration: + """Device configuration resolved once at setup; immutable afterwards. + + The identity/limit/envelope fields are read from the controller (`pd ` + via ``request_parameter`` and the ``version`` command). The kinematics/flags + tier is supplied at construction or derived: link lengths are not on the arm, + ``has_rail`` comes from the joint set, ``is_dual_gripper`` from the axis_mask + ``&H80`` bit, and ``is_vision_gripper``/``reach_class`` from the model name. + """ + + # --- identity / version (DataIDs 100-110, 2002, 116; version command) --- + manufacturer: str + controller_model: str + hardware_version: str + gpl_version: str + controller_serial: str + robot_name: str + robot_type: int + tcs_version: str + modules: tuple + # --- axes / limits / motion envelope --- + num_axes: int + extra_axes: int + axis_mask: int + soft_limits: Dict[Axis, tuple] + hard_limits: Dict[Axis, tuple] + # Effective per-joint maxima (reference x the global percent cap, already applied). + max_joint_speed: Dict[Axis, float] + max_joint_accel: Dict[Axis, float] + max_joint_decel: Dict[Axis, float] + max_cartesian_speed: float + max_cartesian_accel: float + power_state: int + # --- supplied / derived --- + kinematics: "kinematics.PF400Params" = dataclasses.field(default_factory=kinematics.PF400Params) + kinematics_source: Literal["device", "provided", "default"] = "default" + has_rail: bool = False + is_dual_gripper: bool = False + is_vision_gripper: bool = False + reach_class: Literal["standard", "extended"] = "standard" + + @property + def gripper_width_range(self) -> tuple: + return self.soft_limits[Axis.GRIPPER] + + @property + def z_range(self) -> tuple: + return self.soft_limits[Axis.BASE] + + @property + def working_volume(self) -> WorkingVolume: + """Reachable tool-tip annulus, swept from the shoulder/elbow soft limits. + + Sweeps the two planar joints across their soft-limit range (Z held constant - + it is an independent axis on a SCARA), takes the base->wrist radius at each + sample, and brackets it by +/- the tool length (the wrist can orient the tool + radially either way). This respects the joint limits rather than assuming full + extension, so the outer radius is the real reach, not l1 + l2 + tool. + """ + wrist_only = dataclasses.replace(self.kinematics, gripper_length=0.0) + tool = self.kinematics.gripper_length + sh_lo, sh_hi = self.soft_limits[Axis.SHOULDER] + el_lo, el_hi = self.soft_limits[Axis.ELBOW] + steps = 60 + outer, inner = 0.0, float("inf") + for i in range(steps + 1): + shoulder = sh_lo + (sh_hi - sh_lo) * i / steps + for j in range(steps + 1): + elbow = el_lo + (el_hi - el_lo) * j / steps + joints: JointPose = { + Axis.BASE: 0.0, + Axis.SHOULDER: shoulder, + Axis.ELBOW: elbow, + Axis.WRIST: 0.0, + Axis.GRIPPER: 0.0, + Axis.RAIL: 0.0, + } + wrist = kinematics.fk(joints, wrist_only).location + radius = (wrist.x * wrist.x + wrist.y * wrist.y) ** 0.5 + outer = max(outer, radius + tool) + inner = min(inner, abs(radius - tool)) + zmin, zmax = self.z_range + return WorkingVolume(inner=inner, outer=outer, zmin=zmin, zmax=zmax) + + # --------------------------------------------------------------------------- # Exceptions # --------------------------------------------------------------------------- @@ -320,6 +424,28 @@ def _snap_to_current(ik_joints: JointPose, current: JointPose, wrist: Wrist) -> return out +def _parse_scalar(response: str) -> float: + """Parse the first numeric field of a DataID reply. + + Some scalar DataIDs come back zero-padded (e.g. robot type as ``12, 0, 0, ...``) + and Cartesian references carry several components; take the leading value. + """ + return float(response.split(",")[0]) + + +def _parse_per_axis(response: str) -> Dict[Axis, float]: + """Parse a comma-separated per-axis DataID reply into an {Axis: value} map.""" + values = [float(v) for v in response.split(",")] + return {Axis(i + 1): values[i] for i in range(min(len(values), len(Axis)))} + + +def _zip_axis_ranges( + low: Dict[Axis, float], high: Dict[Axis, float] +) -> Dict[Axis, tuple[float, float]]: + """Combine min and max per-axis maps into an {Axis: (min, max)} map.""" + return {axis: (low[axis], high[axis]) for axis in low.keys() & high.keys()} + + class PreciseFlexArmBackend(OrientableGripperArmBackend, HasJoints, CanFreedrive, ABC): """Backend for the PreciseFlex robotic arm. @@ -338,15 +464,27 @@ def __init__( closed_gripper_position: float, is_dual_gripper: bool = False, has_rail: bool = False, + read_kinematics_from_device: bool = True, + recover_out_of_range_at_setup: bool = True, ) -> None: """ Args: - gripper_length: wrist-axis → TCP distance in mm. Depends on the mounted - gripper; the concrete Device wrapper supplies a model-appropriate default - (e.g. 162 mm for the stock single gripper on the PF400). + gripper_length: wrist-axis → TCP distance in mm. Used as the fallback / + override; when ``read_kinematics_from_device`` is True (the default) the + link lengths and tool length are read from the controller at setup and + this value is only used if that read fails. gripper_z_offset: vertical offset in mm from the wrist plate to the tool tip. Depends on the mounted gripper; the concrete Device wrapper supplies a - model-appropriate default. + model-appropriate default. Always taken from here (not on the controller). + read_kinematics_from_device: when True, read l1/l2 and the tool length from + the controller at setup and use them for kinematics; the constructor's + ``gripper_length`` then acts only as a fallback. Set False to force the + constructor values regardless of what the controller reports. + recover_out_of_range_at_setup: when True (the default), setup tries to drive a + small out-of-range excursion back inside the soft limits (slow single-axis move + toward in-range) via ``recover_axes_within_limits``. Set False to skip that. + Either way, setup raises if any axis is still out of range afterward, since the + controller would otherwise reject every commanded move (-1012). closed_gripper_position: firmware-unit value (passed to ``GripClosePos`` / ``GripOpenPos``) at which the jaws are at :attr:`min_gripper_width`. Depends on the mounted gripper. The conversion mm → firmware units is @@ -366,6 +504,10 @@ def __init__( self._kinematics_params = kinematics.PF400Params( gripper_length=gripper_length, gripper_z_offset=gripper_z_offset ) + self._read_kinematics_from_device = read_kinematics_from_device + self._recover_out_of_range_at_setup = recover_out_of_range_at_setup + # Device configuration, resolved once at setup; None until then. + self._configuration: Optional[PreciseFlexConfiguration] = None if is_dual_gripper: warnings.warn( "Dual gripper support is experimental and may not work as expected.", UserWarning @@ -374,22 +516,135 @@ def __init__( async def _on_setup(self, backend_params: Optional[BackendParams] = None) -> None: await super()._on_setup(backend_params=backend_params) await self.stop_freedrive_mode() - # Read the gripper-axis soft limits so width validation reflects this arm - # rather than the hardcoded defaults (the servo gripper's real range differs). + # Resolve the device configuration once and adopt it as the source of truth; + # without it the class defaults stay in place. try: - soft_min = [float(v) for v in (await self.request_parameter(16078)).split(",")] - soft_max = [float(v) for v in (await self.request_parameter(16077)).split(",")] - gripper_index = int(Axis.GRIPPER) - 1 - self._gripper_soft_min = soft_min[gripper_index] - self._gripper_soft_max = soft_max[gripper_index] - self.min_gripper_width = soft_min[gripper_index] - self.max_gripper_width = soft_max[gripper_index] - except Exception as exc: # best-effort; fall back to the class defaults + self._configuration = await self._request_configuration() + except Exception as exc: # discovery is best-effort logger.warning( - "[PreciseFlex %s] could not read gripper soft limits, using defaults: %s", + "[PreciseFlex %s] could not read configuration, using defaults: %s", self.driver.io._host, exc, ) + return + self._adopt_configuration(self._configuration) + self._log_configuration_summary(self._configuration) + self._assess_configuration(self._configuration) + await self._handle_out_of_range_axes() + + def _adopt_configuration(self, config: "PreciseFlexConfiguration") -> None: + """Adopt the discovered configuration as the source of truth for later commands. + + The gripper width limits come from the gripper-axis soft limits, IK/FK use the + device link lengths, and the rail / dual-gripper command paths follow the axes + the controller actually reports. + """ + gmin, gmax = config.gripper_width_range + self._gripper_soft_min, self._gripper_soft_max = gmin, gmax + self.min_gripper_width, self.max_gripper_width = gmin, gmax + self._kinematics_params = config.kinematics + self._has_rail = config.has_rail + self._is_dual_gripper = config.is_dual_gripper + + def _log_configuration_summary(self, config: "PreciseFlexConfiguration") -> None: + """Log a one-line summary of the discovered configuration.""" + logger.info( + "[PreciseFlex %s] robot_type %s, %s axes%s; %s; %s; modules: %s", + self.driver.io._host, + config.robot_type, + config.num_axes, + " + rail" if config.has_rail else "", + config.gpl_version, + config.tcs_version, + ", ".join(config.modules), + ) + + async def _is_robot_homed(self) -> bool: + """Whether all axes are homed (DataID 2800). + + Homing is lost on every power cycle (incremental encoders), and until it is redone + the controller blocks commanded motion (-1021) and reports unreliable positions. + """ + return _parse_scalar(await self.request_parameter(DataID.ROBOT_HOMED)) == 1.0 + + async def _handle_out_of_range_axes(self) -> None: + """Warn about every out-of-range axis, then correct what is recoverable, or raise. + + An axis parked outside its soft limit makes the arm unusable - the controller rejects + every commanded move with -1012. Setup logs the full set first (either way), then, with + ``recover_out_of_range_at_setup`` on (the default), drives each recoverable offender back + into range. If recovery is off or leaves any axis out, setup raises with explicit + recovery steps rather than leaving a dead arm. + + No-op until the robot is homed: an unhomed incremental axis reads a meaningless ~0 + (so the check would false-positive), and the controller blocks the recovery move with + -1021 anyway. Homing is the prerequisite, so the check waits for it. + """ + if not await self._is_robot_homed(): + logger.warning( + "[PreciseFlex %s] robot not homed; skipping the out-of-range check until it is " + "(home() first - unhomed positions are unreliable and commanded moves are blocked).", + self.driver.io._host, + ) + return + + def fmt(axes: Dict[Axis, tuple]) -> str: + return "; ".join( + f"{axis.name} at {value} (soft limit {limit})" for axis, (value, limit) in axes.items() + ) + + outside = self._axes_outside_soft_limits(await self.request_joint_position()) + if not outside: + return + logger.warning( + "[PreciseFlex %s] axes out of soft limit at setup: %s", self.driver.io._host, fmt(outside) + ) + if self._recover_out_of_range_at_setup: + await self.recover_axes_within_limits() + outside = self._axes_outside_soft_limits(await self.request_joint_position()) + if outside: + raise PreciseFlexError( + -1012, + f"axis outside its soft limit after setup: {fmt(outside)}. The controller rejects all " + f"commanded moves in this state. Recover with recover_axes_within_limits(), or freedrive " + f"the axis back into range manually (required for the wrist, or when an axis is far past " + f"its limit).", + ) + + def _assess_configuration(self, config: "PreciseFlexConfiguration") -> None: + """Warn about an unsupported model, a missing TCS module, or an untested combo. + + The kinematics is the PreciseFlex 400 geometry, so a different model would get + wrong joint targets; a missing module (e.g. PARobot) is the usual ``-2805`` + cause; an unlisted full configuration is allowed but flagged for reporting. + """ + host = self.driver.io._host + if not is_supported_model(config.robot_type): + logger.warning( + "[PreciseFlex %s] robot_type %s is not a model this driver's kinematics " + "supports (%s); move_to/working_volume may be wrong.", + host, + config.robot_type, + ", ".join(SUPPORTED_ROBOT_TYPES.values()), + ) + for module, provides, project in missing_required_modules(config.modules): + logger.warning( + "[PreciseFlex %s] the '%s' module (%s) is not loaded; install the '%s' TCS " + "project (obtain it from Brooks Automation) and restart it.", + host, + module, + provides, + project, + ) + if not is_confirmed(config.robot_type, config.gpl_version, config.tcs_version, config.modules): + logger.info( + "[PreciseFlex %s] this software stack has not been tested with this driver. " + "If the arm works correctly, please add the following entry to " + "CONFIRMED_FIRMWARE_VERSIONS in pylabrobot/brooks/confirmed_firmware_versions.py " + "and open a pull request so other users benefit:\n%s", + host, + suggest_entry(config.robot_type, config.gpl_version, config.tcs_version, config.modules), + ) async def _request_state( self, @@ -435,10 +690,10 @@ async def _request_speed(self) -> float: return await self.request_profile_speed(self.profile_index) # Physical jaw range for the PF400 servoed gripper. Overridden at setup from the - # gripper-axis soft limits when readable; these are the fallback defaults. + # gripper-axis soft limits (DataIDs 16078/16077, Axis.GRIPPER) when discoverable. min_gripper_width: float = 60.0 max_gripper_width: float = 145.0 - # Gripper-axis soft limits in firmware units, read at setup; None until then. + # Gripper-axis soft limits (GripOpenPos/GripClosePos units), read at setup; None until then. _gripper_soft_min: Optional[float] = None _gripper_soft_max: Optional[float] = None @@ -469,13 +724,15 @@ async def move_gripper( force_sensing, ) units = self._mm_to_firmware_units(width) - if self._gripper_soft_min is not None and not ( - self._gripper_soft_min <= units <= self._gripper_soft_max + if ( + self._gripper_soft_min is not None + and self._gripper_soft_max is not None + and not (self._gripper_soft_min <= units <= self._gripper_soft_max) ): raise ValueError( - f"gripper width {width} mm maps to firmware units {units:.1f}, outside the " - f"gripper-axis range [{self._gripper_soft_min}, {self._gripper_soft_max}] - " - f"check closed_gripper_position (currently {self.closed_gripper_position})." + f"gripper width {width} mm maps to firmware units {units:.1f}, outside the gripper " + f"axis range [{self._gripper_soft_min}, {self._gripper_soft_max}] - check " + f"closed_gripper_position (currently {self.closed_gripper_position})." ) if force_sensing: await self._set_grip_close_pos(units) @@ -606,8 +863,47 @@ async def move_to_joint_position( await self._set_speed(backend_params.speed_pct) current = await self.request_joint_position() joint_coords = {**current, **position} + self._assert_within_soft_limits(current, joint_coords) await self._move_j(profile_index=self.profile_index, joint_coords=joint_coords) + def _axes_outside_soft_limits(self, joints: JointPose) -> Dict[Axis, tuple]: + """Axes whose value lies outside their soft limit, as ``axis -> (value, (lo, hi))``. + + Iterates the soft-limit set (keyed by :class:`Axis`) and looks each axis up in + ``joints`` so the comparison stays Axis-typed. Empty until the configuration has + been discovered. + """ + if self._configuration is None: + return {} + outside: Dict[Axis, tuple] = {} + for axis, (lo, hi) in self._configuration.soft_limits.items(): + value = joints.get(axis) + if value is not None and not (lo <= value <= hi): + outside[axis] = (value, (lo, hi)) + return outside + + def _assert_within_soft_limits(self, current: JointPose, target: JointPose) -> None: + """Turn the controller's cryptic ``-1012`` into a clear client-side error. + + Two cases block a commanded move: an axis already parked outside its soft limit + (the controller then rejects *every* commanded move until it is recovered), and + a target outside its soft limit (rejected outright). Freedrive can hand-move an + axis past a soft limit, so a taught pose can land outside the commandable + envelope. No-op until the configuration has been discovered. + """ + for axis, (value, limit) in self._axes_outside_soft_limits(current).items(): + raise ValueError( + f"{axis.name} is parked at {value}, outside its soft limit {limit}; the " + f"controller rejects commanded moves while an axis is out of range (-1012). " + f"Homing will not recover it (the rotary axes are absolute); call " + f"recover_axes_within_limits() to drive it back into range, then retry." + ) + for axis, (value, limit) in self._axes_outside_soft_limits(target).items(): + raise ValueError( + f"{axis.name} target {value} is outside its soft limit {limit}; the controller " + f"would reject the move (-1012). Re-teach this pose within the envelope." + ) + async def request_joint_position( self, backend_params: Optional[BackendParams] = None ) -> JointPose: @@ -779,10 +1075,13 @@ async def start_freedrive_mode( free_axes: List of joint indices to free. Use [0] for all axes. """ if free_axes is None: - # Free the always-present positioning axes; add the rail only when fitted - - # freemode on an absent axis returns -2800 on a no-rail arm. + # Default to the positioning axes that exist; include the rail only when + # fitted - freemode on an absent axis returns -2800 on a no-rail arm. The + # cached configuration is the source of truth for the installed axes; fall + # back to the constructor hint before setup has resolved it. + has_rail = self._configuration.has_rail if self._configuration is not None else self._has_rail free_axes = [Axis.BASE, Axis.SHOULDER, Axis.ELBOW, Axis.WRIST] - if self._has_rail: + if has_rail: free_axes.append(Axis.RAIL) for axis in free_axes: await self.driver.send_command(f"freemode {axis}") @@ -981,6 +1280,192 @@ async def request_parameter( response = await self.driver.send_command(f"pd {data_id}") return response + @property + def configuration(self) -> "PreciseFlexConfiguration": + """The device configuration resolved at setup. Raises before setup().""" + if self._configuration is None: + raise RuntimeError("Configuration is not available until setup() has run.") + return self._configuration + + async def request_joint_limits(self, hard: bool = False) -> Dict[Axis, tuple[float, float]]: + """Per-axis travel limits as {Axis: (min, max)}. + + Returns the soft limits by default; pass ``hard=True`` for the hard limits. + """ + min_id = DataID.HARD_LIMIT_MIN if hard else DataID.SOFT_LIMIT_MIN + max_id = DataID.HARD_LIMIT_MAX if hard else DataID.SOFT_LIMIT_MAX + return _zip_axis_ranges( + _parse_per_axis(await self.request_parameter(min_id)), + _parse_per_axis(await self.request_parameter(max_id)), + ) + + async def request_reference_speed(self) -> Dict[Axis, float]: + """Per-axis rated speed at 100%; J1/J5 in mm/s, J2-J4 in deg/s.""" + return _parse_per_axis(await self.request_parameter(DataID.REFERENCE_SPEED)) + + async def request_reference_accel(self) -> Dict[Axis, float]: + """Per-axis rated acceleration at 100%.""" + return _parse_per_axis(await self.request_parameter(DataID.REFERENCE_ACCEL)) + + async def request_link_lengths(self) -> tuple[float, float]: + """(l1, l2) SCARA link lengths in mm: shoulder->elbow, elbow->wrist.""" + per_axis = _parse_per_axis(await self.request_parameter(DataID.LINK_LENGTHS)) + return per_axis[Axis.SHOULDER], per_axis[Axis.ELBOW] + + async def request_tool_length(self) -> float: + """Wrist->TCP distance in mm (z of the tool-offset transform).""" + values = [float(v) for v in (await self.request_parameter(DataID.TOOL_OFFSET)).split(",")] + return values[2] + + async def request_kinematic_parameters(self) -> "kinematics.PF400Params": + """Build PF400Params from the controller's stored geometry. + + Link lengths and tool length come from the device; gripper_z_offset is not on + the controller, so it is carried over from the constructor params. + """ + l1, l2 = await self.request_link_lengths() + return dataclasses.replace( + self._kinematics_params, + l1=l1, + l2=l2, + gripper_length=await self.request_tool_length(), + ) + + async def request_reference_cartesian_speed(self) -> float: + """Rated Cartesian (translational) speed at 100%, in mm/s.""" + return _parse_scalar(await self.request_parameter(DataID.REFERENCE_CARTESIAN_SPEED)) + + async def request_reference_cartesian_accel(self) -> float: + """Rated Cartesian (translational) acceleration at 100%, in mm/s^2.""" + return _parse_scalar(await self.request_parameter(DataID.REFERENCE_CARTESIAN_ACCEL)) + + async def request_max_speed_percent(self) -> float: + """Global cap on the speed percentage (one value, applies to all joints).""" + return _parse_scalar(await self.request_parameter(DataID.MAX_SPEED_PERCENT)) + + async def request_max_accel_percent(self) -> float: + """Global cap on the acceleration percentage (one value, applies to all joints).""" + return _parse_scalar(await self.request_parameter(DataID.MAX_ACCEL_PERCENT)) + + async def request_max_decel_percent(self) -> float: + """Global cap on the deceleration percentage (one value, applies to all joints).""" + return _parse_scalar(await self.request_parameter(DataID.MAX_DECEL_PERCENT)) + + async def request_manufacturer(self) -> str: + return (await self.request_parameter(DataID.MANUFACTURER)).strip() + + async def request_controller_model(self) -> str: + return (await self.request_parameter(DataID.CONTROLLER_MODEL)).strip() + + async def request_hardware_version(self) -> str: + return (await self.request_parameter(DataID.HARDWARE_VERSION)).strip() + + async def request_gpl_version(self) -> str: + """Controller firmware/runtime version (distinct from ``request_version``, the TCS app).""" + return (await self.request_parameter(DataID.GPL_VERSION)).strip() + + async def request_controller_serial(self) -> str: + return (await self.request_parameter(DataID.CONTROLLER_SERIAL)).strip() + + async def request_robot_name(self) -> str: + return (await self.request_parameter(DataID.ROBOT_NAME)).strip() + + async def request_robot_type(self) -> int: + """Built-in kinematic model id (PF400 = 12).""" + return int(_parse_scalar(await self.request_parameter(DataID.ROBOT_TYPE))) + + async def request_axis_count(self) -> int: + """Number of servoed axes.""" + return int(_parse_scalar(await self.request_parameter(DataID.NUM_AXES))) + + async def request_extra_axis_count(self) -> int: + """Number of non-servoed (extra) axes.""" + return int(_parse_scalar(await self.request_parameter(DataID.EXTRA_AXES))) + + async def request_axis_mask(self) -> int: + """Capability/option bit field (rail, dual gripper, ...).""" + return int(_parse_scalar(await self.request_parameter(DataID.AXIS_MASK))) + + async def request_power_state(self) -> int: + """Power / auto-execute state word.""" + return int(_parse_scalar(await self.request_parameter(DataID.POWER_STATE))) + + async def _request_configuration(self) -> "PreciseFlexConfiguration": + """Read the controller's identity, axes, limits, kinematics, and envelope. + + Read-only (no motion, no homing required), so it is safe to call at setup. + Link lengths and tool length are read from the controller; per-arm flags are + derived from the joint set, the axis mask, and the model name. + """ + soft_limits = await self.request_joint_limits() + axis_mask = await self.request_axis_mask() + robot_name = await self.request_robot_name() + name_tokens = robot_name.split() + suffix = name_tokens[-1].upper().lstrip("0123456789") if name_tokens else "" + # The version command reports the TCS app version then its loaded modules. + tcs_version, *modules = (seg.strip() for seg in (await self.request_version()).split(",")) + + # Combine the per-axis 100% references with the global percent caps into the + # effective per-joint maxima, so consumers get usable limits, not raw factors. + reference_speed = await self.request_reference_speed() + reference_accel = await self.request_reference_accel() + speed_pct = await self.request_max_speed_percent() + accel_pct = await self.request_max_accel_percent() + decel_pct = await self.request_max_decel_percent() + + # Kinematics: read the link/tool geometry from the controller by default, so + # the driver is correct for whichever 400 variant is plugged in; fall back to + # the constructor params if the read fails or the override is set. + kinematics_source: Literal["device", "provided", "default"] + if self._read_kinematics_from_device: + try: + kinematic_params = await self.request_kinematic_parameters() + kinematics_source = "device" + except Exception as exc: + logger.warning( + "[PreciseFlex %s] could not read kinematics, using constructor params: %s", + self.driver.io._host, + exc, + ) + kinematic_params = self._kinematics_params + kinematics_source = "default" + else: + kinematic_params = self._kinematics_params + kinematics_source = "provided" + # Classify by reach: the standard 400 has l1+l2 ~= 435 mm, the extended ~= 591. + reach_class: Literal["standard", "extended"] = ( + "extended" if (kinematic_params.l1 + kinematic_params.l2) >= 513 else "standard" + ) + + return PreciseFlexConfiguration( + manufacturer=await self.request_manufacturer(), + controller_model=await self.request_controller_model(), + hardware_version=await self.request_hardware_version(), + gpl_version=await self.request_gpl_version(), + controller_serial=await self.request_controller_serial(), + robot_name=robot_name, + robot_type=await self.request_robot_type(), + tcs_version=tcs_version, + modules=tuple(modules), + num_axes=await self.request_axis_count(), + extra_axes=await self.request_extra_axis_count(), + axis_mask=axis_mask, + soft_limits=soft_limits, + hard_limits=await self.request_joint_limits(hard=True), + max_joint_speed={a: v * speed_pct / 100 for a, v in reference_speed.items()}, + max_joint_accel={a: v * accel_pct / 100 for a, v in reference_accel.items()}, + max_joint_decel={a: v * decel_pct / 100 for a, v in reference_accel.items()}, + max_cartesian_speed=(await self.request_reference_cartesian_speed()) * speed_pct / 100, + max_cartesian_accel=(await self.request_reference_cartesian_accel()) * accel_pct / 100, + power_state=await self.request_power_state(), + kinematics=kinematic_params, + kinematics_source=kinematics_source, + has_rail=Axis.RAIL in soft_limits, + is_dual_gripper=bool(axis_mask & 0x80), + is_vision_gripper=suffix[:1] == "V", + reach_class=reach_class, + ) + async def reset(self, robot_number: int) -> None: """Reset the threads associated with the specified robot. @@ -1559,6 +2044,84 @@ async def _move_j(self, profile_index: int, joint_coords: JointPose) -> None: ) await self.driver.send_command(f"moveJ {profile_index} {angles_str}") + async def _move_one_axis(self, axis: Axis, position: float) -> None: + """Move a single axis to an absolute position (firmware ``MoveOneAxis``). + + Used for recovery: the controller blocks a normal move while an axis is out of + range, but allows a single-axis move heading back into range. Does not wait for + the motion to complete. + """ + await self.driver.send_command(f"MoveOneAxis {int(axis)} {position} {self.profile_index}") + + # Axes auto-recovered when parked out of range, in a deliberately safe order: the + # gripper jaw first (no arm motion), then the Z column (vertical clearance), then + # the rotary links shoulder -> elbow (smallest swept volume last to first). + # The wrist is intentionally absent: rotating it back to +/-180 can self-collide, so + # it needs the other links first driven to minimal clearance from the origin - a + # maneuver not implemented here. The rail (gross lateral travel) is likewise left out. + # TODO: clearance-aware wrist recovery (and rail). An out-of-range wrist or rail is + # left for the setup post-condition to raise on. + _RECOVERY_ORDER = (Axis.GRIPPER, Axis.BASE, Axis.SHOULDER, Axis.ELBOW) + + async def recover_axes_within_limits( + self, speed_pct: float = 20.0, max_distance: Optional[float] = 5.0 + ) -> Dict[Axis, float]: + """Bring out-of-range axes back inside their soft limits, one axis at a time. + + While an axis is outside its soft limit the controller rejects every commanded + coordinated move (-1012), and homing does not help on the absolute rotary axes. + A single-axis move is the documented exception: it may move an axis toward the + in-range region. Each recoverable offender is driven to just inside its nearest + soft limit, slowly, waiting for each to finish, in :attr:`_RECOVERY_ORDER`. + + Args: + speed_pct: Profile speed for the recovery moves (default 20%, deliberately slow). + max_distance: only move an axis that is out of range by at most this much (deg + for the rotary axes, mm for base/gripper). An axis further out is left in place: + a large unattended single-axis sweep risks a collision, so it is left for the + caller to recover manually (e.g. by freedriving). Pass None to move regardless. + + Returns: + The axes moved, as ``axis -> recovered target``. Empty when nothing recoverable + is out of range or the configuration was not discovered. The wrist and rail are + never auto-recovered (see :attr:`_RECOVERY_ORDER`). + """ + outside = self._axes_outside_soft_limits(await self.request_joint_position()) + if not outside: + return {} + prior_speed = await self._request_speed() + await self._set_speed(speed_pct) + recovered: Dict[Axis, float] = {} + try: + for axis in self._RECOVERY_ORDER: + if axis not in outside: + continue + value, (lo, hi) = outside[axis] + above = value > hi # which limit is violated; both moves below hinge on this + overshoot = (value - hi) if above else (lo - value) + if max_distance is not None and overshoot > max_distance: + continue # too far out to move unattended; left for the post-condition to raise + # Land just inside the violated limit, toward the in-range region. Clamp the + # 1-unit margin to half the range so the target stays within [lo, hi] even if + # the range is narrower than the margin (degenerate, but keeps direction sound). + margin = min(1.0, (hi - lo) / 2.0) + target = (hi - margin) if above else (lo + margin) + logger.warning( + "[PreciseFlex %s] recovering %s from %s into soft limit [%s, %s] -> %s", + self.driver.io._host, + axis.name, + value, + lo, + hi, + target, + ) + await self._move_one_axis(axis, target) + await self.driver._wait_for_eom() + recovered[axis] = target + finally: + await self._set_speed(prior_speed) # don't leave the profile at the slow recovery speed + return recovered + async def release_brake(self, axis: int) -> None: """Release the axis brake. @@ -1776,6 +2339,7 @@ def __init__( timeout: int = 20, gripper_length: float = 162.0, gripper_z_offset: float = 0.0, + recover_out_of_range_at_setup: bool = True, ) -> None: """ Args: @@ -1786,6 +2350,9 @@ def __init__( matches the stock single gripper on the PF400. gripper_z_offset: vertical offset in mm from the wrist plate to the tool tip. Defaults to 0 mm. + recover_out_of_range_at_setup: when True (the default), setup tries to drive a + small out-of-range excursion back inside the soft limits; set False to skip + that. Either way setup raises if an axis is still out of range afterward. """ driver = PreciseFlexDriver(host=host, port=port, timeout=timeout) super().__init__(driver=driver) @@ -1796,6 +2363,7 @@ def __init__( gripper_length=gripper_length, gripper_z_offset=gripper_z_offset, closed_gripper_position=closed_gripper_position, + recover_out_of_range_at_setup=recover_out_of_range_at_setup, ) self.reference = Resource(name="PreciseFlex400", size_x=200, size_y=200, size_z=200) self.arm = OrientableGripperArm(backend=backend, reference_resource=self.reference) diff --git a/pylabrobot/brooks/precise_flex_tests.py b/pylabrobot/brooks/precise_flex_tests.py index 73fa9496494..98580ac5478 100644 --- a/pylabrobot/brooks/precise_flex_tests.py +++ b/pylabrobot/brooks/precise_flex_tests.py @@ -2,7 +2,7 @@ from typing import Tuple from unittest.mock import AsyncMock, MagicMock -from pylabrobot.brooks.precise_flex import PreciseFlex400Backend +from pylabrobot.brooks.precise_flex import Axis, PreciseFlex400Backend def _make_backend( @@ -73,3 +73,47 @@ def test_mm_to_firmware_units_helper(self): self.assertEqual(self.backend._mm_to_firmware_units(60.0), 500.0) self.assertEqual(self.backend._mm_to_firmware_units(145.0), 585.0) self.assertEqual(self.backend._mm_to_firmware_units(100.0), 540.0) + + +class TestPreciseFlex400OutOfRangeRecovery(unittest.IsolatedAsyncioTestCase): + def setUp(self): + self.backend, self.driver = _make_backend() + self.driver._wait_for_eom = AsyncMock() + self.backend._request_speed = AsyncMock(return_value=50.0) + # Minimal stub configuration: only the soft limits the recovery logic reads. + self.backend._configuration = MagicMock( + soft_limits={ + Axis.SHOULDER: (-93.0, 93.0), + Axis.ELBOW: (12.0, 348.0), + Axis.WRIST: (-960.0, 960.0), + } + ) + + def _move_one_axis_cmds(self) -> list[str]: + return [ + c.args[0] + for c in self.driver.send_command.call_args_list + if c.args[0].startswith("MoveOneAxis") + ] + + async def test_recover_moves_offenders_toward_limit_in_order_and_skips_wrist(self): + """Each recoverable offender is driven 1 unit *inside* the violated limit (above-max down, + below-min up), shoulder before elbow per _RECOVERY_ORDER; the wrist is never auto-moved.""" + self.backend.request_joint_position = AsyncMock( + return_value={Axis.SHOULDER: 93.5, Axis.ELBOW: 9.0, Axis.WRIST: 962.0} + ) + recovered = await self.backend.recover_axes_within_limits() + self.assertEqual(recovered, {Axis.SHOULDER: 92.0, Axis.ELBOW: 13.0}) # wrist excluded + cmds = self._move_one_axis_cmds() + self.assertEqual( + cmds, ["MoveOneAxis 2 92.0 1", "MoveOneAxis 3 13.0 1"] + ) # shoulder (2) before elbow (3) + + async def test_recover_skips_axis_too_far_out_of_range(self): + """An axis past its limit by more than max_distance is left in place (no unattended big sweep).""" + self.backend.request_joint_position = AsyncMock( + return_value={Axis.SHOULDER: 120.0} # 27 deg past the 93 limit, beyond the 5 cap + ) + recovered = await self.backend.recover_axes_within_limits() + self.assertEqual(recovered, {}) + self.assertEqual(self._move_one_axis_cmds(), []) diff --git a/pylabrobot/brooks/tcs_modules.py b/pylabrobot/brooks/tcs_modules.py new file mode 100644 index 00000000000..6d064c4ca8e --- /dev/null +++ b/pylabrobot/brooks/tcs_modules.py @@ -0,0 +1,31 @@ +"""TCS modules the PreciseFlex driver requires. + +Derived from the TCS source: every command this driver issues is defined in the base +``Cmd.gpl`` or in ``PARobot.gpl`` - none in ``Load_save.gpl``, ``StereoVision.gpl`` +(IntelliGuide), or the Auto-Center plug-in, so those are not required by this driver. +The base command server is implicit (you cannot connect or run ``version`` without it). + +``PARobot.gpl`` carries two modules and the driver uses commands from both: ``PARobot`` +(gripper open/close, plate handling, freedrive, park, rail, config) and ``SSGrip`` +(``IsFullyClosed``). Both ship in that one file, so installing ``Tcp_cmd_server_pa`` +provides both - but the driver depends on each, so both are checked. + +A missing required module is the usual cause of ``-2805 *Unknown command*``; checking +presence at setup turns that into a clear "install it from Brooks" message up front. +""" + +# required module (name substring) -> (what it provides, project that ships it) +REQUIRED_MODULES = { + "PARobot": ("gripper open/close, plate, freedrive, park, rail", "Tcp_cmd_server_pa"), + "SSGrip": ("gripper closed-state sensor (IsFullyClosed)", "Tcp_cmd_server_pa"), +} + + +def missing_required_modules(modules: tuple) -> list: + """Required modules absent from the loaded set, as (module, provides, project).""" + loaded = " ".join(modules) + return [ + (module, provides, project) + for module, (provides, project) in REQUIRED_MODULES.items() + if module not in loaded + ]