Improvements to bus servo library management#161
Conversation
There was a problem hiding this comment.
Pull request overview
This PR refactors the bus_servo module to support pluggable backend implementations (hardware SDKs vs simulation), migrates bus-servo configuration to degrees, and updates tests/docs/environment configs to match the new architecture.
Changes:
- Introduce a backend abstraction (
BusServoBase) plus a factory and new backends (Waveshare / Rustypot / Simulation). - Update
bus_servoruntime code and environment YAMLs to operate in degrees rather than raw units. - Expand/adjust unit tests and documentation to align with the new backend architecture and config format.
Reviewed changes
Copilot reviewed 23 out of 61 changed files in this pull request and generated 13 comments.
Show a summary per file
| File | Description |
|---|---|
test.sh |
Runs unittest discovery with additional filename patterns and buffered output. |
src/tests/test_module_loader.py |
Adds module-level dependency mocking for tests. |
src/tests/test_base_module.py |
Adds module-level dependency mocking for tests. |
src/modules/personality/tests/test_personality.py |
Adds module-level dependency mocking for tests. |
src/modules/personality/personality.py |
Switches personality servo commands to move_relative() in multiple places. |
src/modules/network/rtlsdr/tests/test_rtlsdr.py |
Adds module-level dependency mocking for tests. |
src/modules/actuators/tests/servo_test.py |
Refactors legacy servo tests to unittest and mocks hardware dependencies. |
src/modules/actuators/tests/bus_servo_test.py |
Updates bus-servo queue tests to mock the new backend factory + backend servo. |
src/modules/actuators/servo/tests/test_servo.py |
Adds dependency mocking; minor test structure tweaks. |
src/modules/actuators/piservo/tests/test_piservo.py |
Adds dependency mocking; minor test structure tweaks. |
src/modules/actuators/bus_servo/tests/test_bus_servo.py |
Adds/expands backend-level unit tests (Waveshare/Rustypot/Simulation + factory). |
src/modules/actuators/bus_servo/libraries/bus_servo_base.py |
Defines the abstract backend interface for bus servos. |
src/modules/actuators/bus_servo/libraries/factory.py |
Adds BusServoFactory for selecting/instantiating backends. |
src/modules/actuators/bus_servo/libraries/utils.py |
Adds unit conversion helpers (degrees/radians/raw linear mappings). |
src/modules/actuators/bus_servo/libraries/simulation_backend.py |
Implements a simulation backend for development/testing without hardware. |
src/modules/actuators/bus_servo/libraries/rustypot_backend.py |
Implements a Rustypot-based backend for supported servos. |
src/modules/actuators/bus_servo/libraries/waveshare_backend.py |
Adds a Waveshare backend wrapper around SDK functionality (currently incomplete). |
src/modules/actuators/bus_servo/libraries/waveshare/__init__.py |
Initializes the vendored Waveshare library package. |
src/modules/actuators/bus_servo/libraries/waveshare/STservo_sdk/__init__.py |
Vendored Waveshare ST SDK package init. |
src/modules/actuators/bus_servo/libraries/waveshare/STservo_sdk/stservo_def.py |
Vendored Waveshare ST SDK definitions. |
src/modules/actuators/bus_servo/libraries/waveshare/STservo_sdk/protocol_packet_handler.py |
Vendored Waveshare ST SDK implementation. |
src/modules/actuators/bus_servo/libraries/waveshare/STservo_sdk/port_handler.py |
Vendored Waveshare ST SDK serial port handler. |
src/modules/actuators/bus_servo/libraries/waveshare/STservo_sdk/group_sync_write.py |
Vendored Waveshare ST SDK sync write support. |
src/modules/actuators/bus_servo/libraries/waveshare/STservo_sdk/group_sync_read.py |
Vendored Waveshare ST SDK sync read support. |
src/modules/actuators/bus_servo/libraries/waveshare/STservo_sdk/sts.py |
Vendored Waveshare ST SDK STS protocol implementation. |
src/modules/actuators/bus_servo/libraries/waveshare/STservo_sdk/scscl.py |
Vendored Waveshare ST SDK SCSCL protocol implementation. |
src/modules/actuators/bus_servo/libraries/waveshare/STServo_examples/write.py |
Vendored Waveshare example script. |
src/modules/actuators/bus_servo/libraries/waveshare/STServo_examples/wheel.py |
Vendored Waveshare example script. |
src/modules/actuators/bus_servo/libraries/waveshare/STServo_examples/sync_write.py |
Vendored Waveshare example script. |
src/modules/actuators/bus_servo/libraries/waveshare/STServo_examples/sync_read.py |
Vendored Waveshare example script. |
src/modules/actuators/bus_servo/libraries/waveshare/STServo_examples/sync_read_write.py |
Vendored Waveshare example script. |
src/modules/actuators/bus_servo/libraries/waveshare/STServo_examples/reg_write.py |
Vendored Waveshare example script. |
src/modules/actuators/bus_servo/libraries/waveshare/STServo_examples/read_write.py |
Vendored Waveshare example script. |
src/modules/actuators/bus_servo/libraries/waveshare/STServo_examples/read.py |
Vendored Waveshare example script. |
src/modules/actuators/bus_servo/libraries/waveshare/STServo_examples/read_all.py |
Vendored Waveshare example script. |
src/modules/actuators/bus_servo/libraries/waveshare/STServo_examples/ping.py |
Vendored Waveshare example script. |
src/modules/actuators/bus_servo/libraries/waveshare/STServo_examples/change_id.py |
Vendored Waveshare example script. |
src/modules/actuators/bus_servo/libraries/waveshare/SCservo_sdk/__init__.py |
Vendored Waveshare SC SDK package init. |
src/modules/actuators/bus_servo/libraries/waveshare/SCservo_sdk/scservo_def.py |
Vendored Waveshare SC SDK definitions. |
src/modules/actuators/bus_servo/libraries/waveshare/SCservo_sdk/protocol_packet_handler.py |
Vendored Waveshare SC SDK implementation. |
src/modules/actuators/bus_servo/libraries/waveshare/SCservo_sdk/port_handler.py |
Vendored Waveshare SC SDK serial port handler. |
src/modules/actuators/bus_servo/libraries/waveshare/SCservo_sdk/packet_handler.py |
Vendored Waveshare SC SDK packet handler shim. |
src/modules/actuators/bus_servo/libraries/waveshare/SCservo_sdk/group_sync_write.py |
Vendored Waveshare SC SDK sync write support. |
src/modules/actuators/bus_servo/libraries/waveshare/SCservo_sdk/group_sync_read.py |
Vendored Waveshare SC SDK sync read support. |
src/modules/actuators/bus_servo/libraries/waveshare/SCServo_examples/sync_write.py |
Vendored Waveshare example script. |
src/modules/actuators/bus_servo/libraries/waveshare/SCServo_examples/sync_read_write.py |
Vendored Waveshare example script. |
src/modules/actuators/bus_servo/libraries/waveshare/SCServo_examples/read_write.py |
Vendored Waveshare example script. |
src/modules/actuators/bus_servo/libraries/waveshare/SCServo_examples/ping.py |
Vendored Waveshare example script. |
src/modules/actuators/bus_servo/libraries/waveshare/SCServo_examples/scservo_sdk/__init__.py |
Vendored Waveshare example-support SDK init. |
src/modules/actuators/bus_servo/libraries/waveshare/SCServo_examples/scservo_sdk/scservo_def.py |
Vendored Waveshare example-support SDK definitions. |
src/modules/actuators/bus_servo/libraries/waveshare/SCServo_examples/scservo_sdk/protocol_packet_handler.py |
Vendored Waveshare example-support SDK implementation. |
src/modules/actuators/bus_servo/libraries/waveshare/SCServo_examples/scservo_sdk/port_handler.py |
Vendored Waveshare example-support SDK serial port handler. |
src/modules/actuators/bus_servo/libraries/waveshare/SCServo_examples/scservo_sdk/packet_handler.py |
Vendored Waveshare example-support SDK packet handler shim. |
src/modules/actuators/bus_servo/libraries/waveshare/SCServo_examples/scservo_sdk/group_sync_write.py |
Vendored Waveshare example-support SDK sync write support. |
src/modules/actuators/bus_servo/libraries/waveshare/SCServo_examples/scservo_sdk/group_sync_read.py |
Vendored Waveshare example-support SDK sync read support. |
src/modules/actuators/bus_servo/config.yml |
Adds declared dependencies for bus_servo module. |
src/modules/actuators/bus_servo/bus_servo.py |
Refactors Servo module to delegate hardware operations to selected backend. |
src/modules/actuators/bus_servo/README.md |
Updates docs for backend architecture and degrees-based configuration. |
environments/laptop.yml |
Adds bus_servo config using simulation backend and degrees-based ranges/poses. |
environments/cody.yml |
Migrates bus_servo poses/ranges to degrees-based values. |
README.md |
Updates module list/docs and adds a testing section. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| def handle_errors(self, comm_result, error): | ||
| """ | ||
| Handle communication errors. | ||
| :param comm_result: Communication result | ||
| :param error: Error code | ||
| """ | ||
| if comm_result != COMM_SUCCESS: | ||
| self.log("%s" % self.packetHandler.getTxRxResult(comm_result), level='error') | ||
| # log stack trace for debugging |
There was a problem hiding this comment.
handle_errors() references COMM_SUCCESS, but this module never defines or imports it, so this will raise NameError. Import COMM_SUCCESS from the appropriate SDK module (ST vs SC) or define a backend-level constant, and ensure it matches the SDK's semantics.
| self.servos['leg_l_tilt'].move_relative(self.servos['leg_l_tilt'].start) | ||
| self.servos['leg_r_tilt'].move_relative(self.servos['leg_r_tilt'].start) |
There was a problem hiding this comment.
move_relative() expects a delta, but self.servos['leg_*_tilt'].start is an absolute position. Calling move_relative(start) will overshoot by start degrees rather than returning to the start/center position. Use an absolute move (move(start)) here, or compute the delta as (start - current_position) before calling move_relative().
| self.servos['leg_l_tilt'].move_relative(self.servos['leg_l_tilt'].start) | ||
| # self.servos['leg_r_knee'].move_relative(-90) |
There was a problem hiding this comment.
move_relative(self.servos['leg_l_tilt'].start) again treats an absolute start position as a relative delta. This will drive the servo far past the intended target. Prefer move(self.servos['leg_l_tilt'].start) or compute the needed delta first.
| import sys | ||
| from unittest.mock import MagicMock | ||
| sys.modules['yaml'] = MagicMock() | ||
| import sys | ||
| import unittest | ||
| from unittest.mock import MagicMock, patch |
There was a problem hiding this comment.
Duplicate import sys (lines 1 and 4). Please remove the redundant import; it also makes the pre-import sys.modules[...] mocking harder to follow.
| # self.log(f"(MOVE) Moving servo {self.identifier} from {self.pos} to position {position} for range {self.range}") | ||
| if position < self.range[0] or position > self.range[1]: | ||
| self.log(f"Position {position} out of range ({self.range[0]}-{self.range[1]})", level='error') | ||
| return |
There was a problem hiding this comment.
_do_move() receives speed and acceleration (and the queue stores per-move overrides), but the new backend call ignores both. This is a behavior regression: callers can request a speed/acceleration override but it has no effect. Consider extending the backend interface to accept these values (or call backend_servo.set_speed(...) / set acceleration) or remove these parameters from move()/the queue to avoid a misleading API.
| return | |
| return | |
| # Apply per-move speed/acceleration overrides if supported by backend | |
| effective_speed = speed if speed is not None else getattr(self, "speed", None) | |
| if effective_speed is not None and hasattr(self.backend_servo, "set_speed"): | |
| try: | |
| self.backend_servo.set_speed(effective_speed) | |
| except TypeError: | |
| # Backend method signature may differ; ignore override in that case | |
| pass | |
| effective_accel = acceleration if acceleration is not None else getattr(self, "acceleration", None) | |
| if effective_accel is not None and hasattr(self.backend_servo, "set_acceleration"): | |
| try: | |
| self.backend_servo.set_acceleration(effective_accel) | |
| except TypeError: | |
| # Backend method signature may differ; ignore override in that case | |
| pass |
| def set_speed(self, value, unit='degrees'): | ||
| # Implement as needed, similar to move_to | ||
| pass | ||
|
|
||
| def detach(self): | ||
| # Disable torque | ||
| pass | ||
|
|
||
| def attach(self): | ||
| # Enable torque | ||
| pass | ||
|
|
||
| def exit(self): | ||
| # Cleanup | ||
| pass |
There was a problem hiding this comment.
set_speed(), detach(), attach(), and exit() are currently unimplemented (pass). These are called by bus_servo.Servo (e.g., detach() during calibration and exit() on shutdown), so leaving them empty will silently break expected behavior. Implement them via the Waveshare SDK (torque enable/disable, port close, etc.) or raise NotImplementedError to fail fast.
|
|
||
|
|
||
|
|
There was a problem hiding this comment.
calibrate_to_center() was removed from this class, but setup_messaging() still calls it when center_on_boot is enabled. This will cause an AttributeError at runtime. Please either re-add calibrate_to_center() (likely delegating to self.backend_servo.calibrate_to_center()) or remove/replace the call site accordingly.
| def calibrate_to_center(self): | |
| """ | |
| Calibrate this servo to its center position using the backend implementation. | |
| This method is called, for example, when `center_on_boot` is enabled in setup_messaging(). | |
| """ | |
| backend = getattr(self, "backend_servo", None) | |
| if backend is None: | |
| # Backend not initialized; cannot perform calibration. | |
| self.log( | |
| f"backend_servo is not configured for {self.identifier}; cannot calibrate to center.", | |
| level='error', | |
| ) | |
| return | |
| calibrate = getattr(backend, "calibrate_to_center", None) | |
| if callable(calibrate): | |
| calibrate() | |
| else: | |
| # Backend does not support center calibration; avoid raising AttributeError. | |
| self.log( | |
| f"Backend for {self.identifier} does not implement calibrate_to_center(); skipping center calibration.", | |
| level='warning', | |
| ) |
There was a problem hiding this comment.
Call the method in backend implementation instead.
There was a problem hiding this comment.
Done in commit 6e97bc4. Added calibrate_to_center() to the Servo class in bus_servo.py that delegates directly to self.backend_servo.calibrate_to_center(). The waveshare backend implementation was also fixed to use self.servo_id (not self.index) and properly converts the center degrees to raw units via move_to_raw().
| def move_to_raw(self, raw_value): | ||
| # Send move command using SDK | ||
| pass | ||
|
|
||
| def get_position_raw(self): | ||
| # Read position using SDK | ||
| pass |
There was a problem hiding this comment.
move_to_raw() and get_position_raw() are currently pass, but move_to() and get_position() depend on them. With the default backend set to waveshare, this means bus servos will not move and position reads will return None/fail. Implement these methods using the underlying Waveshare SDK (and ensure port open/baudrate setup happens in __init__).
| def get_speed(self): | ||
| """ | ||
| Get the current speed of the servo. | ||
| """ | ||
| if self.model.startswith('ST'): | ||
| # Read STServo present position | ||
| sts_present_position, sts_present_speed, sts_comm_result, sts_error = self.packetHandler.ReadPosSpeed(self.index) | ||
| if not self.handle_errors(sts_comm_result, sts_error): | ||
| return sts_present_speed |
There was a problem hiding this comment.
get_speed() uses self.index, but this class (and BusServoBase) stores the id as self.servo_id; self.index is never set here, so this will raise at runtime. Also BusServoBase.get_speed is defined as get_speed(self, unit='degrees'), but this override doesn't accept unit, which can lead to TypeError if callers pass it. Align this implementation with the base interface and use self.servo_id consistently.
|
@copilot apply changes based on the comments in this thread except those I have specified not to fix. |
…ty, and test files Agent-Logs-Url: https://github.com/makerforgetech/modular-biped/sessions/224876d0-40b3-4e56-9ccc-bbbfd5ed44a1 Co-authored-by: danic85 <6583012+danic85@users.noreply.github.com>
Applied all changes from the review thread in commit
The dependency config change was skipped per your instruction. |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 23 out of 61 changed files in this pull request and generated 8 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| def __init__(self, servo_id, model, port, baudrate=1000000, range=None, range_degrees=None, **kwargs): | ||
| super().__init__(servo_id, model, port, baudrate, range, **kwargs) | ||
| self.speed = kwargs.get('speed', 300) | ||
| self.acceleration = kwargs.get('acceleration', 50) |
There was a problem hiding this comment.
__init__() accepts range_degrees, and BusServoFactory.create() passes it through, but the super().__init__(..., range, **kwargs) call drops range_degrees (base will always see range_degrees=None). If range_degrees is still part of the intended API, pass it through to BusServoBase (or remove it from the signature/factory to avoid a misleading parameter).
| def move_to(self, value, unit='degrees'): | ||
| self.log(f"[SIM] Moving servo {self.servo_id} to {value} {unit}") | ||
| self.position = value | ||
| self.is_moving = True | ||
| time.sleep(0.01) | ||
| self.is_moving = False | ||
|
|
||
| def get_position(self, unit='degrees'): | ||
| self.log(f"[SIM] Getting position of servo {self.servo_id} in {unit}") | ||
| return self.position |
There was a problem hiding this comment.
SimulationBusServo.move_to()/get_position() ignore the unit parameter and store/return the raw value unchanged. This makes simulation behave inconsistently with other backends (e.g., calling move_to(..., unit='radians') will store radians but later get_position(unit='degrees') returns radians). Consider normalizing internal state to a single unit (e.g., degrees) and converting on input/output to match the backend contract.
| # Implement as needed, similar to move_to | ||
| pass |
There was a problem hiding this comment.
set_speed() is currently a no-op (pass). Since Servo._do_move() now calls backend_servo.set_speed(...) for each move, this will silently ignore speed settings on the rustypot backend. Either implement the mapping to the rustypot controller API, or explicitly raise NotImplementedError/log a warning so callers aren’t misled into thinking speed overrides are applied.
| # Implement as needed, similar to move_to | |
| pass | |
| # Speed override is not currently supported for rustypot backend. | |
| # Raising NotImplementedError avoids silently ignoring requested speeds. | |
| raise NotImplementedError("set_speed is not supported in the rustypot backend; speed overrides are not applied.") |
| import sys | ||
| from unittest.mock import MagicMock | ||
| sys.modules['yaml'] = MagicMock() | ||
| sys.modules['requests'] = MagicMock() |
There was a problem hiding this comment.
Same as in test_module_loader.py: mutating sys.modules at import time affects the entire test run and can leak across modules. Consider using a scoped patch.dict(sys.modules, ...) in setUp with cleanup to keep dependency-mocking isolated per test.
| """ | ||
| if not self.poses: | ||
| return None | ||
| for pose in self.poses: | ||
| if pose_name in pose: | ||
| return pose[pose_name] | ||
| return None # or raise an exception if preferred | ||
| return self.poses.get(pose_name) |
There was a problem hiding this comment.
get_pose_value() now returns self.poses.get(pose_name), but self.poses appears to be a mapping of pose name → dict of {servo_name: value}. Returning the whole pose dict doesn’t match the method’s docstring (“position value”) and makes it easy to accidentally treat a dict as a numeric position. Consider returning the value for this servo (e.g., lookup self.identifier inside the pose dict) or rename/update the docstring to reflect the actual return type.
| if model.startswith('ST'): | ||
| from .waveshare.STservo_sdk import PortHandler, sts | ||
| self.portHandler = PortHandler(port) | ||
| self.packetHandler = sts(self.portHandler) | ||
| self.max_raw = 4095 | ||
| self.min_raw = 0 | ||
| self.max_deg = 360 | ||
| self.min_deg = 0 | ||
| elif model.startswith('SC'): | ||
| from .waveshare.SCservo_sdk import PortHandler, PacketHandler | ||
| self.portHandler = PortHandler(port) | ||
| self.packetHandler = PacketHandler(1) | ||
| self.max_raw = 1023 | ||
| self.min_raw = 0 | ||
| self.max_deg = 300 | ||
| self.min_deg = 0 | ||
| else: | ||
| raise ValueError(f"Unknown model: {model}") | ||
| # Open port and set baudrate | ||
| if not self.portHandler.openPort(): | ||
| raise RuntimeError(f"Failed to open port {port} for servo {servo_id}") | ||
| if not self.portHandler.setBaudRate(baudrate): | ||
| raise RuntimeError(f"Failed to set baudrate {baudrate} for servo {servo_id}") |
There was a problem hiding this comment.
Each WaveshareBusServo instance opens its own PortHandler on the same port in __init__(). In this codebase, ModuleLoader creates one Servo instance per configured servo ID, so multiple servos will likely share the same serial port path. Opening the same serial port multiple times (and having independent is_using locks) can cause interleaved writes, corrupted packets, or port-open failures. Consider introducing a shared connection manager (singleton keyed by (port, baudrate, model/protocol)) so all servos on the same bus reuse one PortHandler/packet handler and serialize access centrally, with reference counting for closePort().
There was a problem hiding this comment.
@copilot apply changes based on this feedback
There was a problem hiding this comment.
Done in commit e1b3925. Added _WaveshareConnectionManager to waveshare_backend.py — a thread-safe class-level singleton keyed by (port, baudrate, protocol). WaveshareBusServo.__init__ calls acquire() which creates and opens the port only on the first instance for that bus, then increments a ref-count for each subsequent instance. exit() calls release() which decrements the ref-count and only calls closePort() when it reaches zero. A public reset() classmethod is provided for test isolation. Three new tests verify the shared-handler and ref-counting behaviour.
| class RustypotBusServo(BusServoBase): | ||
|
|
||
| def __init__(self, servo_id, model, port, baudrate=1000000, range=None, range_degrees=None, **kwargs): | ||
| super().__init__(servo_id, model, port, baudrate, range, **kwargs) |
There was a problem hiding this comment.
Like WaveshareBusServo, this __init__() accepts range_degrees and the factory passes it, but the super().__init__(..., range, **kwargs) call drops range_degrees, so BusServoBase.range_degrees will always be None. Either forward it to the base class or remove the parameter to avoid a misleading API.
| super().__init__(servo_id, model, port, baudrate, range, **kwargs) | |
| super().__init__(servo_id, model, port, baudrate, range, range_degrees=range_degrees, **kwargs) |
| import sys | ||
| from unittest.mock import MagicMock | ||
| sys.modules['yaml'] = MagicMock() | ||
| sys.modules['requests'] = MagicMock() |
There was a problem hiding this comment.
These top-level sys.modules[...] = MagicMock() assignments are global for the entire test process and can leak into other tests (changing import behavior in hard-to-debug ways). Prefer scoping this to the specific tests (e.g., patch.dict(sys.modules, {...}) in setUp/addCleanup) so the mocks are reliably restored after each test module/case.
…g for same-bus servos Agent-Logs-Url: https://github.com/makerforgetech/modular-biped/sessions/ae8b758b-e5cb-4cc1-aabf-2c2c41aa5074 Co-authored-by: danic85 <6583012+danic85@users.noreply.github.com>
What type of change is this?
📝 What does this change do?
Refactors the
bus_servomodule to support pluggable backend implementations (hardware SDKs vs simulation), migrates bus-servo configuration to degrees, and fixes several bugs identified during code review.Key improvements include:
BusServoBase) with a factory and concrete backends: Waveshare, Rustypot, and Simulationmove_to_raw(),get_position_raw(),set_speed(),set_acceleration(),detach(),attach(),exit(), andcalibrate_to_center()using the Waveshare SDK_WaveshareConnectionManager: a thread-safe shared connection pool (keyed by port, baudrate, and protocol) so allWaveshareBusServoinstances on the same bus reuse onePortHandler/packet handler, with reference counting ensuringclosePort()is only called when the last servo exitsCOMM_SUCCESSmissing constant in the Waveshare backendself.index/self.name→self.servo_idin the Waveshare backend to match the base classPortHandlerand open the port/set baudrate on startuplogcallable toBusServoBaseso backends can log errors without a caller-provided functioncalibrate_to_center()on theServoclass, delegating to the backend implementationspeedandaccelerationoverrides in_do_move()viaset_speed()/set_acceleration()if supported by the backendget_pose_value()to use proper dict lookup (self.poses.get(pose_name))move_relative(start)calls inpersonality.pytomove(start), sincestartis an absolute position, not a relative deltabus_servoruntime code and environment YAMLs to operate in degrees rather than raw units❓ Why is this change needed?
The previous implementation had stub methods (
pass) in the Waveshare backend that silently broke servo movement, position reading, torque control, and shutdown. Several runtime errors would occur on hardware due to undefined constants (COMM_SUCCESS), wrong attribute names (self.indexinstead ofself.servo_id), and a missingcalibrate_to_center()method on theServoclass. EachWaveshareBusServoinstance also opened its own serial port connection independently, which could cause interleaved writes, corrupted packets, or port-open failures when multiple servos shared the same bus. Thepersonality.pybalance logic would also overshoot servo positions by treating absolute start positions as relative deltas.🛠️ How was this implemented?
waveshare_backend.py: Added_WaveshareConnectionManagerclass withacquire(),release(), andreset()classmethods;acquire()creates the port connection only once per(port, baudrate, protocol)key and tracks a reference count,release()decrements the count and closes the port only when it reaches zero;WaveshareBusServo.__init__now callsacquire()instead of opening its own port, andexit()callsrelease(); addedCOMM_SUCCESS = 0module-level constant; implemented all stub methods using the Waveshare ST/SC SDK calls; fixedget_speed()signature to match base class (unitparameter); fixed SC__init__to import and createPortHandler;calibrate_to_center()now converts degrees to raw viadegrees_to_raw()and delegates tomove_to_raw()bus_servo_base.py: Addedself.logdefault (lambda msg, **kw: print(msg)) solevel='error'keyword calls work without a caller-provided log functionbus_servo.py: Addedcalibrate_to_center()delegating toself.backend_servo.calibrate_to_center();_do_move()now applies speed/acceleration overrides viaset_speed()/set_acceleration()withhasattrguards;get_pose_value()usesself.poses.get(pose_name)for correct dict accesspersonality.py: Changed threemove_relative(start)calls tomove(start)for correct absolute positioningimport sysfromtest_piservo.pyandtest_servo.py; updatedtest_bus_servo.pysetUp with proper mock return values for SDK 2-tuple methods; addedtearDowncleanup via_WaveshareConnectionManager.reset(); added tests verifying shared-connection reuse and ref-count port-close behaviour🧪 How was this tested?
TestWaveshareBusServo,TestRustypotBusServo,TestBusServo,TestPiServo,TestServo)portHandler/packetHandlerinstance and thatclosePort()is only called after the last servo callsexit()💥 Breaking changes
🗂 Related issues
✅ PR Checklist
🚀 Thank you for your contribution to the project!
💡 You can make Copilot smarter by setting up custom instructions, customizing its development environment and configuring Model Context Protocol (MCP) servers. Learn more Copilot coding agent tips in the docs.