From 740cc3dcbf709b5aae5df2f6994b8c358fe8ee41 Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Thu, 4 Jun 2026 21:26:44 +0100 Subject: [PATCH 1/5] v1: Brooks PF400: discover configuration and kinematics from the controller WIP (draft). Reads what the controller already stores instead of hardcoding it: PreciseFlexConfiguration (identity, axes, soft/hard limits, motion envelope) via a named DataID enum, device-read link lengths (16050) and tool transform (16051), derived has_rail / is_dual_gripper / reach_class, and a joint-limit working-envelope sweep. The freedrive and gripper-limit fixes now read from the cached config. Builds on #1072. Co-Authored-By: Claude Opus 4.8 (1M context) --- pylabrobot/brooks/precise_flex.py | 405 ++++++++++++++++++++++++++++-- 1 file changed, 382 insertions(+), 23 deletions(-) diff --git a/pylabrobot/brooks/precise_flex.py b/pylabrobot/brooks/precise_flex.py index 759b831f43d..21c37ae0a33 100644 --- a/pylabrobot/brooks/precise_flex.py +++ b/pylabrobot/brooks/precise_flex.py @@ -43,6 +43,47 @@ class Axis(IntEnum): RAIL = 6 +class DataID(IntEnum): + """Controller parameter-database items, read via ``request_parameter`` (``pd``). + + Named so configuration reads are self-describing rather than bare numbers. Each + item's reply shape is noted below: a single value, descriptive text, or an + array with one element per axis (parse with ``_parse_per_axis``). + """ + + # Identity / state - single value or descriptive text. + MANUFACTURER = 100 + CONTROLLER_MODEL = 101 + HARDWARE_VERSION = 102 + GPL_VERSION = 103 + CONTROLLER_SERIAL = 110 + ROBOT_TYPE = 116 + POWER_STATE = 234 + NUM_AXES = 2000 + ROBOT_NAME = 2002 + AXIS_MASK = 2003 + EXTRA_AXES = 2004 + # Per-axis arrays - one value per joint (e.g. speed differs J1..J5). + REFERENCE_SPEED = 2700 + REFERENCE_ACCEL = 2702 + HARD_LIMIT_MAX = 16075 + HARD_LIMIT_MIN = 16076 + SOFT_LIMIT_MAX = 16077 + SOFT_LIMIT_MIN = 16078 + # Kinematic geometry. LINK_LENGTHS is per-axis (l1 at the shoulder, l2 at the + # elbow); TOOL_OFFSET is a wrist-frame (x, y, z) transform, z = wrist->TCP. + LINK_LENGTHS = 16050 + TOOL_OFFSET = 16051 + # Cartesian reference - (translation, rotation, ...), not per-joint. + REFERENCE_CARTESIAN_SPEED = 2701 + REFERENCE_CARTESIAN_ACCEL = 2703 + # Global motion caps - one percentage applied to the whole profile (all joints + # the same); per-joint maxima come from REFERENCE_* x these. + MAX_SPEED_PERCENT = 2704 + MAX_ACCEL_PERCENT = 2705 + MAX_DECEL_PERCENT = 2706 + + @dataclass class PreciseFlexCartesianPose(CartesianPose): rail_position: Optional[float] = None @@ -50,6 +91,102 @@ class PreciseFlexCartesianPose(CartesianPose): wrist: Optional[Wrist] = None +@dataclass(frozen=True) +class WorkingVolume: + """Reachable tool-tip envelope: an annulus about the shoulder, over a Z range (mm).""" + + inner: float + outer: float + zmin: float + zmax: float + + +@dataclass(frozen=True) +class PreciseFlexConfiguration: + """Device configuration resolved once at setup; immutable afterwards. + + The identity/limit/envelope fields are read from the controller (`pd ` + via ``request_parameter`` and the ``version`` command). The kinematics/flags + tier is supplied at construction or derived: link lengths are not on the arm, + ``has_rail`` comes from the joint set, ``is_dual_gripper`` from the axis_mask + ``&H80`` bit, and ``is_vision_gripper``/``reach_class`` from the model name. + """ + + # --- identity / version (DataIDs 100-110, 2002, 116; version command) --- + manufacturer: str + controller_model: str + hardware_version: str + gpl_version: str + controller_serial: str + robot_name: str + robot_type: int + tcs_version: str + modules: tuple + # --- axes / limits / motion envelope --- + num_axes: int + extra_axes: int + axis_mask: int + soft_limits: Dict[Axis, tuple] + hard_limits: Dict[Axis, tuple] + # Effective per-joint maxima (reference x the global percent cap, already applied). + max_joint_speed: Dict[Axis, float] + max_joint_accel: Dict[Axis, float] + max_joint_decel: Dict[Axis, float] + max_cartesian_speed: float + max_cartesian_accel: float + power_state: int + # --- supplied / derived --- + kinematics: "kinematics.PF400Params" = dataclasses.field(default_factory=kinematics.PF400Params) + kinematics_source: Literal["device", "provided", "default"] = "default" + has_rail: bool = False + is_dual_gripper: bool = False + is_vision_gripper: bool = False + reach_class: Literal["standard", "extended"] = "standard" + + @property + def gripper_width_range(self) -> tuple: + return self.soft_limits[Axis.GRIPPER] + + @property + def z_range(self) -> tuple: + return self.soft_limits[Axis.BASE] + + @property + def working_volume(self) -> WorkingVolume: + """Reachable tool-tip annulus, swept from the shoulder/elbow soft limits. + + Sweeps the two planar joints across their soft-limit range (Z held constant - + it is an independent axis on a SCARA), takes the base->wrist radius at each + sample, and brackets it by +/- the tool length (the wrist can orient the tool + radially either way). This respects the joint limits rather than assuming full + extension, so the outer radius is the real reach, not l1 + l2 + tool. + """ + wrist_only = dataclasses.replace(self.kinematics, gripper_length=0.0) + tool = self.kinematics.gripper_length + sh_lo, sh_hi = self.soft_limits[Axis.SHOULDER] + el_lo, el_hi = self.soft_limits[Axis.ELBOW] + steps = 60 + outer, inner = 0.0, float("inf") + for i in range(steps + 1): + shoulder = sh_lo + (sh_hi - sh_lo) * i / steps + for j in range(steps + 1): + elbow = el_lo + (el_hi - el_lo) * j / steps + joints = { + Axis.BASE: 0.0, + Axis.SHOULDER: shoulder, + Axis.ELBOW: elbow, + Axis.WRIST: 0.0, + Axis.GRIPPER: 0.0, + Axis.RAIL: 0.0, + } + wrist = kinematics.fk(joints, wrist_only).location + radius = (wrist.x * wrist.x + wrist.y * wrist.y) ** 0.5 + outer = max(outer, radius + tool) + inner = min(inner, abs(radius - tool)) + zmin, zmax = self.z_range + return WorkingVolume(inner=inner, outer=outer, zmin=zmin, zmax=zmax) + + # --------------------------------------------------------------------------- # Exceptions # --------------------------------------------------------------------------- @@ -320,6 +457,28 @@ def _snap_to_current(ik_joints: JointPose, current: JointPose, wrist: Wrist) -> return out +def _parse_scalar(response: str) -> float: + """Parse the first numeric field of a DataID reply. + + Some scalar DataIDs come back zero-padded (e.g. robot type as ``12, 0, 0, ...``) + and Cartesian references carry several components; take the leading value. + """ + return float(response.split(",")[0]) + + +def _parse_per_axis(response: str) -> Dict[Axis, float]: + """Parse a comma-separated per-axis DataID reply into an {Axis: value} map.""" + values = [float(v) for v in response.split(",")] + return {Axis(i + 1): values[i] for i in range(min(len(values), len(Axis)))} + + +def _zip_axis_ranges( + low: Dict[Axis, float], high: Dict[Axis, float] +) -> Dict[Axis, tuple[float, float]]: + """Combine min and max per-axis maps into an {Axis: (min, max)} map.""" + return {axis: (low[axis], high[axis]) for axis in low.keys() & high.keys()} + + class PreciseFlexArmBackend(OrientableGripperArmBackend, HasJoints, CanFreedrive, ABC): """Backend for the PreciseFlex robotic arm. @@ -338,15 +497,21 @@ def __init__( closed_gripper_position: float, is_dual_gripper: bool = False, has_rail: bool = False, + read_kinematics_from_device: bool = True, ) -> None: """ Args: - gripper_length: wrist-axis → TCP distance in mm. Depends on the mounted - gripper; the concrete Device wrapper supplies a model-appropriate default - (e.g. 162 mm for the stock single gripper on the PF400). + gripper_length: wrist-axis → TCP distance in mm. Used as the fallback / + override; when ``read_kinematics_from_device`` is True (the default) the + link lengths and tool length are read from the controller at setup and + this value is only used if that read fails. gripper_z_offset: vertical offset in mm from the wrist plate to the tool tip. Depends on the mounted gripper; the concrete Device wrapper supplies a - model-appropriate default. + model-appropriate default. Always taken from here (not on the controller). + read_kinematics_from_device: when True, read l1/l2 and the tool length from + the controller at setup and use them for kinematics; the constructor's + ``gripper_length`` then acts only as a fallback. Set False to force the + constructor values regardless of what the controller reports. closed_gripper_position: firmware-unit value (passed to ``GripClosePos`` / ``GripOpenPos``) at which the jaws are at :attr:`min_gripper_width`. Depends on the mounted gripper. The conversion mm → firmware units is @@ -366,6 +531,9 @@ def __init__( self._kinematics_params = kinematics.PF400Params( gripper_length=gripper_length, gripper_z_offset=gripper_z_offset ) + self._read_kinematics_from_device = read_kinematics_from_device + # Device configuration, resolved once at setup; None until then. + self._configuration: Optional[PreciseFlexConfiguration] = None if is_dual_gripper: warnings.warn( "Dual gripper support is experimental and may not work as expected.", UserWarning @@ -374,19 +542,24 @@ def __init__( async def _on_setup(self, backend_params: Optional[BackendParams] = None) -> None: await super()._on_setup(backend_params=backend_params) await self.stop_freedrive_mode() - # Read the gripper-axis soft limits so width validation reflects this arm - # rather than the hardcoded defaults (the servo gripper's real range differs). + # Resolve the device configuration once (identity, axes, limits, envelope) and + # cache it. The fixes below read from it instead of re-querying or hardcoding: + # the gripper width limits come from the gripper-axis soft limits, and the + # freedrive default axis set follows the installed axes (rail only when fitted). try: - soft_min = [float(v) for v in (await self.request_parameter(16078)).split(",")] - soft_max = [float(v) for v in (await self.request_parameter(16077)).split(",")] - gripper_index = int(Axis.GRIPPER) - 1 - self._gripper_soft_min = soft_min[gripper_index] - self._gripper_soft_max = soft_max[gripper_index] - self.min_gripper_width = soft_min[gripper_index] - self.max_gripper_width = soft_max[gripper_index] - except Exception as exc: # best-effort; fall back to the class defaults + self._configuration = await self._request_configuration() + gmin, gmax = self._configuration.gripper_width_range + self._gripper_soft_min, self._gripper_soft_max = gmin, gmax + self.min_gripper_width, self.max_gripper_width = gmin, gmax + # Adopt the discovered geometry and topology as the source of truth: IK/FK + # use the device link lengths, and the rail / dual-gripper command paths + # follow what the controller actually reports. + self._kinematics_params = self._configuration.kinematics + self._has_rail = self._configuration.has_rail + self._is_dual_gripper = self._configuration.is_dual_gripper + except Exception as exc: # discovery is best-effort; fall back to the class defaults logger.warning( - "[PreciseFlex %s] could not read gripper soft limits, using defaults: %s", + "[PreciseFlex %s] could not read configuration, using defaults: %s", self.driver.io._host, exc, ) @@ -435,10 +608,10 @@ async def _request_speed(self) -> float: return await self.request_profile_speed(self.profile_index) # Physical jaw range for the PF400 servoed gripper. Overridden at setup from the - # gripper-axis soft limits when readable; these are the fallback defaults. + # gripper-axis soft limits (DataIDs 16078/16077, Axis.GRIPPER) when discoverable. min_gripper_width: float = 60.0 max_gripper_width: float = 145.0 - # Gripper-axis soft limits in firmware units, read at setup; None until then. + # Gripper-axis soft limits (GripOpenPos/GripClosePos units), read at setup; None until then. _gripper_soft_min: Optional[float] = None _gripper_soft_max: Optional[float] = None @@ -473,9 +646,9 @@ async def move_gripper( self._gripper_soft_min <= units <= self._gripper_soft_max ): raise ValueError( - f"gripper width {width} mm maps to firmware units {units:.1f}, outside the " - f"gripper-axis range [{self._gripper_soft_min}, {self._gripper_soft_max}] - " - f"check closed_gripper_position (currently {self.closed_gripper_position})." + f"gripper width {width} mm maps to firmware units {units:.1f}, outside the gripper " + f"axis range [{self._gripper_soft_min}, {self._gripper_soft_max}] - check " + f"closed_gripper_position (currently {self.closed_gripper_position})." ) if force_sensing: await self._set_grip_close_pos(units) @@ -779,10 +952,13 @@ async def start_freedrive_mode( free_axes: List of joint indices to free. Use [0] for all axes. """ if free_axes is None: - # Free the always-present positioning axes; add the rail only when fitted - - # freemode on an absent axis returns -2800 on a no-rail arm. + # Default to the positioning axes that exist; include the rail only when + # fitted - freemode on an absent axis returns -2800 on a no-rail arm. The + # cached configuration is the source of truth for the installed axes; fall + # back to the constructor hint before setup has resolved it. + has_rail = self._configuration.has_rail if self._configuration is not None else self._has_rail free_axes = [Axis.BASE, Axis.SHOULDER, Axis.ELBOW, Axis.WRIST] - if self._has_rail: + if has_rail: free_axes.append(Axis.RAIL) for axis in free_axes: await self.driver.send_command(f"freemode {axis}") @@ -981,6 +1157,189 @@ async def request_parameter( response = await self.driver.send_command(f"pd {data_id}") return response + @property + def configuration(self) -> "PreciseFlexConfiguration": + """The device configuration resolved at setup. Raises before setup().""" + if self._configuration is None: + raise RuntimeError("Configuration is not available until setup() has run.") + return self._configuration + + async def request_joint_limits(self, hard: bool = False) -> Dict[Axis, tuple[float, float]]: + """Per-axis travel limits as {Axis: (min, max)}. + + Returns the soft limits by default; pass ``hard=True`` for the hard limits. + """ + min_id = DataID.HARD_LIMIT_MIN if hard else DataID.SOFT_LIMIT_MIN + max_id = DataID.HARD_LIMIT_MAX if hard else DataID.SOFT_LIMIT_MAX + return _zip_axis_ranges( + _parse_per_axis(await self.request_parameter(min_id)), + _parse_per_axis(await self.request_parameter(max_id)), + ) + + async def request_reference_speed(self) -> Dict[Axis, float]: + """Per-axis rated speed at 100%; J1/J5 in mm/s, J2-J4 in deg/s.""" + return _parse_per_axis(await self.request_parameter(DataID.REFERENCE_SPEED)) + + async def request_reference_accel(self) -> Dict[Axis, float]: + """Per-axis rated acceleration at 100%.""" + return _parse_per_axis(await self.request_parameter(DataID.REFERENCE_ACCEL)) + + async def request_link_lengths(self) -> tuple[float, float]: + """(l1, l2) SCARA link lengths in mm: shoulder->elbow, elbow->wrist.""" + per_axis = _parse_per_axis(await self.request_parameter(DataID.LINK_LENGTHS)) + return per_axis[Axis.SHOULDER], per_axis[Axis.ELBOW] + + async def request_tool_length(self) -> float: + """Wrist->TCP distance in mm (z of the tool-offset transform).""" + values = [float(v) for v in (await self.request_parameter(DataID.TOOL_OFFSET)).split(",")] + return values[2] + + async def request_kinematics(self) -> "kinematics.PF400Params": + """Build PF400Params from the controller's stored geometry. + + Link lengths and tool length come from the device; gripper_z_offset is not on + the controller, so it is carried over from the constructor params. + """ + l1, l2 = await self.request_link_lengths() + return dataclasses.replace( + self._kinematics_params, + l1=l1, + l2=l2, + gripper_length=await self.request_tool_length(), + ) + + async def request_reference_cartesian_speed(self) -> float: + """Rated Cartesian (translational) speed at 100%, in mm/s.""" + return _parse_scalar(await self.request_parameter(DataID.REFERENCE_CARTESIAN_SPEED)) + + async def request_reference_cartesian_accel(self) -> float: + """Rated Cartesian (translational) acceleration at 100%, in mm/s^2.""" + return _parse_scalar(await self.request_parameter(DataID.REFERENCE_CARTESIAN_ACCEL)) + + async def request_max_speed_percent(self) -> float: + """Global cap on the speed percentage (one value, applies to all joints).""" + return _parse_scalar(await self.request_parameter(DataID.MAX_SPEED_PERCENT)) + + async def request_max_accel_percent(self) -> float: + """Global cap on the acceleration percentage (one value, applies to all joints).""" + return _parse_scalar(await self.request_parameter(DataID.MAX_ACCEL_PERCENT)) + + async def request_max_decel_percent(self) -> float: + """Global cap on the deceleration percentage (one value, applies to all joints).""" + return _parse_scalar(await self.request_parameter(DataID.MAX_DECEL_PERCENT)) + + async def request_manufacturer(self) -> str: + return (await self.request_parameter(DataID.MANUFACTURER)).strip() + + async def request_controller_model(self) -> str: + return (await self.request_parameter(DataID.CONTROLLER_MODEL)).strip() + + async def request_hardware_version(self) -> str: + return (await self.request_parameter(DataID.HARDWARE_VERSION)).strip() + + async def request_gpl_version(self) -> str: + """Controller firmware/runtime version (distinct from ``request_version``, the TCS app).""" + return (await self.request_parameter(DataID.GPL_VERSION)).strip() + + async def request_controller_serial(self) -> str: + return (await self.request_parameter(DataID.CONTROLLER_SERIAL)).strip() + + async def request_robot_name(self) -> str: + return (await self.request_parameter(DataID.ROBOT_NAME)).strip() + + async def request_robot_type(self) -> int: + """Built-in kinematic model id (PF400 = 12).""" + return int(_parse_scalar(await self.request_parameter(DataID.ROBOT_TYPE))) + + async def request_axis_count(self) -> int: + """Number of servoed axes.""" + return int(_parse_scalar(await self.request_parameter(DataID.NUM_AXES))) + + async def request_extra_axis_count(self) -> int: + """Number of non-servoed (extra) axes.""" + return int(_parse_scalar(await self.request_parameter(DataID.EXTRA_AXES))) + + async def request_axis_mask(self) -> int: + """Capability/option bit field (rail, dual gripper, ...).""" + return int(_parse_scalar(await self.request_parameter(DataID.AXIS_MASK))) + + async def request_power_state(self) -> int: + """Power / auto-execute state word.""" + return int(_parse_scalar(await self.request_parameter(DataID.POWER_STATE))) + + async def _request_configuration(self) -> "PreciseFlexConfiguration": + """Read the controller's identity, axes, limits, kinematics, and envelope. + + Read-only (no motion, no homing required), so it is safe to call at setup. + Link lengths and tool length are read from the controller; per-arm flags are + derived from the joint set, the axis mask, and the model name. + """ + soft_limits = await self.request_joint_limits() + axis_mask = await self.request_axis_mask() + robot_name = await self.request_robot_name() + name_tokens = robot_name.split() + suffix = name_tokens[-1].upper().lstrip("0123456789") if name_tokens else "" + # The version command reports the TCS app version then its loaded modules. + tcs_version, *modules = (seg.strip() for seg in (await self.request_version()).split(",")) + + # Combine the per-axis 100% references with the global percent caps into the + # effective per-joint maxima, so consumers get usable limits, not raw factors. + reference_speed = await self.request_reference_speed() + reference_accel = await self.request_reference_accel() + speed_pct = await self.request_max_speed_percent() + accel_pct = await self.request_max_accel_percent() + decel_pct = await self.request_max_decel_percent() + + # Kinematics: read the link/tool geometry from the controller by default, so + # the driver is correct for whichever 400 variant is plugged in; fall back to + # the constructor params if the read fails or the override is set. + if self._read_kinematics_from_device: + try: + kinematic_params = await self.request_kinematics() + kinematics_source = "device" + except Exception as exc: + logger.warning( + "[PreciseFlex %s] could not read kinematics, using constructor params: %s", + self.driver.io._host, + exc, + ) + kinematic_params = self._kinematics_params + kinematics_source = "default" + else: + kinematic_params = self._kinematics_params + kinematics_source = "provided" + # Classify by reach: the standard 400 has l1+l2 ~= 435 mm, the extended ~= 591. + reach_class = "extended" if (kinematic_params.l1 + kinematic_params.l2) >= 513 else "standard" + + return PreciseFlexConfiguration( + manufacturer=await self.request_manufacturer(), + controller_model=await self.request_controller_model(), + hardware_version=await self.request_hardware_version(), + gpl_version=await self.request_gpl_version(), + controller_serial=await self.request_controller_serial(), + robot_name=robot_name, + robot_type=await self.request_robot_type(), + tcs_version=tcs_version, + modules=tuple(modules), + num_axes=await self.request_axis_count(), + extra_axes=await self.request_extra_axis_count(), + axis_mask=axis_mask, + soft_limits=soft_limits, + hard_limits=await self.request_joint_limits(hard=True), + max_joint_speed={a: v * speed_pct / 100 for a, v in reference_speed.items()}, + max_joint_accel={a: v * accel_pct / 100 for a, v in reference_accel.items()}, + max_joint_decel={a: v * decel_pct / 100 for a, v in reference_accel.items()}, + max_cartesian_speed=(await self.request_reference_cartesian_speed()) * speed_pct / 100, + max_cartesian_accel=(await self.request_reference_cartesian_accel()) * accel_pct / 100, + power_state=await self.request_power_state(), + kinematics=kinematic_params, + kinematics_source=kinematics_source, + has_rail=Axis.RAIL in soft_limits, + is_dual_gripper=bool(axis_mask & 0x80), + is_vision_gripper=suffix[:1] == "V", + reach_class=reach_class, + ) + async def reset(self, robot_number: int) -> None: """Reset the threads associated with the specified robot. From 5fa2870647b5664b65d28786274d180a098a406c Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Thu, 4 Jun 2026 21:44:36 +0100 Subject: [PATCH 2/5] v1: Brooks PF400: move DataID enum to its own module Pure move of the parameter-database DataID enum into pylabrobot/brooks/data_ids.py; precise_flex.py imports it. No behaviour change; gives one place to add more IDs. Co-Authored-By: Claude Opus 4.8 (1M context) --- pylabrobot/brooks/data_ids.py | 49 +++++++++++++++++++++++++++++++ pylabrobot/brooks/precise_flex.py | 42 +------------------------- 2 files changed, 50 insertions(+), 41 deletions(-) create mode 100644 pylabrobot/brooks/data_ids.py diff --git a/pylabrobot/brooks/data_ids.py b/pylabrobot/brooks/data_ids.py new file mode 100644 index 00000000000..90b57f9f975 --- /dev/null +++ b/pylabrobot/brooks/data_ids.py @@ -0,0 +1,49 @@ +"""PreciseFlex controller parameter-database identifiers (DataIDs). + +The Guidance controller exposes a large numbered parameter database, read with +the TCS ``pd `` command (wrapped by ``PreciseFlexArmBackend.request_parameter``). +Naming the IDs here keeps configuration reads self-describing rather than bare +numbers, and gives a single place to add more as the database is mapped. +""" + +from enum import IntEnum + + +class DataID(IntEnum): + """Controller parameter-database items, read via ``request_parameter`` (``pd``). + + Each item's reply shape is noted below: a single value, descriptive text, or an + array with one element per axis (parse with ``_parse_per_axis``). + """ + + # Identity / state - single value or descriptive text. + MANUFACTURER = 100 + CONTROLLER_MODEL = 101 + HARDWARE_VERSION = 102 + GPL_VERSION = 103 + CONTROLLER_SERIAL = 110 + ROBOT_TYPE = 116 + POWER_STATE = 234 + NUM_AXES = 2000 + ROBOT_NAME = 2002 + AXIS_MASK = 2003 + EXTRA_AXES = 2004 + # Per-axis arrays - one value per joint (e.g. speed differs J1..J5). + REFERENCE_SPEED = 2700 + REFERENCE_ACCEL = 2702 + HARD_LIMIT_MAX = 16075 + HARD_LIMIT_MIN = 16076 + SOFT_LIMIT_MAX = 16077 + SOFT_LIMIT_MIN = 16078 + # Kinematic geometry. LINK_LENGTHS is per-axis (l1 at the shoulder, l2 at the + # elbow); TOOL_OFFSET is a wrist-frame (x, y, z) transform, z = wrist->TCP. + LINK_LENGTHS = 16050 + TOOL_OFFSET = 16051 + # Cartesian reference - (translation, rotation, ...), not per-joint. + REFERENCE_CARTESIAN_SPEED = 2701 + REFERENCE_CARTESIAN_ACCEL = 2703 + # Global motion caps - one percentage applied to the whole profile (all joints + # the same); per-joint maxima come from REFERENCE_* x these. + MAX_SPEED_PERCENT = 2704 + MAX_ACCEL_PERCENT = 2705 + MAX_DECEL_PERCENT = 2706 diff --git a/pylabrobot/brooks/precise_flex.py b/pylabrobot/brooks/precise_flex.py index 21c37ae0a33..522e1ed5b1c 100644 --- a/pylabrobot/brooks/precise_flex.py +++ b/pylabrobot/brooks/precise_flex.py @@ -8,6 +8,7 @@ from typing import Dict, List, Literal, Optional from pylabrobot.brooks.error_codes import ERROR_CODES +from pylabrobot.brooks.data_ids import DataID from pylabrobot.brooks import kinematics from pylabrobot.capabilities.arms.backend import ( CanFreedrive, @@ -43,47 +44,6 @@ class Axis(IntEnum): RAIL = 6 -class DataID(IntEnum): - """Controller parameter-database items, read via ``request_parameter`` (``pd``). - - Named so configuration reads are self-describing rather than bare numbers. Each - item's reply shape is noted below: a single value, descriptive text, or an - array with one element per axis (parse with ``_parse_per_axis``). - """ - - # Identity / state - single value or descriptive text. - MANUFACTURER = 100 - CONTROLLER_MODEL = 101 - HARDWARE_VERSION = 102 - GPL_VERSION = 103 - CONTROLLER_SERIAL = 110 - ROBOT_TYPE = 116 - POWER_STATE = 234 - NUM_AXES = 2000 - ROBOT_NAME = 2002 - AXIS_MASK = 2003 - EXTRA_AXES = 2004 - # Per-axis arrays - one value per joint (e.g. speed differs J1..J5). - REFERENCE_SPEED = 2700 - REFERENCE_ACCEL = 2702 - HARD_LIMIT_MAX = 16075 - HARD_LIMIT_MIN = 16076 - SOFT_LIMIT_MAX = 16077 - SOFT_LIMIT_MIN = 16078 - # Kinematic geometry. LINK_LENGTHS is per-axis (l1 at the shoulder, l2 at the - # elbow); TOOL_OFFSET is a wrist-frame (x, y, z) transform, z = wrist->TCP. - LINK_LENGTHS = 16050 - TOOL_OFFSET = 16051 - # Cartesian reference - (translation, rotation, ...), not per-joint. - REFERENCE_CARTESIAN_SPEED = 2701 - REFERENCE_CARTESIAN_ACCEL = 2703 - # Global motion caps - one percentage applied to the whole profile (all joints - # the same); per-joint maxima come from REFERENCE_* x these. - MAX_SPEED_PERCENT = 2704 - MAX_ACCEL_PERCENT = 2705 - MAX_DECEL_PERCENT = 2706 - - @dataclass class PreciseFlexCartesianPose(CartesianPose): rail_position: Optional[float] = None From 5ea9431067dc08fe16f901352bb6e4b9f866a7c3 Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Fri, 5 Jun 2026 10:07:27 +0100 Subject: [PATCH 3/5] v1: Brooks PF400: confirmed-firmware-versions tracker, model scope gate, contribution prompt Adds pylabrobot/brooks/confirmed_firmware_versions.py: the supported robot models for the client-side kinematics, the validated full software stacks (model + GPL + TCS + loaded-module set, build dates stripped), a capability -> required-module map, and a suggest_entry() that formats a ready-to-paste ConfirmedFirmware literal. _on_setup assesses the discovered stack: it warns on an unsupported model or a missing TCS module (naming the project to obtain from Brooks - the usual -2805 cause), and logs an info-level prompt to contribute an untested-but-working stack. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../brooks/confirmed_firmware_versions.py | 119 ++++++++++++++++++ pylabrobot/brooks/precise_flex.py | 43 +++++++ 2 files changed, 162 insertions(+) create mode 100644 pylabrobot/brooks/confirmed_firmware_versions.py diff --git a/pylabrobot/brooks/confirmed_firmware_versions.py b/pylabrobot/brooks/confirmed_firmware_versions.py new file mode 100644 index 00000000000..1b80f45f2dd --- /dev/null +++ b/pylabrobot/brooks/confirmed_firmware_versions.py @@ -0,0 +1,119 @@ +"""PreciseFlex software stacks confirmed with this driver. + +This tracks the digital setup only - the GPL firmware, the TCS app, and the loaded +TCS modules - not the physical configuration (links, rail, gripper), which is +discovered into ``PreciseFlexConfiguration``. At setup the driver checks the +discovered stack here: + +- ``SUPPORTED_ROBOT_TYPES`` gates the client-side kinematics. This driver's FK/IK + is the PreciseFlex 400 two-link SCARA geometry, so other models would get + silently-wrong joint targets and are flagged. +- ``CONFIRMED_FIRMWARE_VERSIONS`` is the list of full (model, GPL, TCS, module-set) + software stacks validated against this driver; an unlisted one logs a warning + asking for a report so it can be added. +- ``REQUIRED_MODULES`` maps a capability to the TCS module that provides it and + the project that ships it, so a missing module gives a clear "install it from + Brooks" message instead of a late ``-2805 *Unknown command*``. + +Version strings keep the name and version but drop the trailing build date, which +varies by build without changing behaviour. +""" + +import re +from dataclasses import dataclass + +# robot_type (DataID 116) -> model name, for the models whose kinematics this driver implements. +SUPPORTED_ROBOT_TYPES = { + 12: "PreciseFlex 400", +} + + +@dataclass(frozen=True) +class ConfirmedFirmware: + """A full software stack validated against this driver (build dates stripped).""" + + robot_type: int + gpl_version: str # e.g. "GPL 5.1D4" + tcs_version: str # e.g. "TCP Command Server 3.0D4" + modules: tuple # one "name version" per loaded TCS module + + +CONFIRMED_FIRMWARE_VERSIONS = [ + ConfirmedFirmware( + robot_type=12, + gpl_version="GPL 5.1D4", + tcs_version="TCP Command Server 3.0D4", + modules=( + "IntelliGuide 1.0", + "Load-Save Module 3.0B2", + "PARobot Auto Center Module 3.0D3", + "PARobot Module 3.0D4", + "SSGrip Module 3.0D4", + ), + ), +] + +# capability -> (module-name substring that provides it, project that ships it). +# A loaded-module set missing one of these fails the corresponding commands. +REQUIRED_MODULES = { + "gripper / plate handling": ("PARobot", "Tcp_cmd_server_pa"), +} + +# Trailing build date (e.g. "10-25-2024" or "Apr 25 2025") and anything after it. +_DATE = re.compile(r",?\s*(\d{1,2}[-/]\d{1,2}[-/]\d{2,4}|[A-Za-z]{3,9}\.?\s+\d{1,2},?\s+\d{4}).*$") + + +def strip_build_date(version: str) -> str: + """Reduce a version string to its name + version, dropping the build date.""" + return _DATE.sub("", version).strip().rstrip(",").strip() + + +def is_supported_model(robot_type: int) -> bool: + """Whether this driver's kinematics cover the given robot_type (DataID 116).""" + return robot_type in SUPPORTED_ROBOT_TYPES + + +def is_confirmed(robot_type: int, gpl_version: str, tcs_version: str, modules: tuple) -> bool: + """Whether the full (model, GPL, TCS, module-set) combination has been validated.""" + target = ( + robot_type, + strip_build_date(gpl_version), + strip_build_date(tcs_version), + tuple(sorted(strip_build_date(m) for m in modules)), + ) + return any( + ( + c.robot_type, + strip_build_date(c.gpl_version), + strip_build_date(c.tcs_version), + tuple(sorted(strip_build_date(m) for m in c.modules)), + ) + == target + for c in CONFIRMED_FIRMWARE_VERSIONS + ) + + +def missing_required_modules(modules: tuple) -> list: + """Required modules absent from the loaded set, as (capability, module, project).""" + loaded = " ".join(modules) + return [ + (capability, module, project) + for capability, (module, project) in REQUIRED_MODULES.items() + if module not in loaded + ] + + +def suggest_entry(robot_type: int, gpl_version: str, tcs_version: str, modules: tuple) -> str: + """Format a discovered stack as a ``ConfirmedFirmware(...)`` literal to paste into + ``CONFIRMED_FIRMWARE_VERSIONS`` (build dates stripped, modules sorted).""" + module_lines = ",\n ".join(f'"{m}"' for m in sorted(strip_build_date(m) for m in modules)) + return ( + " ConfirmedFirmware(\n" + f" robot_type={robot_type},\n" + f' gpl_version="{strip_build_date(gpl_version)}",\n' + f' tcs_version="{strip_build_date(tcs_version)}",\n' + " modules=(\n" + f" {module_lines},\n" + " ),\n" + " )," + ) diff --git a/pylabrobot/brooks/precise_flex.py b/pylabrobot/brooks/precise_flex.py index 522e1ed5b1c..24ad9deca4c 100644 --- a/pylabrobot/brooks/precise_flex.py +++ b/pylabrobot/brooks/precise_flex.py @@ -9,6 +9,13 @@ from pylabrobot.brooks.error_codes import ERROR_CODES from pylabrobot.brooks.data_ids import DataID +from pylabrobot.brooks.confirmed_firmware_versions import ( + SUPPORTED_ROBOT_TYPES, + is_confirmed, + is_supported_model, + missing_required_modules, + suggest_entry, +) from pylabrobot.brooks import kinematics from pylabrobot.capabilities.arms.backend import ( CanFreedrive, @@ -517,6 +524,7 @@ async def _on_setup(self, backend_params: Optional[BackendParams] = None) -> Non self._kinematics_params = self._configuration.kinematics self._has_rail = self._configuration.has_rail self._is_dual_gripper = self._configuration.is_dual_gripper + self._assess_configuration(self._configuration) except Exception as exc: # discovery is best-effort; fall back to the class defaults logger.warning( "[PreciseFlex %s] could not read configuration, using defaults: %s", @@ -524,6 +532,41 @@ async def _on_setup(self, backend_params: Optional[BackendParams] = None) -> Non exc, ) + def _assess_configuration(self, config: "PreciseFlexConfiguration") -> None: + """Warn about an unsupported model, a missing TCS module, or an untested combo. + + The kinematics is the PreciseFlex 400 geometry, so a different model would get + wrong joint targets; a missing module (e.g. PARobot) is the usual ``-2805`` + cause; an unlisted full configuration is allowed but flagged for reporting. + """ + host = self.driver.io._host + if not is_supported_model(config.robot_type): + logger.warning( + "[PreciseFlex %s] robot_type %s is not a model this driver's kinematics " + "supports (%s); move_to/working_volume may be wrong.", + host, + config.robot_type, + ", ".join(SUPPORTED_ROBOT_TYPES.values()), + ) + for capability, module, project in missing_required_modules(config.modules): + logger.warning( + "[PreciseFlex %s] the '%s' module needed for %s is not loaded; install the " + "'%s' TCS project (obtain it from Brooks Automation) and restart it.", + host, + module, + capability, + project, + ) + if not is_confirmed(config.robot_type, config.gpl_version, config.tcs_version, config.modules): + logger.info( + "[PreciseFlex %s] this software stack has not been tested with this driver. " + "If the arm works correctly, please add the following entry to " + "CONFIRMED_FIRMWARE_VERSIONS in pylabrobot/brooks/confirmed_firmware_versions.py " + "and open a pull request so other users benefit:\n%s", + host, + suggest_entry(config.robot_type, config.gpl_version, config.tcs_version, config.modules), + ) + async def _request_state( self, ) -> tuple[JointPose, PreciseFlexCartesianPose]: From ba8b83f0f59cbeabbe0686246154ca1bca88e56a Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Fri, 5 Jun 2026 15:39:06 +0100 Subject: [PATCH 4/5] v1: Brooks PF400: recover axes parked outside soft limits at setup - recover_axes_within_limits(): single-axis MoveOneAxis back toward the nearest soft limit, in a safe order (gripper, base, shoulder, elbow) and capped to small excursions; wrist and rail excluded (clearance-aware wrist recovery is a TODO). - Setup gains recover_out_of_range_at_setup (default on): logs every out-of-range axis, recovers the recoverable ones, otherwise raises -1012 with recovery steps, so a dead arm never connects silently. - _axes_outside_soft_limits / _assert_within_soft_limits give commanded moves a clear client-side error instead of a bare -1012. - Relocate required-TCS-module tracking to tcs_modules.py; split _on_setup into named helpers; rename request_kinematics -> request_kinematic_parameters. - Type axis identity as Axis at the brooks layer, tighten Optional/Literal annotations, and add a recovery unit test. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../brooks/confirmed_firmware_versions.py | 21 +- pylabrobot/brooks/precise_flex.py | 239 ++++++++++++++++-- pylabrobot/brooks/precise_flex_tests.py | 46 +++- pylabrobot/brooks/tcs_modules.py | 31 +++ 4 files changed, 291 insertions(+), 46 deletions(-) create mode 100644 pylabrobot/brooks/tcs_modules.py diff --git a/pylabrobot/brooks/confirmed_firmware_versions.py b/pylabrobot/brooks/confirmed_firmware_versions.py index 1b80f45f2dd..c50a03b0d03 100644 --- a/pylabrobot/brooks/confirmed_firmware_versions.py +++ b/pylabrobot/brooks/confirmed_firmware_versions.py @@ -11,9 +11,8 @@ - ``CONFIRMED_FIRMWARE_VERSIONS`` is the list of full (model, GPL, TCS, module-set) software stacks validated against this driver; an unlisted one logs a warning asking for a report so it can be added. -- ``REQUIRED_MODULES`` maps a capability to the TCS module that provides it and - the project that ships it, so a missing module gives a clear "install it from - Brooks" message instead of a late ``-2805 *Unknown command*``. + +(Which modules the driver *requires* lives in ``tcs_modules.py``, a separate concern.) Version strings keep the name and version but drop the trailing build date, which varies by build without changing behaviour. @@ -53,12 +52,6 @@ class ConfirmedFirmware: ), ] -# capability -> (module-name substring that provides it, project that ships it). -# A loaded-module set missing one of these fails the corresponding commands. -REQUIRED_MODULES = { - "gripper / plate handling": ("PARobot", "Tcp_cmd_server_pa"), -} - # Trailing build date (e.g. "10-25-2024" or "Apr 25 2025") and anything after it. _DATE = re.compile(r",?\s*(\d{1,2}[-/]\d{1,2}[-/]\d{2,4}|[A-Za-z]{3,9}\.?\s+\d{1,2},?\s+\d{4}).*$") @@ -93,16 +86,6 @@ def is_confirmed(robot_type: int, gpl_version: str, tcs_version: str, modules: t ) -def missing_required_modules(modules: tuple) -> list: - """Required modules absent from the loaded set, as (capability, module, project).""" - loaded = " ".join(modules) - return [ - (capability, module, project) - for capability, (module, project) in REQUIRED_MODULES.items() - if module not in loaded - ] - - def suggest_entry(robot_type: int, gpl_version: str, tcs_version: str, modules: tuple) -> str: """Format a discovered stack as a ``ConfirmedFirmware(...)`` literal to paste into ``CONFIRMED_FIRMWARE_VERSIONS`` (build dates stripped, modules sorted).""" diff --git a/pylabrobot/brooks/precise_flex.py b/pylabrobot/brooks/precise_flex.py index 24ad9deca4c..eb03c581e9a 100644 --- a/pylabrobot/brooks/precise_flex.py +++ b/pylabrobot/brooks/precise_flex.py @@ -13,9 +13,9 @@ SUPPORTED_ROBOT_TYPES, is_confirmed, is_supported_model, - missing_required_modules, suggest_entry, ) +from pylabrobot.brooks.tcs_modules import missing_required_modules from pylabrobot.brooks import kinematics from pylabrobot.capabilities.arms.backend import ( CanFreedrive, @@ -138,7 +138,7 @@ def working_volume(self) -> WorkingVolume: shoulder = sh_lo + (sh_hi - sh_lo) * i / steps for j in range(steps + 1): elbow = el_lo + (el_hi - el_lo) * j / steps - joints = { + joints: JointPose = { Axis.BASE: 0.0, Axis.SHOULDER: shoulder, Axis.ELBOW: elbow, @@ -465,6 +465,7 @@ def __init__( is_dual_gripper: bool = False, has_rail: bool = False, read_kinematics_from_device: bool = True, + recover_out_of_range_at_setup: bool = True, ) -> None: """ Args: @@ -479,6 +480,11 @@ def __init__( the controller at setup and use them for kinematics; the constructor's ``gripper_length`` then acts only as a fallback. Set False to force the constructor values regardless of what the controller reports. + recover_out_of_range_at_setup: when True (the default), setup tries to drive a + small out-of-range excursion back inside the soft limits (slow single-axis move + toward in-range) via ``recover_axes_within_limits``. Set False to skip that. + Either way, setup raises if any axis is still out of range afterward, since the + controller would otherwise reject every commanded move (-1012). closed_gripper_position: firmware-unit value (passed to ``GripClosePos`` / ``GripOpenPos``) at which the jaws are at :attr:`min_gripper_width`. Depends on the mounted gripper. The conversion mm → firmware units is @@ -499,6 +505,7 @@ def __init__( gripper_length=gripper_length, gripper_z_offset=gripper_z_offset ) self._read_kinematics_from_device = read_kinematics_from_device + self._recover_out_of_range_at_setup = recover_out_of_range_at_setup # Device configuration, resolved once at setup; None until then. self._configuration: Optional[PreciseFlexConfiguration] = None if is_dual_gripper: @@ -509,28 +516,81 @@ def __init__( async def _on_setup(self, backend_params: Optional[BackendParams] = None) -> None: await super()._on_setup(backend_params=backend_params) await self.stop_freedrive_mode() - # Resolve the device configuration once (identity, axes, limits, envelope) and - # cache it. The fixes below read from it instead of re-querying or hardcoding: - # the gripper width limits come from the gripper-axis soft limits, and the - # freedrive default axis set follows the installed axes (rail only when fitted). + # Resolve the device configuration once and adopt it as the source of truth; + # without it the class defaults stay in place. try: self._configuration = await self._request_configuration() - gmin, gmax = self._configuration.gripper_width_range - self._gripper_soft_min, self._gripper_soft_max = gmin, gmax - self.min_gripper_width, self.max_gripper_width = gmin, gmax - # Adopt the discovered geometry and topology as the source of truth: IK/FK - # use the device link lengths, and the rail / dual-gripper command paths - # follow what the controller actually reports. - self._kinematics_params = self._configuration.kinematics - self._has_rail = self._configuration.has_rail - self._is_dual_gripper = self._configuration.is_dual_gripper - self._assess_configuration(self._configuration) - except Exception as exc: # discovery is best-effort; fall back to the class defaults + except Exception as exc: # discovery is best-effort logger.warning( "[PreciseFlex %s] could not read configuration, using defaults: %s", self.driver.io._host, exc, ) + return + self._adopt_configuration(self._configuration) + self._log_configuration_summary(self._configuration) + self._assess_configuration(self._configuration) + await self._handle_out_of_range_axes() + + def _adopt_configuration(self, config: "PreciseFlexConfiguration") -> None: + """Adopt the discovered configuration as the source of truth for later commands. + + The gripper width limits come from the gripper-axis soft limits, IK/FK use the + device link lengths, and the rail / dual-gripper command paths follow the axes + the controller actually reports. + """ + gmin, gmax = config.gripper_width_range + self._gripper_soft_min, self._gripper_soft_max = gmin, gmax + self.min_gripper_width, self.max_gripper_width = gmin, gmax + self._kinematics_params = config.kinematics + self._has_rail = config.has_rail + self._is_dual_gripper = config.is_dual_gripper + + def _log_configuration_summary(self, config: "PreciseFlexConfiguration") -> None: + """Log a one-line summary of the discovered configuration.""" + logger.info( + "[PreciseFlex %s] robot_type %s, %s axes%s; %s; %s; modules: %s", + self.driver.io._host, + config.robot_type, + config.num_axes, + " + rail" if config.has_rail else "", + config.gpl_version, + config.tcs_version, + ", ".join(config.modules), + ) + + async def _handle_out_of_range_axes(self) -> None: + """Warn about every out-of-range axis, then correct what is recoverable, or raise. + + An axis parked outside its soft limit makes the arm unusable - the controller rejects + every commanded move with -1012. Setup logs the full set first (either way), then, with + ``recover_out_of_range_at_setup`` on (the default), drives each recoverable offender back + into range. If recovery is off or leaves any axis out, setup raises with explicit + recovery steps rather than leaving a dead arm. + """ + + def fmt(axes: Dict[Axis, tuple]) -> str: + return "; ".join( + f"{axis.name} at {value} (soft limit {limit})" for axis, (value, limit) in axes.items() + ) + + outside = self._axes_outside_soft_limits(await self.request_joint_position()) + if not outside: + return + logger.warning( + "[PreciseFlex %s] axes out of soft limit at setup: %s", self.driver.io._host, fmt(outside) + ) + if self._recover_out_of_range_at_setup: + await self.recover_axes_within_limits() + outside = self._axes_outside_soft_limits(await self.request_joint_position()) + if outside: + raise PreciseFlexError( + -1012, + f"axis outside its soft limit after setup: {fmt(outside)}. The controller rejects all " + f"commanded moves in this state. Recover with recover_axes_within_limits(), or freedrive " + f"the axis back into range manually (required for the wrist, or when an axis is far past " + f"its limit).", + ) def _assess_configuration(self, config: "PreciseFlexConfiguration") -> None: """Warn about an unsupported model, a missing TCS module, or an untested combo. @@ -548,13 +608,13 @@ def _assess_configuration(self, config: "PreciseFlexConfiguration") -> None: config.robot_type, ", ".join(SUPPORTED_ROBOT_TYPES.values()), ) - for capability, module, project in missing_required_modules(config.modules): + for module, provides, project in missing_required_modules(config.modules): logger.warning( - "[PreciseFlex %s] the '%s' module needed for %s is not loaded; install the " - "'%s' TCS project (obtain it from Brooks Automation) and restart it.", + "[PreciseFlex %s] the '%s' module (%s) is not loaded; install the '%s' TCS " + "project (obtain it from Brooks Automation) and restart it.", host, module, - capability, + provides, project, ) if not is_confirmed(config.robot_type, config.gpl_version, config.tcs_version, config.modules): @@ -645,8 +705,10 @@ async def move_gripper( force_sensing, ) units = self._mm_to_firmware_units(width) - if self._gripper_soft_min is not None and not ( - self._gripper_soft_min <= units <= self._gripper_soft_max + if ( + self._gripper_soft_min is not None + and self._gripper_soft_max is not None + and not (self._gripper_soft_min <= units <= self._gripper_soft_max) ): raise ValueError( f"gripper width {width} mm maps to firmware units {units:.1f}, outside the gripper " @@ -782,8 +844,47 @@ async def move_to_joint_position( await self._set_speed(backend_params.speed_pct) current = await self.request_joint_position() joint_coords = {**current, **position} + self._assert_within_soft_limits(current, joint_coords) await self._move_j(profile_index=self.profile_index, joint_coords=joint_coords) + def _axes_outside_soft_limits(self, joints: JointPose) -> Dict[Axis, tuple]: + """Axes whose value lies outside their soft limit, as ``axis -> (value, (lo, hi))``. + + Iterates the soft-limit set (keyed by :class:`Axis`) and looks each axis up in + ``joints`` so the comparison stays Axis-typed. Empty until the configuration has + been discovered. + """ + if self._configuration is None: + return {} + outside: Dict[Axis, tuple] = {} + for axis, (lo, hi) in self._configuration.soft_limits.items(): + value = joints.get(axis) + if value is not None and not (lo <= value <= hi): + outside[axis] = (value, (lo, hi)) + return outside + + def _assert_within_soft_limits(self, current: JointPose, target: JointPose) -> None: + """Turn the controller's cryptic ``-1012`` into a clear client-side error. + + Two cases block a commanded move: an axis already parked outside its soft limit + (the controller then rejects *every* commanded move until it is recovered), and + a target outside its soft limit (rejected outright). Freedrive can hand-move an + axis past a soft limit, so a taught pose can land outside the commandable + envelope. No-op until the configuration has been discovered. + """ + for axis, (value, limit) in self._axes_outside_soft_limits(current).items(): + raise ValueError( + f"{axis.name} is parked at {value}, outside its soft limit {limit}; the " + f"controller rejects commanded moves while an axis is out of range (-1012). " + f"Homing will not recover it (the rotary axes are absolute); call " + f"recover_axes_within_limits() to drive it back into range, then retry." + ) + for axis, (value, limit) in self._axes_outside_soft_limits(target).items(): + raise ValueError( + f"{axis.name} target {value} is outside its soft limit {limit}; the controller " + f"would reject the move (-1012). Re-teach this pose within the envelope." + ) + async def request_joint_position( self, backend_params: Optional[BackendParams] = None ) -> JointPose: @@ -1197,7 +1298,7 @@ async def request_tool_length(self) -> float: values = [float(v) for v in (await self.request_parameter(DataID.TOOL_OFFSET)).split(",")] return values[2] - async def request_kinematics(self) -> "kinematics.PF400Params": + async def request_kinematic_parameters(self) -> "kinematics.PF400Params": """Build PF400Params from the controller's stored geometry. Link lengths and tool length come from the device; gripper_z_offset is not on @@ -1296,9 +1397,10 @@ async def _request_configuration(self) -> "PreciseFlexConfiguration": # Kinematics: read the link/tool geometry from the controller by default, so # the driver is correct for whichever 400 variant is plugged in; fall back to # the constructor params if the read fails or the override is set. + kinematics_source: Literal["device", "provided", "default"] if self._read_kinematics_from_device: try: - kinematic_params = await self.request_kinematics() + kinematic_params = await self.request_kinematic_parameters() kinematics_source = "device" except Exception as exc: logger.warning( @@ -1312,7 +1414,9 @@ async def _request_configuration(self) -> "PreciseFlexConfiguration": kinematic_params = self._kinematics_params kinematics_source = "provided" # Classify by reach: the standard 400 has l1+l2 ~= 435 mm, the extended ~= 591. - reach_class = "extended" if (kinematic_params.l1 + kinematic_params.l2) >= 513 else "standard" + reach_class: Literal["standard", "extended"] = ( + "extended" if (kinematic_params.l1 + kinematic_params.l2) >= 513 else "standard" + ) return PreciseFlexConfiguration( manufacturer=await self.request_manufacturer(), @@ -1921,6 +2025,84 @@ async def _move_j(self, profile_index: int, joint_coords: JointPose) -> None: ) await self.driver.send_command(f"moveJ {profile_index} {angles_str}") + async def _move_one_axis(self, axis: Axis, position: float) -> None: + """Move a single axis to an absolute position (firmware ``MoveOneAxis``). + + Used for recovery: the controller blocks a normal move while an axis is out of + range, but allows a single-axis move heading back into range. Does not wait for + the motion to complete. + """ + await self.driver.send_command(f"MoveOneAxis {int(axis)} {position} {self.profile_index}") + + # Axes auto-recovered when parked out of range, in a deliberately safe order: the + # gripper jaw first (no arm motion), then the Z column (vertical clearance), then + # the rotary links shoulder -> elbow (smallest swept volume last to first). + # The wrist is intentionally absent: rotating it back to +/-180 can self-collide, so + # it needs the other links first driven to minimal clearance from the origin - a + # maneuver not implemented here. The rail (gross lateral travel) is likewise left out. + # TODO: clearance-aware wrist recovery (and rail). An out-of-range wrist or rail is + # left for the setup post-condition to raise on. + _RECOVERY_ORDER = (Axis.GRIPPER, Axis.BASE, Axis.SHOULDER, Axis.ELBOW) + + async def recover_axes_within_limits( + self, speed_pct: float = 20.0, max_distance: Optional[float] = 5.0 + ) -> Dict[Axis, float]: + """Bring out-of-range axes back inside their soft limits, one axis at a time. + + While an axis is outside its soft limit the controller rejects every commanded + coordinated move (-1012), and homing does not help on the absolute rotary axes. + A single-axis move is the documented exception: it may move an axis toward the + in-range region. Each recoverable offender is driven to just inside its nearest + soft limit, slowly, waiting for each to finish, in :attr:`_RECOVERY_ORDER`. + + Args: + speed_pct: Profile speed for the recovery moves (default 20%, deliberately slow). + max_distance: only move an axis that is out of range by at most this much (deg + for the rotary axes, mm for base/gripper). An axis further out is left in place: + a large unattended single-axis sweep risks a collision, so it is left for the + caller to recover manually (e.g. by freedriving). Pass None to move regardless. + + Returns: + The axes moved, as ``axis -> recovered target``. Empty when nothing recoverable + is out of range or the configuration was not discovered. The wrist and rail are + never auto-recovered (see :attr:`_RECOVERY_ORDER`). + """ + outside = self._axes_outside_soft_limits(await self.request_joint_position()) + if not outside: + return {} + prior_speed = await self._request_speed() + await self._set_speed(speed_pct) + recovered: Dict[Axis, float] = {} + try: + for axis in self._RECOVERY_ORDER: + if axis not in outside: + continue + value, (lo, hi) = outside[axis] + above = value > hi # which limit is violated; both moves below hinge on this + overshoot = (value - hi) if above else (lo - value) + if max_distance is not None and overshoot > max_distance: + continue # too far out to move unattended; left for the post-condition to raise + # Land just inside the violated limit, toward the in-range region. Clamp the + # 1-unit margin to half the range so the target stays within [lo, hi] even if + # the range is narrower than the margin (degenerate, but keeps direction sound). + margin = min(1.0, (hi - lo) / 2.0) + target = (hi - margin) if above else (lo + margin) + logger.warning( + "[PreciseFlex %s] recovering %s from %s into soft limit [%s, %s] -> %s", + self.driver.io._host, + axis.name, + value, + lo, + hi, + target, + ) + await self._move_one_axis(axis, target) + await self.driver._wait_for_eom() + recovered[axis] = target + finally: + await self._set_speed(prior_speed) # don't leave the profile at the slow recovery speed + return recovered + async def release_brake(self, axis: int) -> None: """Release the axis brake. @@ -2138,6 +2320,7 @@ def __init__( timeout: int = 20, gripper_length: float = 162.0, gripper_z_offset: float = 0.0, + recover_out_of_range_at_setup: bool = True, ) -> None: """ Args: @@ -2148,6 +2331,9 @@ def __init__( matches the stock single gripper on the PF400. gripper_z_offset: vertical offset in mm from the wrist plate to the tool tip. Defaults to 0 mm. + recover_out_of_range_at_setup: when True (the default), setup tries to drive a + small out-of-range excursion back inside the soft limits; set False to skip + that. Either way setup raises if an axis is still out of range afterward. """ driver = PreciseFlexDriver(host=host, port=port, timeout=timeout) super().__init__(driver=driver) @@ -2158,6 +2344,7 @@ def __init__( gripper_length=gripper_length, gripper_z_offset=gripper_z_offset, closed_gripper_position=closed_gripper_position, + recover_out_of_range_at_setup=recover_out_of_range_at_setup, ) self.reference = Resource(name="PreciseFlex400", size_x=200, size_y=200, size_z=200) self.arm = OrientableGripperArm(backend=backend, reference_resource=self.reference) diff --git a/pylabrobot/brooks/precise_flex_tests.py b/pylabrobot/brooks/precise_flex_tests.py index 73fa9496494..98580ac5478 100644 --- a/pylabrobot/brooks/precise_flex_tests.py +++ b/pylabrobot/brooks/precise_flex_tests.py @@ -2,7 +2,7 @@ from typing import Tuple from unittest.mock import AsyncMock, MagicMock -from pylabrobot.brooks.precise_flex import PreciseFlex400Backend +from pylabrobot.brooks.precise_flex import Axis, PreciseFlex400Backend def _make_backend( @@ -73,3 +73,47 @@ def test_mm_to_firmware_units_helper(self): self.assertEqual(self.backend._mm_to_firmware_units(60.0), 500.0) self.assertEqual(self.backend._mm_to_firmware_units(145.0), 585.0) self.assertEqual(self.backend._mm_to_firmware_units(100.0), 540.0) + + +class TestPreciseFlex400OutOfRangeRecovery(unittest.IsolatedAsyncioTestCase): + def setUp(self): + self.backend, self.driver = _make_backend() + self.driver._wait_for_eom = AsyncMock() + self.backend._request_speed = AsyncMock(return_value=50.0) + # Minimal stub configuration: only the soft limits the recovery logic reads. + self.backend._configuration = MagicMock( + soft_limits={ + Axis.SHOULDER: (-93.0, 93.0), + Axis.ELBOW: (12.0, 348.0), + Axis.WRIST: (-960.0, 960.0), + } + ) + + def _move_one_axis_cmds(self) -> list[str]: + return [ + c.args[0] + for c in self.driver.send_command.call_args_list + if c.args[0].startswith("MoveOneAxis") + ] + + async def test_recover_moves_offenders_toward_limit_in_order_and_skips_wrist(self): + """Each recoverable offender is driven 1 unit *inside* the violated limit (above-max down, + below-min up), shoulder before elbow per _RECOVERY_ORDER; the wrist is never auto-moved.""" + self.backend.request_joint_position = AsyncMock( + return_value={Axis.SHOULDER: 93.5, Axis.ELBOW: 9.0, Axis.WRIST: 962.0} + ) + recovered = await self.backend.recover_axes_within_limits() + self.assertEqual(recovered, {Axis.SHOULDER: 92.0, Axis.ELBOW: 13.0}) # wrist excluded + cmds = self._move_one_axis_cmds() + self.assertEqual( + cmds, ["MoveOneAxis 2 92.0 1", "MoveOneAxis 3 13.0 1"] + ) # shoulder (2) before elbow (3) + + async def test_recover_skips_axis_too_far_out_of_range(self): + """An axis past its limit by more than max_distance is left in place (no unattended big sweep).""" + self.backend.request_joint_position = AsyncMock( + return_value={Axis.SHOULDER: 120.0} # 27 deg past the 93 limit, beyond the 5 cap + ) + recovered = await self.backend.recover_axes_within_limits() + self.assertEqual(recovered, {}) + self.assertEqual(self._move_one_axis_cmds(), []) diff --git a/pylabrobot/brooks/tcs_modules.py b/pylabrobot/brooks/tcs_modules.py new file mode 100644 index 00000000000..6d064c4ca8e --- /dev/null +++ b/pylabrobot/brooks/tcs_modules.py @@ -0,0 +1,31 @@ +"""TCS modules the PreciseFlex driver requires. + +Derived from the TCS source: every command this driver issues is defined in the base +``Cmd.gpl`` or in ``PARobot.gpl`` - none in ``Load_save.gpl``, ``StereoVision.gpl`` +(IntelliGuide), or the Auto-Center plug-in, so those are not required by this driver. +The base command server is implicit (you cannot connect or run ``version`` without it). + +``PARobot.gpl`` carries two modules and the driver uses commands from both: ``PARobot`` +(gripper open/close, plate handling, freedrive, park, rail, config) and ``SSGrip`` +(``IsFullyClosed``). Both ship in that one file, so installing ``Tcp_cmd_server_pa`` +provides both - but the driver depends on each, so both are checked. + +A missing required module is the usual cause of ``-2805 *Unknown command*``; checking +presence at setup turns that into a clear "install it from Brooks" message up front. +""" + +# required module (name substring) -> (what it provides, project that ships it) +REQUIRED_MODULES = { + "PARobot": ("gripper open/close, plate, freedrive, park, rail", "Tcp_cmd_server_pa"), + "SSGrip": ("gripper closed-state sensor (IsFullyClosed)", "Tcp_cmd_server_pa"), +} + + +def missing_required_modules(modules: tuple) -> list: + """Required modules absent from the loaded set, as (module, provides, project).""" + loaded = " ".join(modules) + return [ + (module, provides, project) + for module, (provides, project) in REQUIRED_MODULES.items() + if module not in loaded + ] From d196f6479c2e04a4ff8cf9b8f569ab203a51d152 Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Fri, 5 Jun 2026 23:56:26 +0100 Subject: [PATCH 5/5] v1: Brooks PF400: skip the out-of-range check until the robot is homed - Add ROBOT_HOMED (DataID 2800) and _is_robot_homed(). - _handle_out_of_range_axes now returns early (with a warning) when the robot is not homed: an unhomed incremental axis reads a meaningless ~0 (a false-positive out-of-range), and the controller blocks the single-axis recovery move with -1021 anyway, so the check waits for homing instead of raising or stalling at setup. Co-Authored-By: Claude Opus 4.8 (1M context) --- pylabrobot/brooks/data_ids.py | 1 + pylabrobot/brooks/precise_flex.py | 19 +++++++++++++++++++ 2 files changed, 20 insertions(+) diff --git a/pylabrobot/brooks/data_ids.py b/pylabrobot/brooks/data_ids.py index 90b57f9f975..0322b4ec21a 100644 --- a/pylabrobot/brooks/data_ids.py +++ b/pylabrobot/brooks/data_ids.py @@ -24,6 +24,7 @@ class DataID(IntEnum): CONTROLLER_SERIAL = 110 ROBOT_TYPE = 116 POWER_STATE = 234 + ROBOT_HOMED = 2800 # 1 = all axes homed; 0 = not homed (commanded motion blocked) NUM_AXES = 2000 ROBOT_NAME = 2002 AXIS_MASK = 2003 diff --git a/pylabrobot/brooks/precise_flex.py b/pylabrobot/brooks/precise_flex.py index eb03c581e9a..63b75ffdb25 100644 --- a/pylabrobot/brooks/precise_flex.py +++ b/pylabrobot/brooks/precise_flex.py @@ -559,6 +559,14 @@ def _log_configuration_summary(self, config: "PreciseFlexConfiguration") -> None ", ".join(config.modules), ) + async def _is_robot_homed(self) -> bool: + """Whether all axes are homed (DataID 2800). + + Homing is lost on every power cycle (incremental encoders), and until it is redone + the controller blocks commanded motion (-1021) and reports unreliable positions. + """ + return _parse_scalar(await self.request_parameter(DataID.ROBOT_HOMED)) == 1.0 + async def _handle_out_of_range_axes(self) -> None: """Warn about every out-of-range axis, then correct what is recoverable, or raise. @@ -567,7 +575,18 @@ async def _handle_out_of_range_axes(self) -> None: ``recover_out_of_range_at_setup`` on (the default), drives each recoverable offender back into range. If recovery is off or leaves any axis out, setup raises with explicit recovery steps rather than leaving a dead arm. + + No-op until the robot is homed: an unhomed incremental axis reads a meaningless ~0 + (so the check would false-positive), and the controller blocks the recovery move with + -1021 anyway. Homing is the prerequisite, so the check waits for it. """ + if not await self._is_robot_homed(): + logger.warning( + "[PreciseFlex %s] robot not homed; skipping the out-of-range check until it is " + "(home() first - unhomed positions are unreliable and commanded moves are blocked).", + self.driver.io._host, + ) + return def fmt(axes: Dict[Axis, tuple]) -> str: return "; ".join(