diff --git a/README.md b/README.md index 0a75f062..26e07763 100644 --- a/README.md +++ b/README.md @@ -7,24 +7,39 @@ The **Modular Robot** project aims to educate and inspire individuals interested - **Control Systems**: Utilizes Arduino and Raspberry Pi, managed through custom PCBs. - **Modular Body**: Configurable body components allow for easy customization and adaptability. - **Software Modules**: - - Animation: Handles the animation of the robot, including walking, turning, and other movements. - - Braillespeak: Converts text to Braille and speaks it using a proprietary audio output using the onboard buzzer. - - Buzzer: Controls the buzzer for audio output. Includes the ability to play tones and melodies. - - ChatGPT: Uses the OpenAI GPT models to process text based on user input. - - Logging: Logs data to a file for debugging and analysis. - - Motion Detection: Handles motion detection using an onboard microwave motion sensor. - - Neopixel: Controls the onboard Neopixel LEDs for visual feedback. - - PiServo: Controls the servos connected to the Raspberry Pi. - - PiTemperature: Reads the temperature from the integrated temperature sensor on the Raspberry Pi. - - RTLSDR: Uses an RTL-SDR dongle to receive and process radio signals. - - Serial Connection: Handles serial communication between the Raspberry Pi and Arduino. - - Servos: Controls the servos connected to the Arduino via the Raspberry Pi and the serial connection. - - Tracking: Uses computer vision to track objects and faces using the onboard camera. - - Translator: Translates text between languages using the Google Translate API. - - TTS: Converts text to speech using the onboard speaker. - - Viam: Uses the VIAM API to integrate Viam modules for additional functionality. - - Vision: Handles image processing and computer vision tasks using the onboard IMX500 Raspberry Pi AI camera. - - Voice Recognition: Uses the Google Speech API to convert speech to text. + - Animation: Handles the animation of the robot, including walking, turning, and other movements. [README](src/modules/animate/README.md) + - ArduinoSerial: Handles serial communication between the Raspberry Pi and Arduino. [README](src/modules/network/arduinoserial/README.md) + - BNO055: Interfaces with the 9-DOF BNO055 IMU sensor over I2C for orientation and motion data. [README](src/modules/sensor/imu/bno055/README.md) + - Braillespeak: Converts text to Braille and speaks it using a proprietary audio output using the onboard buzzer. [README](src/modules/audio/braillespeak/README.md) + - BusServo: Controls serial bus servos (ST/SC series) with support for multiple backends (Waveshare, Rustypot, Simulation). [README](src/modules/actuators/bus_servo/README.md) + - Buzzer: Controls the buzzer for audio output. Includes the ability to play tones and melodies. [README](src/modules/audio/buzzer/README.md) + - ChatGPT: Uses the OpenAI GPT models to process text based on user input. [README](src/modules/chatgpt/README.md) + - ControllerHandler: Maps gamepad inputs to robot actions using configurable YAML mappings, with modifier button support. [README](src/modules/controller_handler/README.md) + - DiscordBot: Hosts a Discord bot that answers questions using ChatGPT, with configurable knowledge sources. [README](src/modules/network/discordbot/README.md) + - Display (TFT): Controls a TFT display over SPI. [README](src/modules/display/tft_display_eye/README.md) + - Display (OLED): Controls a Waveshare OLED display. [README](src/modules/display/waveshare_oled/README.md) + - EmotionAnalysis: Maps text sentiment to NeoPixel LED colours. [README](src/modules/neopixel/emotion_analysis/README.md) + - GPIO Laser: Controls a GPIO-connected laser. [README](src/modules/gpio/laser/README.md) + - GPIO Read: General purpose GPIO input reader. [README](src/modules/gpio/read/README.md) + - I2CServo: Controls servos connected via the I2C protocol. [README](src/modules/i2c_servo/README.md) + - InputRecorder: Records and replays controller input events for creating animations. [README](src/modules/input_recorder/README.md) + - Logging: Logs data to a file for debugging and analysis. [README](src/modules/logwrapper/README.md) + - Motion Detection: Handles motion detection using an onboard microwave motion sensor. [README](src/modules/gpio/motion/README.md) + - MPU6050: Interfaces with the MPU6050 accelerometer/gyroscope sensor over I2C. [README](src/modules/sensor/imu/mpu6050/README.md) + - Neopixel: Controls the onboard Neopixel LEDs for visual feedback, supporting GPIO, I2C, and SPI protocols. [README](src/modules/neopixel/neopx/README.md) + - Personality: Adds autonomous, reactive behaviours (animations, sounds, LED reactions) to the robot. [README](src/modules/personality/README.md) + - PiServo: Controls the servos connected to the Raspberry Pi GPIO. [README](src/modules/actuators/piservo/README.md) + - PiTemperature: Reads the CPU temperature from the Raspberry Pi and throttles the system when needed. [README](src/modules/pitemperature/README.md) + - RTLSDR: Uses an RTL-SDR dongle to receive and process radio signals. [README](src/modules/network/rtlsdr/README.md) + - Servo (Arduino): Controls servos connected to the Arduino via the Raspberry Pi serial connection. [README](src/modules/actuators/servo/README.md) + - SpeechInput: Uses speech recognition to convert audio input to text. [README](src/modules/audio/speechinput/README.md) + - TelegramBot: Enables remote control and interaction via a Telegram bot. [README](src/modules/network/telegrambot/README.md) + - Tracking: Uses the IMX500 AI camera to track objects and faces. [README](src/modules/vision/imx500/tracking/README.md) + - Translator: Translates text between languages using the Google Translate API. [README](src/modules/translator/README.md) + - TTS: Converts text to speech using the onboard speaker. [README](src/modules/audio/ttsmodule/README.md) + - Vision (IMX500): Object detection using the Raspberry Pi AI camera (IMX500). [README](src/modules/vision/imx500/vision/README.md) + - Vision (OpenCV): Computer vision processing using OpenCV. [README](src/modules/vision/opencv/vision/README.md) + - XboxController: Reads and normalises input from an Xbox-compatible gamepad. [README](src/modules/xbox_controller/README.md) - [Read more](https://github.com/makerforgetech/modular-biped/wiki/Software#modules)! ## Project Background @@ -37,7 +52,7 @@ The open source framework is designed for flexibility, allowing users to easily ## Resources -- **Documentation**: For detailed information, visit the project’s GitHub wiki: [Modular Robot Documentation](https://github.com/makerforgetech/modular-biped/wiki) +- **Documentation**: For detailed information, visit the project's GitHub wiki: [Modular Robot Documentation](https://github.com/makerforgetech/modular-biped/wiki) - **Code**: Check out the modular open source software on [GitHub](https://github.com/makerforgetech/modular-biped) - **YouTube Playlist**: Explore the development process through our build videos: [Watch on YouTube](https://www.youtube.com/watch?v=2DVJ5xxAuWY&list=PL_ua9QbuRTv6Kh8hiEXXVqywS8pklZraT) - **Community**: Have a question or want to show off your build? Join the communities on [GitHub](https://bit.ly/maker-forge-community) and [Discord](https://bit.ly/makerforge-community)! @@ -60,7 +75,7 @@ messaging_service: enabled: true config: protocol: 'pubsub' # 'mqtt' or 'pubsub' - mqtt_host: 'localhost' + mqtt_host: 'localhost' mqtt_port: 1883 ``` @@ -69,29 +84,41 @@ The introduction of mqtt allows distributed communication between modules, even The methods publish() and subscribe() can be utilised from within any module to send and receive messages to topics. For example: - + ```python class MyModule(BaseModule): def __init__(self): # Don't subscribe here pass - + def setup_messaging(self): """Subscribe to necessary topics.""" self.subscribe('my_topic', self.my_callback) - + def my_callback(self, message): print(f'Received message: {message}') self.publish('my_response_topic', 'Hello from MyModule!') self.log(level='info', message='MyModule received a message!') ``` + +### The `loop()` Method + +Modules that need to perform work on every system cycle should override the `loop()` method on `BaseModule`. The main loop calls `loop()` directly on each loaded module — no pub/sub subscription to `system/loop` is required. + +```python +class MyModule(BaseModule): + def loop(self): + """Called every system loop cycle.""" + self.do_work() +``` + Core and common topics include: - `log` - Used for logging messages, accepts a string. or kwargs `type` (info by default) and `message`. - `log/info` - Used for logging informational messages. - `log/warning` - Used for logging warning messages. - `log/error` - Used for logging error messages. - `log/debug` - Used for logging debug messages. -- `log/critical` - Used for logging critical messages. +- `log/critical` - Used for logging critical messages. - `system/loop` - The main loop event. Subscribe to this for an action to trigger every loop. - `system/loop/1` - Triggers a loop every second. - `system/loop/10` - Triggers a loop every 10 seconds. @@ -105,6 +132,19 @@ Core and common topics include: - `vision/detections` - Output from the Vision module, containing detected objects. - `led` - Output to the Neopixel LED module. +### Servo Injection + +Modules that need to control servos directly can request injection via the environment YAML: + +```yaml +my_module: + enabled: true + inject: + servos: "Servo_*" +``` + +This makes servo instances available as `self.servos['servo_name']` within the module, allowing direct method calls such as `self.servos['neck_tilt'].move(pitch)`. + ## Logging An upgraded log manager has been included in this version. This allows logs to be published either via the above messaging service, or directly to a 'log' method within the BaseModule class. The log manager can be configured in the `logwrapper.yaml` file. @@ -163,16 +203,16 @@ INFO: 01/30/2025 12:05:28 PM [Main] Loop ended 7. To enable autolaunch on boot, run `./installers/autolaunch.sh enable ` (defaults to laptop if no environment specified). To disable autolaunch, run `./installers/autolaunch.sh disable`. # Adding New Modules -To add a new module, create a new directory in the `src/modules` directory with the name of your module. +To add a new module, create a new directory in the `src/modules` directory with the name of your module. -Inside this directory, create a Python file with the same name as the directory. This file should contain a class that extends the BaseModule class. +Inside this directory, create a Python file with the same name as the directory. This file should contain a class that extends the BaseModule class. -Implement the necessary methods and functionality for your module. +Implement the necessary methods and functionality for your module. - `__init__(self, config, messaging_service)`: Initialize your module and set up any necessary variables or connections. The `config` parameter contains the configuration for your module from the YAML - `setup_messaging(self)`: Set up any necessary subscriptions to topics using the `subscribe` method from the BaseModule class. You can subscribe to other topics or system/loop/ for timed loops. See the wiki for more details on the messaging service and available topics. -- `loop(self)`: Implement the looping functionality of your module in this method. This method will be called repeatedly in the main loop of the robot. -- `__exit__(self)`: Clean up any resources or connections when the module is stopped. +- `loop(self)`: Implement the looping functionality of your module in this method. This method will be called directly on every main loop cycle — no pub/sub subscription needed. See the wiki for more details. +- `__exit__(self)`: Clean up any resources or connections when the module is stopped. Create a configuration YAML file for your module in the new module's directory called `config.yml`. This file should include the class name of your module and any dependencies it requires, as well as any non-environment specific configuration. @@ -188,7 +228,7 @@ bno055: - adafruit-blinka ``` -Finally, add your module to the configuration YAML file for your environment to enable it. +Finally, add your module to the configuration YAML file for your environment to enable it. You may wish to include environment specific configuration in the environment YAML file, which will be passed to the module's init method via kwargs. @@ -210,4 +250,19 @@ bus_servo: id: 3 range: [0, 2063] range_degrees: 181.3 -``` \ No newline at end of file +``` + +## Testing + +All module tests can be run from the project root with: + +```bash +bash test.sh +``` + +Individual test suites can also be targeted: + +```bash +PYTHONPATH=src python3 -m unittest discover -s src/tests -p "test_*.py" -t src +PYTHONPATH=src python3 -m unittest discover -s src/modules -p "test_*.py" -t src +``` diff --git a/environments/cody.yml b/environments/cody.yml index cd2e9b17..7c5f220b 100644 --- a/environments/cody.yml +++ b/environments/cody.yml @@ -20,57 +20,51 @@ bus_servo: enabled: true config: poses: - - legs_forward: {'leg_r_tilt': 2806, 'leg_r_hip': 1989, 'leg_r_knee': 1349, 'leg_r_ankle': 2754, 'neck_pan': 1698, 'neck_tilt': 118, 'leg_l_tilt': 1619, 'leg_l_hip': 1028, 'leg_l_knee': 2634, 'leg_l_ankle': 1567} - - sit: {'leg_r_tilt': 2806, 'leg_r_hip': 2063, 'leg_r_knee': 1840, 'leg_r_ankle': 3641, 'neck_pan': 1697, 'neck_tilt': 118, 'leg_l_tilt': 1621, 'leg_l_hip': 853, 'leg_l_knee': 2161, 'leg_l_ankle': 614} - - wave_1: {'leg_r_tilt': 2809, 'leg_r_hip': 2063, 'leg_r_knee': 1840, 'leg_r_ankle': 3641, 'neck_pan': 1697, 'neck_tilt': 118, 'leg_l_tilt': 1618, 'leg_l_hip': 855, 'leg_l_knee': 2614, 'leg_l_ankle': 1337} - - wave_2: {'leg_r_tilt': 2807, 'leg_r_hip': 2063, 'leg_r_knee': 1840, 'leg_r_ankle': 3641, 'neck_pan': 1697, 'neck_tilt': 118, 'leg_l_tilt': 1618, 'leg_l_hip': 855, 'leg_l_knee': 2614, 'leg_l_ankle': 874} - - stand_low: {'leg_r_tilt': 2790, 'leg_r_hip': 422, 'leg_r_knee': 717, 'leg_r_ankle': 3174, 'neck_pan': 1697, 'neck_tilt': 119, 'leg_l_tilt': 1578, 'leg_l_hip': 2668, 'leg_l_knee': 3342, 'leg_l_ankle': 1107} - - stand_high: {'leg_r_tilt': 2790, 'leg_r_hip': 692, 'leg_r_knee': 1322, 'leg_r_ankle': 2861, 'neck_pan': 1700, 'neck_tilt': 123, 'leg_l_tilt': 1578, 'leg_l_hip': 2510, 'leg_l_knee': 2712, 'leg_l_ankle': 1560} - - sit_edge: {'leg_r_tilt': 2790, 'leg_r_hip': 1918, 'leg_r_knee': 2768, 'leg_r_ankle': 2854, 'neck_pan': 1704, 'neck_tilt': 124, 'leg_l_tilt': 1577, 'leg_l_hip': 1183, 'leg_l_knee': 1332, 'leg_l_ankle': 1562} - - sit_edge_swing_l: {'leg_r_tilt': 2790, 'leg_r_hip': 1918, 'leg_r_knee': 2768, 'leg_r_ankle': 2854, 'neck_pan': 1704, 'neck_tilt': 124, 'leg_l_tilt': 1577, 'leg_l_hip': 1185, 'leg_l_knee': 1789, 'leg_l_ankle': 1988} - - sit_edge_swing_r: {'leg_r_tilt': 2790, 'leg_r_hip': 1918, 'leg_r_knee': 2222, 'leg_r_ankle': 2496, 'neck_pan': 1704, 'neck_tilt': 124, 'leg_l_tilt': 1577, 'leg_l_hip': 1182, 'leg_l_knee': 1345, 'leg_l_ankle': 1511} + - legs_forward: {'leg_r_tilt': 246.7, 'leg_r_hip': 174.8, 'leg_r_knee': 118.6, 'leg_r_ankle': 242.0, 'neck_pan': 149.3, 'neck_tilt': 34.6, 'leg_l_tilt': 142.6, 'leg_l_hip': 90.2, 'leg_l_knee': 231.6, 'leg_l_ankle': 137.7} + - sit: {'leg_r_tilt': 246.7, 'leg_r_hip': 181.3, 'leg_r_knee': 161.7, 'leg_r_ankle': 319.7, 'neck_pan': 149.2, 'neck_tilt': 34.6, 'leg_l_tilt': 142.8, 'leg_l_hip': 74.9, 'leg_l_knee': 189.7, 'leg_l_ankle': 53.9} + - wave_1: {'leg_r_tilt': 246.9, 'leg_r_hip': 181.3, 'leg_r_knee': 161.7, 'leg_r_ankle': 319.7, 'neck_pan': 149.2, 'neck_tilt': 34.6, 'leg_l_tilt': 142.5, 'leg_l_hip': 75.1, 'leg_l_knee': 229.7, 'leg_l_ankle': 117.6} + - wave_2: {'leg_r_tilt': 246.8, 'leg_r_hip': 181.3, 'leg_r_knee': 161.7, 'leg_r_ankle': 319.7, 'neck_pan': 149.2, 'neck_tilt': 34.6, 'leg_l_tilt': 142.5, 'leg_l_hip': 75.1, 'leg_l_knee': 229.7, 'leg_l_ankle': 76.5} + - stand_low: {'leg_r_tilt': 245.3, 'leg_r_hip': 37.1, 'leg_r_knee': 62.9, 'leg_r_ankle': 279.1, 'neck_pan': 149.2, 'neck_tilt': 35.0, 'leg_l_tilt': 138.8, 'leg_l_hip': 233.1, 'leg_l_knee': 277.2, 'leg_l_ankle': 96.2} + - stand_high: {'leg_r_tilt': 245.3, 'leg_r_hip': 60.8, 'leg_r_knee': 116.2, 'leg_r_ankle': 251.5, 'neck_pan': 149.5, 'neck_tilt': 36.1, 'leg_l_tilt': 138.8, 'leg_l_hip': 219.7, 'leg_l_knee': 225.7, 'leg_l_ankle': 135.2} + - sit_edge: {'leg_r_tilt': 245.3, 'leg_r_hip': 168.7, 'leg_r_knee': 243.5, 'leg_r_ankle': 251.0, 'neck_pan': 149.8, 'neck_tilt': 36.6, 'leg_l_tilt': 138.7, 'leg_l_hip': 101.7, 'leg_l_knee': 117.1, 'leg_l_ankle': 137.2} + - sit_edge_swing_l: {'leg_r_tilt': 245.3, 'leg_r_hip': 168.7, 'leg_r_knee': 243.5, 'leg_r_ankle': 251.0, 'neck_pan': 149.8, 'neck_tilt': 36.6, 'leg_l_tilt': 138.7, 'leg_l_hip': 101.9, 'leg_l_knee': 157.2, 'leg_l_ankle': 176.2} + - sit_edge_swing_r: {'leg_r_tilt': 245.3, 'leg_r_hip': 168.7, 'leg_r_knee': 195.3, 'leg_r_ankle': 220.2, 'neck_pan': 149.8, 'neck_tilt': 36.6, 'leg_l_tilt': 138.7, 'leg_l_hip': 101.6, 'leg_l_knee': 118.2, 'leg_l_ankle': 134.7} instances: - name: "leg_r_tilt" model: 'ST3215' id: 1 - range: [2511, 3944] # 1433 total range (4095 = 360 degrees) - range_degrees: 125.9 - start: 2810 + range: [220.9, 346.6] # converted from raw (4095 = 360 degrees) + start: 246.9 # calibrate_on_boot: true - name: "leg_r_hip" model: 'ST3215' id: 3 - range: [0, 2063] - range_degrees: 181.3 + range: [0.0, 181.3] # calibrate_on_boot: true - # start: 1688 + # start: 148.4 - name: "leg_r_knee" model: 'ST3215' id: 2 - range: [717, 3297] - range_degrees: 226.2 - # start: 1345 + range: [62.9, 295.7] + # start: 118.2 - name: "leg_r_ankle" model: 'ST3215' id: 6 - range: [2459, 3872] - range_degrees: 123.7 - # start: 2762 + range: [216.1, 340.5] + # start: 243.0 - name: "neck_pan" model: 'ST3215' id: 12 - range: [0, 3412] # 300 degrees = 3412, 360 degrees = 4095 - range_degrees: 299.7 - start: 1706 + range: [0.0, 299.1] # converted from raw (4095 = 360 degrees) + start: 149.9 # calibrate_on_boot: true # demonstrate_on_boot: true # - name: "neck_pan" # model: 'SC09' # baudrate: 115200 # id: 4 - # range: [0, 1023] - # range_degrees: 300 - # start: 511 + # range: [0.0, 300.0] + # start: 150.0 # calibrate_on_boot: true # demonstrate_on_boot: true # speed: 60 # tested up to 600 @@ -78,9 +72,8 @@ bus_servo: model: 'SC09' baudrate: 115200 id: 11 - range: [30, 220] - range_degrees: 55.6 - start: 120 + range: [8.8, 64.5] # converted from raw (1023 = 300 degrees) + start: 35.2 speed: 60 # max=0, 1 - 3000 tested # acceleration: 0 # max (no significant difference between this and 1 - 3000) # demonstrate_on_boot: true @@ -88,36 +81,32 @@ bus_servo: # model: 'SC09' # baudrate: 115200 # id: 5 - # range: [0, 610] # 180 degree (may need reducing) - # start: 305 + # range: [0.0, 179.0] # converted from raw (1023 = 300 degrees) + # start: 89.5 # speed: 300 # acceleration: 1 # demonstrate_on_boot: true - name: "leg_l_tilt" model: 'ST3215' id: 7 - range: [8, 1796] # 1788 total range (4095 = 360 degrees) - range_degrees: 156.9 - start: 1617 + range: [0.7, 157.8] # converted from raw (4095 = 360 degrees) + start: 142.1 # calibrate_on_boot: true - name: "leg_l_hip" model: 'ST3215' id: 8 - range: [853, 3098] - range_degrees: 197.3 - # start: 2451 + range: [74.9, 279.2] + # start: 215.4 - name: "leg_l_knee" model: 'ST3215' id: 9 - range: [1332, 3404] - range_degrees: 181.1 - # start: 2643 + range: [117.1, 299.3] + # start: 232.2 - name: "leg_l_ankle" model: 'ST3215' id: 10 - range: [614, 2270] - range_degrees: 145.2 - # start: 1566 + range: [53.9, 199.3] + # start: 137.6 # demonstrate_on_boot: true chatgpt: diff --git a/environments/laptop.yml b/environments/laptop.yml index e756f6f8..89517bc9 100644 --- a/environments/laptop.yml +++ b/environments/laptop.yml @@ -14,3 +14,98 @@ logwrapper: chatgpt: enabled: false + + +bus_servo: + enabled: true + config: + backend: simulation + poses: + - legs_forward: {'leg_r_tilt': 246.7, 'leg_r_hip': 174.8, 'leg_r_knee': 118.6, 'leg_r_ankle': 242.0, 'neck_pan': 149.3, 'neck_tilt': 34.6, 'leg_l_tilt': 142.6, 'leg_l_hip': 90.2, 'leg_l_knee': 231.6, 'leg_l_ankle': 137.7} + - sit: {'leg_r_tilt': 246.7, 'leg_r_hip': 181.3, 'leg_r_knee': 161.7, 'leg_r_ankle': 319.7, 'neck_pan': 149.2, 'neck_tilt': 34.6, 'leg_l_tilt': 142.8, 'leg_l_hip': 74.9, 'leg_l_knee': 189.7, 'leg_l_ankle': 53.9} + - wave_1: {'leg_r_tilt': 246.9, 'leg_r_hip': 181.3, 'leg_r_knee': 161.7, 'leg_r_ankle': 319.7, 'neck_pan': 149.2, 'neck_tilt': 34.6, 'leg_l_tilt': 142.5, 'leg_l_hip': 75.1, 'leg_l_knee': 229.7, 'leg_l_ankle': 117.6} + - wave_2: {'leg_r_tilt': 246.8, 'leg_r_hip': 181.3, 'leg_r_knee': 161.7, 'leg_r_ankle': 319.7, 'neck_pan': 149.2, 'neck_tilt': 34.6, 'leg_l_tilt': 142.5, 'leg_l_hip': 75.1, 'leg_l_knee': 229.7, 'leg_l_ankle': 76.5} + - stand_low: {'leg_r_tilt': 245.3, 'leg_r_hip': 37.1, 'leg_r_knee': 62.9, 'leg_r_ankle': 279.1, 'neck_pan': 149.2, 'neck_tilt': 35.0, 'leg_l_tilt': 138.8, 'leg_l_hip': 233.1, 'leg_l_knee': 277.2, 'leg_l_ankle': 96.2} + - stand_high: {'leg_r_tilt': 245.3, 'leg_r_hip': 60.8, 'leg_r_knee': 116.2, 'leg_r_ankle': 251.5, 'neck_pan': 149.5, 'neck_tilt': 36.1, 'leg_l_tilt': 138.8, 'leg_l_hip': 219.7, 'leg_l_knee': 225.7, 'leg_l_ankle': 135.2} + - sit_edge: {'leg_r_tilt': 245.3, 'leg_r_hip': 168.7, 'leg_r_knee': 243.5, 'leg_r_ankle': 251.0, 'neck_pan': 149.8, 'neck_tilt': 36.6, 'leg_l_tilt': 138.7, 'leg_l_hip': 101.7, 'leg_l_knee': 117.1, 'leg_l_ankle': 137.2} + - sit_edge_swing_l: {'leg_r_tilt': 245.3, 'leg_r_hip': 168.7, 'leg_r_knee': 243.5, 'leg_r_ankle': 251.0, 'neck_pan': 149.8, 'neck_tilt': 36.6, 'leg_l_tilt': 138.7, 'leg_l_hip': 101.9, 'leg_l_knee': 157.2, 'leg_l_ankle': 176.2} + - sit_edge_swing_r: {'leg_r_tilt': 245.3, 'leg_r_hip': 168.7, 'leg_r_knee': 195.3, 'leg_r_ankle': 220.2, 'neck_pan': 149.8, 'neck_tilt': 36.6, 'leg_l_tilt': 138.7, 'leg_l_hip': 101.6, 'leg_l_knee': 118.2, 'leg_l_ankle': 134.7} + instances: + - name: "leg_r_tilt" + model: 'ST3215' + id: 1 + range: [220.9, 346.6] # converted from raw (4095 = 360 degrees) + start: 246.9 + # calibrate_on_boot: true + - name: "leg_r_hip" + model: 'ST3215' + id: 3 + range: [0.0, 181.3] + # calibrate_on_boot: true + # start: 148.4 + - name: "leg_r_knee" + model: 'ST3215' + id: 2 + range: [62.9, 295.7] + # start: 118.2 + - name: "leg_r_ankle" + model: 'ST3215' + id: 6 + range: [216.1, 340.5] + # start: 243.0 + - name: "neck_pan" + model: 'ST3215' + id: 12 + range: [0.0, 299.1] # converted from raw (4095 = 360 degrees) + start: 149.9 + # calibrate_on_boot: true + # demonstrate_on_boot: true + # - name: "neck_pan" + # model: 'SC09' + # baudrate: 115200 + # id: 4 + # range: [0.0, 300.0] + # start: 150.0 + # calibrate_on_boot: true + # demonstrate_on_boot: true + # speed: 60 # tested up to 600 + - name: "neck_tilt" + model: 'SC09' + baudrate: 115200 + id: 11 + range: [8.8, 64.5] # converted from raw (1023 = 300 degrees) + start: 35.2 + speed: 60 # max=0, 1 - 3000 tested + # acceleration: 0 # max (no significant difference between this and 1 - 3000) + # demonstrate_on_boot: true + # - name: "neck_tilt2" + # model: 'SC09' + # baudrate: 115200 + # id: 5 + # range: [0.0, 179.0] # converted from raw (1023 = 300 degrees) + # start: 89.5 + # speed: 300 + # acceleration: 1 + # demonstrate_on_boot: true + - name: "leg_l_tilt" + model: 'ST3215' + id: 7 + range: [0.7, 157.8] # converted from raw (4095 = 360 degrees) + start: 142.1 + # calibrate_on_boot: true + - name: "leg_l_hip" + model: 'ST3215' + id: 8 + range: [74.9, 279.2] + # start: 215.4 + - name: "leg_l_knee" + model: 'ST3215' + id: 9 + range: [117.1, 299.3] + # start: 232.2 + - name: "leg_l_ankle" + model: 'ST3215' + id: 10 + range: [53.9, 199.3] + # start: 137.6 + # demonstrate_on_boot: true \ No newline at end of file diff --git a/src/modules/actuators/bus_servo/README.md b/src/modules/actuators/bus_servo/README.md index 79031884..632827f8 100644 --- a/src/modules/actuators/bus_servo/README.md +++ b/src/modules/actuators/bus_servo/README.md @@ -2,24 +2,34 @@ ## Overview -Serial bus servos allow for efficient communication and control of multiple servos over a single bus. The `Servo` class provides an interface to manage these servos, including setting positions, speeds, and handling configurations. +Serial bus servos allow for efficient communication and control of multiple servos over a single bus. The `Servo` class provides a high-level interface to manage these servos, including setting positions, speeds, and handling configurations. The hardware interface is now fully backend-agnostic, supporting multiple libraries and simulation. + +## Architecture + +The bus_servo module now supports multiple backends: + +- **Waveshare**: Uses the official Waveshare ST/SC SDKs for hardware control. +- **Rustypot**: Uses the rustypot library for Feetech and compatible servos. +- **Simulation**: A software-only backend for debugging and development, which logs all actions instead of controlling hardware. + +You can select the backend in your configuration. All backends implement the same interface, so you can switch between them without changing your code. + +All servo configuration (range, start, poses) is now in **degrees** for clarity and cross-backend compatibility. ## Configuration -The `config/servo_bus.yml` file contains the configuration for each bus servo. The `instances` section defines the servos, their IDs, and their initial positions. The `poses` section within each instance defines various poses that can be used to set servo positions. +The configuration file (e.g. `environments/cody.yml`) contains the configuration for each bus servo. The `instances` section defines the servos, their IDs, and their initial positions, all in degrees. The `poses` section defines named poses, also in degrees. ## Getting Started / Calibration To calibrate the servos, each must have their ID set individually. To achieve this, connect one servo to the driver board and run `modules/actuators/bus_servo/change_id.py`. This script will prompt you to enter the ID for the connected servo, which will then be saved permanently on the servo. -If you have challenges understanding the current ID of the servo, run `modules/actuators/bus_servo/STServo_examples/read_all.py` or `modules/actuators/bus_servo/SCServo_examples/read_all.py` depending on the servo type. This script will read and display the current ID of the connected servo. - -Note: if there is an issue with dependencies, you may need to run the script from within the `modules/actuators/bus_servo` directory. +If you have challenges understanding the current ID of the servo, run `modules/actuators/bus_servo/libraries/waveshare/STServo_examples/read_all.py` or `modules/actuators/bus_servo/libraries/waveshare/SCServo_examples/read_all.py` depending on the servo type. This script will read and display the current ID of the connected servo. -Once the ID has been set for all servos, you can use the `Servo` class to control them by enabling it in the above yaml file. +Once the ID has been set for all servos, you can use the `Servo` class to control them by enabling it in the environment yaml file. ### Centering -The servos range is between 0-4095 for ST servos and 0-1024 for SC servos. The position readout can wrap around (from 0 to 4095), so the servos should be set to the midpoint before mounting them in the robot otherwise it can result in the servo passing the wrong way through the range of values, which could damage the robot. +The servos raw value range is between 0-4095 for ST servos and 0-1024 for SC servos, this equates to 360 and 300 degrees respectively. The position readout can wrap around, so the servos should be set to the midpoint before mounting them in the robot otherwise it can result in the servo passing the wrong way through the range of values, which could damage the robot. The `change_id.py` script will set the servo to the midpoint when changing the ID. The servo should then be mounted in the robot at roughly the midpoint position. @@ -27,13 +37,7 @@ You can also enable `center_on_boot` for each servo in the configuration file, w ### Calibration -To calibrate the servo positions, set the flag `calibrate_on_boot` to `true` in the configuration file for each instance (servo). This will cause the servo to output it's current position in the debug log, which can then be copied into the start position, or range. Servos can be manually moved to any position to identify their range or certain poses. - -Note: `calibrate_on_boot` is enabled by default to help with initial configuration. - -Once the start position and range are set, move the flag to the next servo and repeat the process until all servos are calibrated. - -You can then copy the output values directly from the log into the configuration file. +To calibrate the servo positions, set the flag `calibrate_on_boot` to `true` in the configuration file for each instance (servo). This will cause the servo to output it's current position in the debug log, which can then be copied into the start position, or range. Servos can be manually moved to any position to identify their range or certain poses. Finally, set `calibrate_on_boot` to false and re-run the program to start using the servos with their configured positions. @@ -41,32 +45,56 @@ Finally, set `calibrate_on_boot` to false and re-run the program to start using To demonstrate the servo movement on boot, set the flag `demonstrate_on_boot` to `true` in the configuration file for each instance (servo). This will cause the servo to move to its minimum and maximum positions once on initialization, allowing you to see the range of motion. -## Subscriptions +## Subscriptions and direct command The `Servo` class subscribes to the following topics: `servo::mv` - to move the servo to a specific position relative to it's current position. `servo::mvabs` - to move the servo to a specific position with absolute values. +The environment configuration can also inject the servos into a module for direct control: + +``` +my_module: + enabled: true + inject: + servos: "Servo_*" +``` + +You can then directly call the servo such as: + +``` +self.servos['neck_tilt'].move_relative(pitch) +self.servos['neck_tilt'].move(pitch) +``` + ## Smooth initialization -Because the `Servo` class gets the current position of the servo on initialization, there is no danger of a servo jumping from an unknown position to the start position. This is especially useful when the servos are powered on in a random position and is an advantage over hobby servos used in previous versions of the modular biped. +Because the `Servo` class gets the current position of the servo on initialization, there is no danger of a servo jumping from an unknown position to the start position. This is especially useful when the servos are powered on in a random position and is an advantage over hobby servos. ## SC vs ST servos -The `Servo` class supports both ST and SC series servos from Waveshare and compatible servos. The type of servo is determined by the `model` variable in the configuration file. The class will automatically use the appropriate SDK for the specified servo type. +The `Servo` class supports both ST and SC series servos from Waveshare, Feetech and compatible servos. The type of servo is determined by the `model` variable in the configuration file, and the backend is selected via the `backend` parameter. The class will automatically use the appropriate backend for the specified servo type. + +There are some limitations to the SC servos as they do not support continuous rotation and have a lower rotational range compared to the ST servos. The simulation backend is useful for development and debugging without hardware. + +## Backends + +- **WaveshareBusServo**: Uses the official Waveshare SDKs for ST/SC servos (this library is included in this repo). +- **RustypotBusServo**: Uses the rustypot library for Feetech and compatible servos (this library is referenced via python import). +- **SimulationBusServo**: Simulates servo behavior and logs all actions for debugging (no library required). -There are some limitations to the SC servos as they do not support continuous rotation and have a lower rotational range compared to the ST servos. +All backends implement the same methods, including movement, speed, torque, calibration, and error handling. ## Notes during testing - Speed: between 0 and 3000 for SC servos tested, 0 is max, then 1-3000 increases speed. - Acceleration: between 0 and 3000 for SC servos tested, 0 is max but no significant difference observed between values. -- Overload error occurring fairly frequently on SC servos. However when testing with scheduled movements the issue is far less frequent. This may be due to sending commands too quickly in succession. Cycling torque off/on seems to resolve the error, otherwise you need to power cycle the servo. +- Overload error occurring fairly frequently on SC servos using Waveshare library. However when testing with scheduled movements the issue is far less frequent. This may be due to sending commands too quickly in succession. Cycling torque off/on seems to resolve the error, otherwise you need to power cycle the servo. - To disable torque on SC servos use: `self.packetHandler.write1ByteTxRx(self.portHandler, self.index, ADDR_TORQUE_ENABLE, 0)`, change the 0 to 1 to enable torque. -- One of the SC servos definitely seems more prone to overload errors than the others. It may be a faulty servo. This servo also doesn't always respect speed settings. It can be set to 3000 and move quickly on one movement then slow the next without a change in speed being commanded. This issue may be directional, as both the original SC servos show this behaviour in one direction only. - In some cases the overload error does seem to be caused by load on the servo, in which case toggling the torque does not resolve this. The neck pan is an example of this where the SC servo needed to be replaced with the more powerful ST servo. ## References https://www.waveshare.com/wiki/ST3215_Servo https://www.waveshare.com/wiki/SC09_Servo -https://www.waveshare.com/wiki/Bus_Servo_Adapter_(A) \ No newline at end of file +https://www.waveshare.com/wiki/Bus_Servo_Adapter_(A) +https://github.com/pollen-robotics/rustypot \ No newline at end of file diff --git a/src/modules/actuators/bus_servo/bus_servo.py b/src/modules/actuators/bus_servo/bus_servo.py index fd53ab8f..b15d9a0c 100644 --- a/src/modules/actuators/bus_servo/bus_servo.py +++ b/src/modules/actuators/bus_servo/bus_servo.py @@ -4,51 +4,18 @@ import sys import select import time - -# if os.name == 'nt': -# import msvcrt -# def getch(): -# return msvcrt.getch().decode() - -# else: -# import sys, tty, termios -# fd = sys.stdin.fileno() -# old_settings = termios.tcgetattr(fd) -# def getch(): -# try: -# tty.setraw(sys.stdin.fileno()) -# ch = sys.stdin.read(1) -# finally: -# termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) -# return ch - -# sys.path.append("..") -# Uses STServo and SCServo SDK library -from modules.actuators.bus_servo.STservo_sdk import PortHandler, sts, COMM_SUCCESS -from modules.actuators.bus_servo.SCservo_sdk import PacketHandler, SCS_LOWORD, SCS_HIWORD, SCS_TOHOST - from modules.base_module import BaseModule - -# Control table address -ADDR_TORQUE_ENABLE = 40 # Same address for both ST and SC servos -ADDR_SCS_GOAL_ACC = 41 -ADDR_SCS_GOAL_POSITION = 42 # Used in SCServo move -ADDR_SCS_GOAL_SPEED = 46 -ADDR_SCS_PRESENT_POSITION = 56 - -ST_MAX = 4095 -SC_MAX = 1024 - +from modules.actuators.bus_servo.libraries.factory import BusServoFactory class Servo(BaseModule): def __init__(self, **kwargs): """ Servo class """ + self.backend = kwargs.get('backend', 'waveshare') self.identifier = kwargs.get('name') self.model = kwargs.get('model', 'ST') self.index = kwargs.get('id') self.range = kwargs.get('range') - self.range_degrees = kwargs.get('range_degrees', None) # Optional range in degrees for easier control self.start = kwargs.get('start') # Default start position self.poses = kwargs.get('poses') # Dictionary of poses self.baudrate = kwargs.get('baudrate', 1000000) @@ -60,48 +27,27 @@ def __init__(self, **kwargs): self.speed = kwargs.get('speed', 300) # 3073 self.acceleration = kwargs.get('acceleration', 50) self._move_queue = collections.deque() - # After loading YAML: poses_list = kwargs.get('poses', []) - # Convert to dict: self.poses = {list(pose.keys())[0]: list(pose.values())[0] for pose in poses_list} - # Initialize PortHandler instance - # Set the port path - # Get methods and members of PortHandlerLinux or PortHandlerWindows - self.portHandler = PortHandler(self.port) + # Backend selection - # if model starts with ST or SC, use the appropriate packet handler - if self.model.startswith('ST'): - self.packetHandler = sts(self.portHandler) - elif self.model.startswith('SC'): - self.packetHandler = PacketHandler(1) # 1 = protocol_end in examples - else: - raise ValueError(f"Unknown servo model: {self.model}. Supported models are ST and SC.") - - # Open port - if not self.portHandler.openPort(): - raise Exception("Failed to open the port") - - # Set port baudrate - if not self.portHandler.setBaudRate(self.baudrate): - raise Exception("Failed to change the baudrate") + self.backend_servo = BusServoFactory.create( + backend=self.backend, + model=self.model, + servo_id=self.index, + port=self.port, + baudrate=self.baudrate, + range=self.range, + ) def detach(self): - # Detach servo - if self.model.startswith('ST'): - # Disable torque for STServo - sts_comm_result, sts_error = self.packetHandler.write1ByteTxRx(self.index, ADDR_TORQUE_ENABLE, 0) - if not self.handle_errors(sts_comm_result, sts_error): - self.log(f"ST Servo {self.identifier} disabled") - else: - # Disable torque for SCServo - scs_comm_result, scs_error = self.packetHandler.write1ByteTxRx(self.portHandler, self.index, ADDR_TORQUE_ENABLE, 0) - if not self.handle_errors(scs_comm_result, scs_error): - self.log(f"SC Servo {self.identifier} disabled") + # Detach servo using backend + self.backend_servo.detach() def exit(self): self.detach() - self.portHandler.closePort() + self.backend_servo.exit() def setup_messaging(self): self.subscribe('servo:' + self.identifier + ':mvabs', self.move) @@ -144,51 +90,6 @@ def move_to_pose(self, pose_name): else: self.log(f"Pose '{pose_name}' not found for servo {self.identifier}", level='warning') - def _sc_write(self, type, value, verbose=False): - comm_result, error = self.packetHandler.write2ByteTxRx(self.portHandler, self.index, type, value) - if hasattr(self.packetHandler, 'getTxRxResult') and verbose: - self.log(f"[SCServo Result] {self.packetHandler.getTxRxResult(comm_result)}") - if hasattr(self.packetHandler, 'getRxPacketError') and error != 0: - self.log(f"[SCServo Error] {self.identifier} {self.packetHandler.getRxPacketError(error)}") - # Attempting to clear error by toggling torque off and on - comm_result, error = self.packetHandler.write1ByteTxRx(self.portHandler, self.index, ADDR_TORQUE_ENABLE, 0) - time.sleep(0.1) - # comm_result, error = self.packetHandler.write1ByteTxRx(self.portHandler, self.index, ADDR_TORQUE_ENABLE, ) - # self._sc_write(type, value) - # # Typical address for Present Load is 54 (1 byte), but check your servo's manual - # ADDR_SCS_PRESENT_LOAD = 54 - # ADDR_SCS_PRESENT_POSITION = 56 - # ADDR_SCS_PRESENT_SPEED = 58 - # # Read present load - # load, load_comm_result, load_error = self.packetHandler.read1ByteTxRx(self.portHandler, self.index, ADDR_SCS_PRESENT_LOAD) - # # Read present position (2 bytes) - # pos, pos_comm_result, pos_error = self.packetHandler.read2ByteTxRx(self.portHandler, self.index, ADDR_SCS_PRESENT_POSITION) - # # Read present speed (2 bytes) - # speed, speed_comm_result, speed_error = self.packetHandler.read2ByteTxRx(self.portHandler, self.index, ADDR_SCS_PRESENT_SPEED) - # self.log(f"[SCServo][{self.identifier}] Error context: load={load} (comm_result={load_comm_result}, error={load_error}), position={pos} (comm_result={pos_comm_result}, error={pos_error}), speed={speed} (comm_result={speed_comm_result}, error={speed_error})") - return comm_result == COMM_SUCCESS and error == 0 - - def move_degrees(self, degrees): - if self.range_degrees is None: - self.log(f"Servo {self.identifier} does not have range_degrees set, cannot move by degrees", level='error') - return - # Convert degrees to position value based on range, adjusting RELATIVE to current position - self.pos = self.get_position() # Update current position before calculating new position - if self.range is not None and self.pos is not None: - # Calculate how many position units correspond to the degree change - units_per_degree = (self.range[1] - self.range[0]) / self.range_degrees - position_delta = degrees * units_per_degree - new_position = round(self.pos + position_delta) - # Clamp to range - if new_position < self.range[0]: - new_position = self.range[0] - elif new_position > self.range[1]: - new_position = self.range[1] - if self.range_degrees > 0: - pc_move = round((degrees / self.range_degrees) * 100) - self.log(f"Moving servo {self.identifier} by {degrees} degrees (position {self.pos} -> {new_position} | {pc_move}% of range)") - self.move(new_position) - def move(self, position, speed=None, acceleration=None, delay=0, **kwargs): """ Add a move request to the queue. @@ -233,31 +134,17 @@ def _do_move(self, position, speed=None, acceleration=None): if position is None: self.log(f"Position is None for servo {self.identifier}, cannot move", level='error') return - - speed = speed if speed is not None else self.speed - acceleration = acceleration if acceleration is not None else self.acceleration - - # self.log(f"(MOVE) Moving servo {self.identifier} from {self.pos} to position {position} for range {self.range}") if position < self.range[0] or position > self.range[1]: self.log(f"Position {position} out of range ({self.range[0]}-{self.range[1]})", level='error') return - - # Write STServo goal position - if self.model.startswith('ST'): - sts_comm_result, sts_error = self.packetHandler.WritePosEx(self.index, position, speed, acceleration) - if not self.handle_errors(sts_comm_result, sts_error): - self.log(f"Moved ST servo {self.identifier} from {self.pos} to position {position}") - self.pos = position # Update current position - elif self.model.startswith('SC'): - if ( - self._sc_write(ADDR_TORQUE_ENABLE, 1) and - self._sc_write(ADDR_SCS_GOAL_ACC, acceleration) and - self._sc_write(ADDR_SCS_GOAL_SPEED, speed) and - self._sc_write(ADDR_SCS_GOAL_POSITION, position)): - self.log(f"Moved SC servo {self.identifier} from {self.pos} to position {position} in range {self.range} at speed {speed} and acceleration {acceleration} ") - self.pos = position # Update current position - else: - self.log(f"Failed to move SC servo {self.identifier} to position {position}", level='error') + # Apply per-move speed/acceleration overrides if supported by backend + if speed is not None and hasattr(self.backend_servo, 'set_speed'): + self.backend_servo.set_speed(speed) + if acceleration is not None and hasattr(self.backend_servo, 'set_acceleration'): + self.backend_servo.set_acceleration(acceleration) + # Delegate to backend + self.backend_servo.move_to(position, unit='degrees') + self.pos = position def move_relative(self, delta): """ @@ -275,9 +162,9 @@ def move_relative(self, delta): def is_moving(self): - if self.get_moving() == 1: + if self.backend_servo.get_moving() == 1: return True - elif abs(self.pos - self.get_position()) > 15: + elif abs(self.pos - self.get_position()) > 2: print(f"Warning: Servo {self.identifier} is not reporting as moving but position {self.get_position()} does not match target position {self.pos}") return False @@ -285,85 +172,7 @@ def get_position(self): """ Get the current position of the servo. """ - if self.model.startswith('ST'): - # Read STServo present position - sts_present_position, sts_present_speed, sts_comm_result, sts_error = self.packetHandler.ReadPosSpeed(self.index) - if not self.handle_errors(sts_comm_result, sts_error): - # self.log("[ID:%03d] PresPos:%d PresSpd:%d" % (self.index, sts_present_position, sts_present_speed)) - return sts_present_position - else: - return self.sc_get_position_speed('position') - - def get_speed(self): - """ - Get the current speed of the servo. - """ - if self.model.startswith('ST'): - # Read STServo present position - sts_present_position, sts_present_speed, sts_comm_result, sts_error = self.packetHandler.ReadPosSpeed(self.index) - if not self.handle_errors(sts_comm_result, sts_error): - return sts_present_speed - else: - return self.sc_get_position_speed('speed') - - def get_moving(self): - """ - Get the current moving status of the servo. - """ - if self.model.startswith('ST'): - # Read STServo moving status - moving, sts_comm_result, sts_error = self.packetHandler.ReadMoving(self.index) - if not self.handle_errors(sts_comm_result, sts_error): - return moving - else: - # SCServo does not have a direct moving status, so we can infer it by checking if speed is non-zero or if position is changing over time. - speed = self.get_speed() - return speed != 0 - - - def sc_get_position_speed(self, pos_or_speed): - # Read SCServo present position - scs_present_position_speed, scs_comm_result, scs_error = self.packetHandler.read4ByteTxRx(self.portHandler, self.index, ADDR_SCS_PRESENT_POSITION) - if not self.handle_errors(scs_comm_result, scs_error): - scs_present_position = SCS_LOWORD(scs_present_position_speed) - scs_present_speed = SCS_HIWORD(scs_present_position_speed) - if pos_or_speed == 'position': - return scs_present_position - elif pos_or_speed == 'speed': - return SCS_TOHOST(scs_present_speed, 15) - - def enable_continuous(self): - """ - Set the servo mode. - :param mode: Mode to set (e.g., 'wheel', 'position') - """ - if self.model.startswith('SC'): - raise ValueError("Continuous mode is not supported for SCServo models.") - sts_comm_result, sts_error = self.packetHandler.WheelMode(self.index) - if not self.handle_errors(sts_comm_result, sts_error): - self.log(f"Servo {self.name} set to wheel mode") - - def turn_wheel(self, speed): - if self.model.startswith('SC'): - raise ValueError("Continuous mode is not supported for SCServo models.") - sts_comm_result, sts_error = self.packetHandler.WriteSpec(self.index, speed, self.acceleration) - if not self.handle_errors(sts_comm_result, sts_error): - self.log(f"Servo {self.name} turned at speed {speed}") - - def handle_errors(self, comm_result, error): - """ - Handle communication errors. - :param comm_result: Communication result - :param error: Error code - """ - if comm_result != COMM_SUCCESS: - self.log("%s" % self.packetHandler.getTxRxResult(comm_result), level='error') - # log stack trace for debugging - return True - if error != 0: - self.log("%s" % self.packetHandler.getRxPacketError(error), level='error') - return True - return False + return self.backend_servo.get_position(unit='degrees') def get_pose_value(self, pose_name): """ @@ -371,10 +180,13 @@ def get_pose_value(self, pose_name): """ if not self.poses: return None - for pose in self.poses: - if pose_name in pose: - return pose[pose_name] - return None # or raise an exception if preferred + return self.poses.get(pose_name) + + def calibrate_to_center(self): + """ + Move the servo to the center of its range using the backend implementation. + """ + self.backend_servo.calibrate_to_center() def calibrate(self): """ @@ -416,8 +228,8 @@ def calibrate_dynamic(self): max_pos = pos # Print on the same line, pad with spaces to clear previous content # (4095 = 360 degrees, so 1264 = 111 degrees) - range_degrees = (360/4095)*(max_pos - min_pos) if min_pos is not None and max_pos is not None else 'N/A' - print(f"\rCurrent position: {pos}, Min: {min_pos}, Max: {max_pos} Range(deg): {range_degrees}", end='', flush=True) + range_degrees = max_pos - min_pos if min_pos is not None and max_pos is not None else 'N/A' + print(f"\rCurrent position: {pos}, Min: {min_pos}, Max: {max_pos} Range: {range_degrees}", end='', flush=True) time.sleep(0.05) if sys.stdin in select.select([sys.stdin], [], [], 0)[0]: sys.stdin.read(1) # Consume the key so buffer is cleared @@ -436,24 +248,6 @@ def calibrate_dynamic(self): self.start = (min_pos + max_pos) // 2 self.log(f"Start position {self.start} out of new range, setting to midpoint {self.start}") - def calibrate_to_center(self): - """ - Move the servo to the center of its range. - """ - # Write STServo goal position - - if self.model.startswith('ST'): - self.pos = (self.range[0] + self.range[1]) // 2 # Update current position to center - sts_comm_result, sts_error = self.packetHandler.WritePosEx(self.index, self.pos, self.speed, self.acceleration) - if not self.handle_errors(sts_comm_result, sts_error): - self.log(f"Moved servo {self.identifier} to position {self.pos}") - - elif self.model.startswith('SC'): - self.pos = (self.range[0] + self.range[1]) // 2 # Update current position to center - self.packetHandler.write1ByteTxRx(self.portHandler, self.index, ADDR_SCS_GOAL_ACC, self.acceleration) - self.packetHandler.write2ByteTxRx(self.portHandler, self.index, ADDR_SCS_GOAL_SPEED, self.speed) - self.packetHandler.write2ByteTxRx(self.portHandler, self.index, ADDR_SCS_GOAL_POSITION, self.pos) - self.log(f"Moved servo {self.identifier} to position {self.pos}") - + diff --git a/src/modules/actuators/bus_servo/config.yml b/src/modules/actuators/bus_servo/config.yml index 0f0a3ba9..83efef60 100644 --- a/src/modules/actuators/bus_servo/config.yml +++ b/src/modules/actuators/bus_servo/config.yml @@ -1,5 +1,8 @@ bus_servo: class: Servo dependencies: + python: + - rustypot + - numpy unix: - python3-serial diff --git a/src/modules/actuators/bus_servo/libraries/bus_servo_base.py b/src/modules/actuators/bus_servo/libraries/bus_servo_base.py new file mode 100644 index 00000000..87339ed1 --- /dev/null +++ b/src/modules/actuators/bus_servo/libraries/bus_servo_base.py @@ -0,0 +1,81 @@ +from abc import ABC, abstractmethod + +class BusServoBase(ABC): + def __init__(self, servo_id, model, port, baudrate=1000000, range=None, range_degrees=None, **kwargs): + self.servo_id = servo_id + self.model = model # 'ST' or 'SC' + self.port = port + self.baudrate = baudrate + self.range = range + self.range_degrees = range_degrees + self.log = kwargs.get('log', lambda msg, **kw: print(msg)) + + @abstractmethod + def get_speed(self, unit='degrees'): + """Get the current speed of the servo in the specified unit.""" + pass + + @abstractmethod + def get_moving(self): + """Get the current moving status of the servo.""" + pass + + @abstractmethod + def enable_continuous(self): + """Set the servo to continuous (wheel) mode if supported.""" + pass + + @abstractmethod + def turn_wheel(self, speed): + """Turn the servo in wheel mode at the given speed.""" + pass + + @abstractmethod + def handle_errors(self, comm_result, error): + """Handle communication errors.""" + pass + + @abstractmethod + def calibrate_to_center(self): + """Move the servo to the center of its range.""" + pass + + @abstractmethod + def move_to(self, value, unit='degrees'): + """Move servo to position in specified unit ('degrees', 'radians', 'raw').""" + pass + + @abstractmethod + def get_position(self, unit='degrees'): + """Get current position in specified unit.""" + pass + + @abstractmethod + def set_speed(self, value, unit='degrees'): + """Set speed in specified unit.""" + pass + + @abstractmethod + def detach(self): + """Disable torque.""" + pass + + @abstractmethod + def attach(self): + """Enable torque.""" + pass + + @abstractmethod + def exit(self): + """Cleanup resources.""" + pass + + @abstractmethod + def move_to_raw(self, raw_value): + """Move servo to raw position value.""" + pass + + @abstractmethod + def get_position_raw(self): + """Get current position as raw value.""" + pass diff --git a/src/modules/actuators/bus_servo/libraries/factory.py b/src/modules/actuators/bus_servo/libraries/factory.py new file mode 100644 index 00000000..447979de --- /dev/null +++ b/src/modules/actuators/bus_servo/libraries/factory.py @@ -0,0 +1,16 @@ +from .bus_servo_base import BusServoBase + +class BusServoFactory: + @staticmethod + def create(backend, model, servo_id, port, baudrate=1000000, range=None, range_degrees=None, **kwargs): + if backend == 'waveshare': + from .waveshare_backend import WaveshareBusServo + return WaveshareBusServo(servo_id, model, port, baudrate, range, range_degrees, **kwargs) + elif backend == 'rustypot': + from .rustypot_backend import RustypotBusServo + return RustypotBusServo(servo_id, model, port, baudrate, range, range_degrees, **kwargs) + elif backend == 'simulation': + from .simulation_backend import SimulationBusServo + return SimulationBusServo(servo_id, model, port, baudrate, range, range_degrees, **kwargs) + else: + raise ValueError(f"Unknown backend: {backend}") diff --git a/src/modules/actuators/bus_servo/libraries/rustypot_backend.py b/src/modules/actuators/bus_servo/libraries/rustypot_backend.py new file mode 100644 index 00000000..78cff81d --- /dev/null +++ b/src/modules/actuators/bus_servo/libraries/rustypot_backend.py @@ -0,0 +1,101 @@ +from .bus_servo_base import BusServoBase +from .utils import degrees_to_radians, radians_to_degrees +import numpy as np + +class RustypotBusServo(BusServoBase): + + def __init__(self, servo_id, model, port, baudrate=1000000, range=None, range_degrees=None, **kwargs): + super().__init__(servo_id, model, port, baudrate, range, **kwargs) + if model.startswith('ST'): + from rustypot import Sts3215PyController + self.controller = Sts3215PyController(port, baudrate, 0.1) + self.max_rad = np.deg2rad(360) + self.min_rad = 0 + elif model.startswith('SC'): + from rustypot import Scs0009PyController + self.controller = Scs0009PyController(port, baudrate, 0.1) + self.max_rad = np.deg2rad(300) + self.min_rad = 0 + else: + raise ValueError(f"Unknown model: {model}") + + def get_speed(self, unit='degrees'): + rad_s = self.controller.read_present_speed(self.servo_id) + if unit == 'degrees': + import numpy as np + return np.rad2deg(rad_s) + elif unit == 'radians': + return rad_s + elif unit == 'raw': + return rad_s # Not directly supported + else: + raise ValueError(f"Unknown unit: {unit}") + + def get_moving(self): + # Rustypot does not provide a direct moving status; infer from speed + return self.get_speed(unit='radians') != 0 + + def enable_continuous(self): + # Not supported in rustypot for these servos + raise NotImplementedError("Continuous mode not supported in rustypot backend.") + + def turn_wheel(self, speed): + # Not supported in rustypot for these servos + raise NotImplementedError("Wheel mode not supported in rustypot backend.") + + def handle_errors(self, comm_result, error): + # Rustypot raises exceptions on error, so just return False for compatibility + return False + + def calibrate_to_center(self): + # Move to the center of the configured range + if self.range: + center = (self.range[0] + self.range[1]) / 2 + self.move_to(center, unit='degrees') + else: + raise ValueError("Range not set for this servo.") + + def move_to(self, value, unit='degrees'): + if unit == 'degrees': + rad = np.deg2rad(value) + elif unit == 'radians': + rad = value + elif unit == 'raw': + # For rustypot, raw is not directly supported; treat as radians in range + rad = value + else: + raise ValueError(f"Unknown unit: {unit}") + self.controller.write_goal_position(self.servo_id, rad) + + def get_position(self, unit='degrees'): + rad = self.controller.read_present_position(self.servo_id) + if unit == 'degrees': + return np.rad2deg(rad) + elif unit == 'radians': + return rad + elif unit == 'raw': + # Not directly supported; return radians as raw + return rad + else: + raise ValueError(f"Unknown unit: {unit}") + + def set_speed(self, value, unit='degrees'): + # Implement as needed, similar to move_to + pass + + def detach(self): + self.controller.write_torque_enable(self.servo_id, False) + + def attach(self): + self.controller.write_torque_enable(self.servo_id, True) + + def exit(self): + self.detach() + + def move_to_raw(self, raw_value): + # Not directly supported; treat as radians + self.controller.write_goal_position(self.servo_id, raw_value) + + def get_position_raw(self): + # Not directly supported; return radians + return self.controller.read_present_position(self.servo_id) diff --git a/src/modules/actuators/bus_servo/libraries/simulation_backend.py b/src/modules/actuators/bus_servo/libraries/simulation_backend.py new file mode 100644 index 00000000..a8edfa99 --- /dev/null +++ b/src/modules/actuators/bus_servo/libraries/simulation_backend.py @@ -0,0 +1,78 @@ +from .bus_servo_base import BusServoBase +import time + +class SimulationBusServo(BusServoBase): + def __init__(self, servo_id, model, port, baudrate=1000000, range=None, range_degrees=None, **kwargs): + super().__init__(servo_id, model, port, baudrate, range, range_degrees, **kwargs) + self.position = (range[0] + range[1]) / 2 if range else 0 + self.speed = 0 + self.torque_enabled = True + self.is_moving = False + self.log = kwargs.get('log', print) # Use BaseModule.log if provided, else print + + def move_to(self, value, unit='degrees'): + self.log(f"[SIM] Moving servo {self.servo_id} to {value} {unit}") + self.position = value + self.is_moving = True + time.sleep(0.01) + self.is_moving = False + + def get_position(self, unit='degrees'): + self.log(f"[SIM] Getting position of servo {self.servo_id} in {unit}") + return self.position + + def set_speed(self, value, unit='degrees'): + self.log(f"[SIM] Setting speed of servo {self.servo_id} to {value} {unit}") + self.speed = value + + def detach(self): + self.log(f"[SIM] Detaching (disabling torque) servo {self.servo_id}") + self.torque_enabled = False + + def attach(self): + self.log(f"[SIM] Attaching (enabling torque) servo {self.servo_id}") + self.torque_enabled = True + + def exit(self): + self.log(f"[SIM] Exiting simulation for servo {self.servo_id}") + + def move_to_raw(self, raw_value): + self.log(f"[SIM] Moving servo {self.servo_id} to raw value {raw_value}") + self.position = raw_value + self.is_moving = True + time.sleep(0.01) + self.is_moving = False + + def get_position_raw(self): + self.log(f"[SIM] Getting raw position of servo {self.servo_id}") + return self.position + + def get_speed(self, unit='degrees'): + self.log(f"[SIM] Getting speed of servo {self.servo_id} in {unit}") + return self.speed + + def get_moving(self): + self.log(f"[SIM] Checking if servo {self.servo_id} is moving") + return self.is_moving + + def enable_continuous(self): + self.log(f"[SIM] Enabling continuous mode for servo {self.servo_id}") + + def turn_wheel(self, speed): + self.log(f"[SIM] Turning wheel of servo {self.servo_id} at speed {speed}") + self.speed = speed + self.is_moving = True + time.sleep(0.01) + self.is_moving = False + + def handle_errors(self, comm_result, error): + self.log(f"[SIM] Handling errors for servo {self.servo_id}: comm_result={comm_result}, error={error}") + return False + + def calibrate_to_center(self): + if self.range: + center = (self.range[0] + self.range[1]) / 2 + self.log(f"[SIM] Calibrating servo {self.servo_id} to center position {center}") + self.position = center + else: + self.log(f"[SIM] No range set for servo {self.servo_id}, cannot calibrate to center.") diff --git a/src/modules/actuators/bus_servo/libraries/utils.py b/src/modules/actuators/bus_servo/libraries/utils.py new file mode 100644 index 00000000..051845e4 --- /dev/null +++ b/src/modules/actuators/bus_servo/libraries/utils.py @@ -0,0 +1,23 @@ +import math + +def degrees_to_radians(deg): + return math.radians(deg) + +def radians_to_degrees(rad): + return math.degrees(rad) + +def degrees_to_raw(deg, min_deg, max_deg, min_raw, max_raw): + # Linear mapping from degrees to raw units + return int((deg - min_deg) * (max_raw - min_raw) / (max_deg - min_deg) + min_raw) + +def raw_to_degrees(raw, min_deg, max_deg, min_raw, max_raw): + # Linear mapping from raw units to degrees + return (raw - min_raw) * (max_deg - min_deg) / (max_raw - min_raw) + min_deg + +def radians_to_raw(rad, min_rad, max_rad, min_raw, max_raw): + # Linear mapping from radians to raw units + return int((rad - min_rad) * (max_raw - min_raw) / (max_rad - min_rad) + min_raw) + +def raw_to_radians(raw, min_rad, max_rad, min_raw, max_raw): + # Linear mapping from raw units to radians + return (raw - min_raw) * (max_rad - min_rad) / (max_raw - min_raw) + min_rad diff --git a/src/modules/actuators/bus_servo/SCServo_examples/ping.py b/src/modules/actuators/bus_servo/libraries/waveshare/SCServo_examples/ping.py similarity index 100% rename from src/modules/actuators/bus_servo/SCServo_examples/ping.py rename to src/modules/actuators/bus_servo/libraries/waveshare/SCServo_examples/ping.py diff --git a/src/modules/actuators/bus_servo/SCServo_examples/read_write.py b/src/modules/actuators/bus_servo/libraries/waveshare/SCServo_examples/read_write.py similarity index 100% rename from src/modules/actuators/bus_servo/SCServo_examples/read_write.py rename to src/modules/actuators/bus_servo/libraries/waveshare/SCServo_examples/read_write.py diff --git a/src/modules/actuators/bus_servo/SCServo_examples/scservo_sdk/__init__.py b/src/modules/actuators/bus_servo/libraries/waveshare/SCServo_examples/scservo_sdk/__init__.py similarity index 100% rename from src/modules/actuators/bus_servo/SCServo_examples/scservo_sdk/__init__.py rename to src/modules/actuators/bus_servo/libraries/waveshare/SCServo_examples/scservo_sdk/__init__.py diff --git a/src/modules/actuators/bus_servo/SCServo_examples/scservo_sdk/group_sync_read.py b/src/modules/actuators/bus_servo/libraries/waveshare/SCServo_examples/scservo_sdk/group_sync_read.py similarity index 100% rename from src/modules/actuators/bus_servo/SCServo_examples/scservo_sdk/group_sync_read.py rename to src/modules/actuators/bus_servo/libraries/waveshare/SCServo_examples/scservo_sdk/group_sync_read.py diff --git a/src/modules/actuators/bus_servo/SCServo_examples/scservo_sdk/group_sync_write.py b/src/modules/actuators/bus_servo/libraries/waveshare/SCServo_examples/scservo_sdk/group_sync_write.py similarity index 100% rename from src/modules/actuators/bus_servo/SCServo_examples/scservo_sdk/group_sync_write.py rename to src/modules/actuators/bus_servo/libraries/waveshare/SCServo_examples/scservo_sdk/group_sync_write.py diff --git a/src/modules/actuators/bus_servo/SCServo_examples/scservo_sdk/packet_handler.py b/src/modules/actuators/bus_servo/libraries/waveshare/SCServo_examples/scservo_sdk/packet_handler.py similarity index 100% rename from src/modules/actuators/bus_servo/SCServo_examples/scservo_sdk/packet_handler.py rename to src/modules/actuators/bus_servo/libraries/waveshare/SCServo_examples/scservo_sdk/packet_handler.py diff --git a/src/modules/actuators/bus_servo/SCServo_examples/scservo_sdk/port_handler.py b/src/modules/actuators/bus_servo/libraries/waveshare/SCServo_examples/scservo_sdk/port_handler.py similarity index 100% rename from src/modules/actuators/bus_servo/SCServo_examples/scservo_sdk/port_handler.py rename to src/modules/actuators/bus_servo/libraries/waveshare/SCServo_examples/scservo_sdk/port_handler.py diff --git a/src/modules/actuators/bus_servo/SCServo_examples/scservo_sdk/protocol_packet_handler.py b/src/modules/actuators/bus_servo/libraries/waveshare/SCServo_examples/scservo_sdk/protocol_packet_handler.py similarity index 100% rename from src/modules/actuators/bus_servo/SCServo_examples/scservo_sdk/protocol_packet_handler.py rename to src/modules/actuators/bus_servo/libraries/waveshare/SCServo_examples/scservo_sdk/protocol_packet_handler.py diff --git a/src/modules/actuators/bus_servo/SCServo_examples/scservo_sdk/scservo_def.py b/src/modules/actuators/bus_servo/libraries/waveshare/SCServo_examples/scservo_sdk/scservo_def.py similarity index 100% rename from src/modules/actuators/bus_servo/SCServo_examples/scservo_sdk/scservo_def.py rename to src/modules/actuators/bus_servo/libraries/waveshare/SCServo_examples/scservo_sdk/scservo_def.py diff --git a/src/modules/actuators/bus_servo/SCServo_examples/sync_read_write.py b/src/modules/actuators/bus_servo/libraries/waveshare/SCServo_examples/sync_read_write.py similarity index 100% rename from src/modules/actuators/bus_servo/SCServo_examples/sync_read_write.py rename to src/modules/actuators/bus_servo/libraries/waveshare/SCServo_examples/sync_read_write.py diff --git a/src/modules/actuators/bus_servo/SCServo_examples/sync_write.py b/src/modules/actuators/bus_servo/libraries/waveshare/SCServo_examples/sync_write.py similarity index 100% rename from src/modules/actuators/bus_servo/SCServo_examples/sync_write.py rename to src/modules/actuators/bus_servo/libraries/waveshare/SCServo_examples/sync_write.py diff --git a/src/modules/actuators/bus_servo/SCservo_sdk/__init__.py b/src/modules/actuators/bus_servo/libraries/waveshare/SCservo_sdk/__init__.py similarity index 100% rename from src/modules/actuators/bus_servo/SCservo_sdk/__init__.py rename to src/modules/actuators/bus_servo/libraries/waveshare/SCservo_sdk/__init__.py diff --git a/src/modules/actuators/bus_servo/SCservo_sdk/group_sync_read.py b/src/modules/actuators/bus_servo/libraries/waveshare/SCservo_sdk/group_sync_read.py similarity index 100% rename from src/modules/actuators/bus_servo/SCservo_sdk/group_sync_read.py rename to src/modules/actuators/bus_servo/libraries/waveshare/SCservo_sdk/group_sync_read.py diff --git a/src/modules/actuators/bus_servo/SCservo_sdk/group_sync_write.py b/src/modules/actuators/bus_servo/libraries/waveshare/SCservo_sdk/group_sync_write.py similarity index 100% rename from src/modules/actuators/bus_servo/SCservo_sdk/group_sync_write.py rename to src/modules/actuators/bus_servo/libraries/waveshare/SCservo_sdk/group_sync_write.py diff --git a/src/modules/actuators/bus_servo/SCservo_sdk/packet_handler.py b/src/modules/actuators/bus_servo/libraries/waveshare/SCservo_sdk/packet_handler.py similarity index 100% rename from src/modules/actuators/bus_servo/SCservo_sdk/packet_handler.py rename to src/modules/actuators/bus_servo/libraries/waveshare/SCservo_sdk/packet_handler.py diff --git a/src/modules/actuators/bus_servo/SCservo_sdk/port_handler.py b/src/modules/actuators/bus_servo/libraries/waveshare/SCservo_sdk/port_handler.py similarity index 100% rename from src/modules/actuators/bus_servo/SCservo_sdk/port_handler.py rename to src/modules/actuators/bus_servo/libraries/waveshare/SCservo_sdk/port_handler.py diff --git a/src/modules/actuators/bus_servo/SCservo_sdk/protocol_packet_handler.py b/src/modules/actuators/bus_servo/libraries/waveshare/SCservo_sdk/protocol_packet_handler.py similarity index 100% rename from src/modules/actuators/bus_servo/SCservo_sdk/protocol_packet_handler.py rename to src/modules/actuators/bus_servo/libraries/waveshare/SCservo_sdk/protocol_packet_handler.py diff --git a/src/modules/actuators/bus_servo/SCservo_sdk/scservo_def.py b/src/modules/actuators/bus_servo/libraries/waveshare/SCservo_sdk/scservo_def.py similarity index 100% rename from src/modules/actuators/bus_servo/SCservo_sdk/scservo_def.py rename to src/modules/actuators/bus_servo/libraries/waveshare/SCservo_sdk/scservo_def.py diff --git a/src/modules/actuators/bus_servo/STServo_examples/change_id.py b/src/modules/actuators/bus_servo/libraries/waveshare/STServo_examples/change_id.py similarity index 100% rename from src/modules/actuators/bus_servo/STServo_examples/change_id.py rename to src/modules/actuators/bus_servo/libraries/waveshare/STServo_examples/change_id.py diff --git a/src/modules/actuators/bus_servo/STServo_examples/ping.py b/src/modules/actuators/bus_servo/libraries/waveshare/STServo_examples/ping.py similarity index 100% rename from src/modules/actuators/bus_servo/STServo_examples/ping.py rename to src/modules/actuators/bus_servo/libraries/waveshare/STServo_examples/ping.py diff --git a/src/modules/actuators/bus_servo/STServo_examples/read.py b/src/modules/actuators/bus_servo/libraries/waveshare/STServo_examples/read.py similarity index 100% rename from src/modules/actuators/bus_servo/STServo_examples/read.py rename to src/modules/actuators/bus_servo/libraries/waveshare/STServo_examples/read.py diff --git a/src/modules/actuators/bus_servo/STServo_examples/read_all.py b/src/modules/actuators/bus_servo/libraries/waveshare/STServo_examples/read_all.py similarity index 100% rename from src/modules/actuators/bus_servo/STServo_examples/read_all.py rename to src/modules/actuators/bus_servo/libraries/waveshare/STServo_examples/read_all.py diff --git a/src/modules/actuators/bus_servo/STServo_examples/read_write.py b/src/modules/actuators/bus_servo/libraries/waveshare/STServo_examples/read_write.py similarity index 100% rename from src/modules/actuators/bus_servo/STServo_examples/read_write.py rename to src/modules/actuators/bus_servo/libraries/waveshare/STServo_examples/read_write.py diff --git a/src/modules/actuators/bus_servo/STServo_examples/reg_write.py b/src/modules/actuators/bus_servo/libraries/waveshare/STServo_examples/reg_write.py similarity index 100% rename from src/modules/actuators/bus_servo/STServo_examples/reg_write.py rename to src/modules/actuators/bus_servo/libraries/waveshare/STServo_examples/reg_write.py diff --git a/src/modules/actuators/bus_servo/STServo_examples/sync_read.py b/src/modules/actuators/bus_servo/libraries/waveshare/STServo_examples/sync_read.py similarity index 100% rename from src/modules/actuators/bus_servo/STServo_examples/sync_read.py rename to src/modules/actuators/bus_servo/libraries/waveshare/STServo_examples/sync_read.py diff --git a/src/modules/actuators/bus_servo/STServo_examples/sync_read_write.py b/src/modules/actuators/bus_servo/libraries/waveshare/STServo_examples/sync_read_write.py similarity index 100% rename from src/modules/actuators/bus_servo/STServo_examples/sync_read_write.py rename to src/modules/actuators/bus_servo/libraries/waveshare/STServo_examples/sync_read_write.py diff --git a/src/modules/actuators/bus_servo/STServo_examples/sync_write.py b/src/modules/actuators/bus_servo/libraries/waveshare/STServo_examples/sync_write.py similarity index 100% rename from src/modules/actuators/bus_servo/STServo_examples/sync_write.py rename to src/modules/actuators/bus_servo/libraries/waveshare/STServo_examples/sync_write.py diff --git a/src/modules/actuators/bus_servo/STServo_examples/wheel.py b/src/modules/actuators/bus_servo/libraries/waveshare/STServo_examples/wheel.py similarity index 100% rename from src/modules/actuators/bus_servo/STServo_examples/wheel.py rename to src/modules/actuators/bus_servo/libraries/waveshare/STServo_examples/wheel.py diff --git a/src/modules/actuators/bus_servo/STServo_examples/write.py b/src/modules/actuators/bus_servo/libraries/waveshare/STServo_examples/write.py similarity index 100% rename from src/modules/actuators/bus_servo/STServo_examples/write.py rename to src/modules/actuators/bus_servo/libraries/waveshare/STServo_examples/write.py diff --git a/src/modules/actuators/bus_servo/STservo_sdk/__init__.py b/src/modules/actuators/bus_servo/libraries/waveshare/STservo_sdk/__init__.py similarity index 100% rename from src/modules/actuators/bus_servo/STservo_sdk/__init__.py rename to src/modules/actuators/bus_servo/libraries/waveshare/STservo_sdk/__init__.py diff --git a/src/modules/actuators/bus_servo/STservo_sdk/group_sync_read.py b/src/modules/actuators/bus_servo/libraries/waveshare/STservo_sdk/group_sync_read.py similarity index 100% rename from src/modules/actuators/bus_servo/STservo_sdk/group_sync_read.py rename to src/modules/actuators/bus_servo/libraries/waveshare/STservo_sdk/group_sync_read.py diff --git a/src/modules/actuators/bus_servo/STservo_sdk/group_sync_write.py b/src/modules/actuators/bus_servo/libraries/waveshare/STservo_sdk/group_sync_write.py similarity index 100% rename from src/modules/actuators/bus_servo/STservo_sdk/group_sync_write.py rename to src/modules/actuators/bus_servo/libraries/waveshare/STservo_sdk/group_sync_write.py diff --git a/src/modules/actuators/bus_servo/STservo_sdk/port_handler.py b/src/modules/actuators/bus_servo/libraries/waveshare/STservo_sdk/port_handler.py similarity index 100% rename from src/modules/actuators/bus_servo/STservo_sdk/port_handler.py rename to src/modules/actuators/bus_servo/libraries/waveshare/STservo_sdk/port_handler.py diff --git a/src/modules/actuators/bus_servo/STservo_sdk/protocol_packet_handler.py b/src/modules/actuators/bus_servo/libraries/waveshare/STservo_sdk/protocol_packet_handler.py similarity index 100% rename from src/modules/actuators/bus_servo/STservo_sdk/protocol_packet_handler.py rename to src/modules/actuators/bus_servo/libraries/waveshare/STservo_sdk/protocol_packet_handler.py diff --git a/src/modules/actuators/bus_servo/STservo_sdk/scscl.py b/src/modules/actuators/bus_servo/libraries/waveshare/STservo_sdk/scscl.py similarity index 100% rename from src/modules/actuators/bus_servo/STservo_sdk/scscl.py rename to src/modules/actuators/bus_servo/libraries/waveshare/STservo_sdk/scscl.py diff --git a/src/modules/actuators/bus_servo/STservo_sdk/sts.py b/src/modules/actuators/bus_servo/libraries/waveshare/STservo_sdk/sts.py similarity index 100% rename from src/modules/actuators/bus_servo/STservo_sdk/sts.py rename to src/modules/actuators/bus_servo/libraries/waveshare/STservo_sdk/sts.py diff --git a/src/modules/actuators/bus_servo/STservo_sdk/stservo_def.py b/src/modules/actuators/bus_servo/libraries/waveshare/STservo_sdk/stservo_def.py similarity index 100% rename from src/modules/actuators/bus_servo/STservo_sdk/stservo_def.py rename to src/modules/actuators/bus_servo/libraries/waveshare/STservo_sdk/stservo_def.py diff --git a/src/modules/actuators/bus_servo/libraries/waveshare/__init__.py b/src/modules/actuators/bus_servo/libraries/waveshare/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/modules/actuators/bus_servo/libraries/waveshare_backend.py b/src/modules/actuators/bus_servo/libraries/waveshare_backend.py new file mode 100644 index 00000000..832b2ed7 --- /dev/null +++ b/src/modules/actuators/bus_servo/libraries/waveshare_backend.py @@ -0,0 +1,265 @@ +from .bus_servo_base import BusServoBase +from .utils import degrees_to_radians, radians_to_degrees, degrees_to_raw, raw_to_degrees, radians_to_raw, raw_to_radians +import math +import threading + +# Control table address +ADDR_TORQUE_ENABLE = 40 # Same address for both ST and SC servos +ADDR_SCS_GOAL_ACC = 41 +ADDR_SCS_GOAL_POSITION = 42 # Used in SCServo move +ADDR_SCS_GOAL_SPEED = 46 +ADDR_SCS_PRESENT_POSITION = 56 + +ST_MAX = 4095 +SC_MAX = 1024 + +COMM_SUCCESS = 0 # Communication success (matches value in both ST and SC SDKs) + + +class _WaveshareConnectionManager: + """Shared connection pool for Waveshare servo bus connections. + + Multiple WaveshareBusServo instances on the same serial port, baudrate, and + protocol share a single PortHandler/packetHandler so that writes to the bus + are serialized through one connection rather than opening the port multiple + times. The underlying port is only closed when the last reference is + released via release(). + """ + + _lock = threading.Lock() + _connections = {} # key: (port, baudrate, protocol) -> dict + + @classmethod + def acquire(cls, port, baudrate, protocol, create_connection): + """Return the shared (portHandler, packetHandler) for the given bus. + + :param port: Serial port path (e.g. '/dev/ttyAMA0') + :param baudrate: Baud rate integer + :param protocol: 'ST' or 'SC' – determines which SDK is in use + :param create_connection: Zero-arg callable that opens the port and + returns ``(portHandler, packetHandler)``. Only invoked when no + existing connection is found for the key. + :returns: ``(portHandler, packetHandler)`` + """ + key = (port, baudrate, protocol) + with cls._lock: + if key not in cls._connections: + portHandler, packetHandler = create_connection() + cls._connections[key] = { + 'portHandler': portHandler, + 'packetHandler': packetHandler, + 'ref_count': 0, + } + cls._connections[key]['ref_count'] += 1 + conn = cls._connections[key] + return conn['portHandler'], conn['packetHandler'] + + @classmethod + def release(cls, port, baudrate, protocol): + """Decrement the reference count for the given bus connection. + + When the count reaches zero the port is closed and the entry is removed. + """ + key = (port, baudrate, protocol) + with cls._lock: + if key not in cls._connections: + return + cls._connections[key]['ref_count'] -= 1 + if cls._connections[key]['ref_count'] <= 0: + try: + cls._connections[key]['portHandler'].closePort() + except OSError: + pass + del cls._connections[key] + + @classmethod + def reset(cls): + """Remove all tracked connections without closing ports. + + Intended for use in tests to reset state between test cases. + """ + with cls._lock: + cls._connections.clear() + + +class WaveshareBusServo(BusServoBase): + def __init__(self, servo_id, model, port, baudrate=1000000, range=None, range_degrees=None, **kwargs): + super().__init__(servo_id, model, port, baudrate, range, **kwargs) + self.speed = kwargs.get('speed', 300) + self.acceleration = kwargs.get('acceleration', 50) + # Determine protocol and raw/degree limits from model, then acquire or + # reuse the shared connection for this (port, baudrate, protocol). + if model.startswith('ST'): + from .waveshare.STservo_sdk import PortHandler, sts + self._protocol = 'ST' + self.max_raw = 4095 + self.min_raw = 0 + self.max_deg = 360 + self.min_deg = 0 + def _create(): + ph = PortHandler(port) + if not ph.openPort(): + raise RuntimeError(f"Failed to open port {port} for servo {servo_id}") + if not ph.setBaudRate(baudrate): + raise RuntimeError(f"Failed to set baudrate {baudrate} for servo {servo_id}") + return ph, sts(ph) + elif model.startswith('SC'): + from .waveshare.SCservo_sdk import PortHandler, PacketHandler + self._protocol = 'SC' + self.max_raw = 1023 + self.min_raw = 0 + self.max_deg = 300 + self.min_deg = 0 + def _create(): + ph = PortHandler(port) + if not ph.openPort(): + raise RuntimeError(f"Failed to open port {port} for servo {servo_id}") + if not ph.setBaudRate(baudrate): + raise RuntimeError(f"Failed to set baudrate {baudrate} for servo {servo_id}") + return ph, PacketHandler(1) + else: + raise ValueError(f"Unknown model: {model}") + + self.portHandler, self.packetHandler = _WaveshareConnectionManager.acquire( + port, baudrate, self._protocol, _create + ) + + def move_to(self, value, unit='degrees'): + if unit == 'degrees': + raw = degrees_to_raw(value, self.min_deg, self.max_deg, self.min_raw, self.max_raw) + elif unit == 'radians': + raw = radians_to_raw(value, math.radians(self.min_deg), math.radians(self.max_deg), self.min_raw, self.max_raw) + elif unit == 'raw': + raw = int(value) + else: + raise ValueError(f"Unknown unit: {unit}") + self.move_to_raw(raw) + + def get_position(self, unit='degrees'): + raw = self.get_position_raw() + if unit == 'degrees': + return raw_to_degrees(raw, self.min_deg, self.max_deg, self.min_raw, self.max_raw) + elif unit == 'radians': + return raw_to_radians(raw, math.radians(self.min_deg), math.radians(self.max_deg), self.min_raw, self.max_raw) + elif unit == 'raw': + return raw + else: + raise ValueError(f"Unknown unit: {unit}") + + def set_speed(self, value, unit='degrees'): + self.speed = int(value) + + def set_acceleration(self, value): + self.acceleration = int(value) + + def detach(self): + if self.model.startswith('ST'): + self.packetHandler.write1ByteTxRx(self.servo_id, ADDR_TORQUE_ENABLE, 0) + else: + self.packetHandler.write1ByteTxRx(self.portHandler, self.servo_id, ADDR_TORQUE_ENABLE, 0) + + def attach(self): + if self.model.startswith('ST'): + self.packetHandler.write1ByteTxRx(self.servo_id, ADDR_TORQUE_ENABLE, 1) + else: + self.packetHandler.write1ByteTxRx(self.portHandler, self.servo_id, ADDR_TORQUE_ENABLE, 1) + + def exit(self): + _WaveshareConnectionManager.release(self.port, self.baudrate, self._protocol) + + def move_to_raw(self, raw_value): + if self.model.startswith('ST'): + sts_comm_result, sts_error = self.packetHandler.WritePosEx(self.servo_id, raw_value, self.speed, self.acceleration) + self.handle_errors(sts_comm_result, sts_error) + else: + self.packetHandler.write1ByteTxRx(self.portHandler, self.servo_id, ADDR_SCS_GOAL_ACC, self.acceleration) + self.packetHandler.write2ByteTxRx(self.portHandler, self.servo_id, ADDR_SCS_GOAL_SPEED, self.speed) + scs_comm_result, scs_error = self.packetHandler.write2ByteTxRx(self.portHandler, self.servo_id, ADDR_SCS_GOAL_POSITION, raw_value) + self.handle_errors(scs_comm_result, scs_error) + + def get_position_raw(self): + if self.model.startswith('ST'): + sts_present_position, sts_comm_result, sts_error = self.packetHandler.ReadPos(self.servo_id) + if not self.handle_errors(sts_comm_result, sts_error): + return sts_present_position + else: + scs_present_position, scs_comm_result, scs_error = self.packetHandler.read2ByteTxRx(self.portHandler, self.servo_id, ADDR_SCS_PRESENT_POSITION) + if not self.handle_errors(scs_comm_result, scs_error): + return scs_present_position + return None + + def get_speed(self, unit='degrees'): + """ + Get the current speed of the servo. + """ + if self.model.startswith('ST'): + # Read STServo present position + sts_present_position, sts_present_speed, sts_comm_result, sts_error = self.packetHandler.ReadPosSpeed(self.servo_id) + if not self.handle_errors(sts_comm_result, sts_error): + return sts_present_speed + else: + scs_data, scs_comm_result, scs_error = self.packetHandler.read4ByteTxRx(self.portHandler, self.servo_id, ADDR_SCS_PRESENT_POSITION) + if not self.handle_errors(scs_comm_result, scs_error): + from .waveshare.SCservo_sdk.scservo_def import SCS_HIWORD + return SCS_HIWORD(scs_data) + return None + + def get_moving(self): + """ + Get the current moving status of the servo. + """ + if self.model.startswith('ST'): + # Read STServo moving status + moving, sts_comm_result, sts_error = self.packetHandler.ReadMoving(self.servo_id) + if not self.handle_errors(sts_comm_result, sts_error): + return moving + else: + # SCServo does not have a direct moving status, so we can infer it by checking if speed is non-zero or if position is changing over time. + speed = self.get_speed() + return speed != 0 + + + def enable_continuous(self): + """ + Set the servo mode. + :param mode: Mode to set (e.g., 'wheel', 'position') + """ + if self.model.startswith('SC'): + raise ValueError("Continuous mode is not supported for SCServo models.") + sts_comm_result, sts_error = self.packetHandler.WheelMode(self.servo_id) + if not self.handle_errors(sts_comm_result, sts_error): + self.log(f"Servo {self.servo_id} set to wheel mode") + + def turn_wheel(self, speed): + if self.model.startswith('SC'): + raise ValueError("Continuous mode is not supported for SCServo models.") + sts_comm_result, sts_error = self.packetHandler.WriteSpec(self.servo_id, speed, self.acceleration) + if not self.handle_errors(sts_comm_result, sts_error): + self.log(f"Servo {self.servo_id} turned at speed {speed}") + + + def handle_errors(self, comm_result, error): + """ + Handle communication errors. + :param comm_result: Communication result + :param error: Error code + """ + if comm_result != COMM_SUCCESS: + self.log("%s" % self.packetHandler.getTxRxResult(comm_result), level='error') + # log stack trace for debugging + return True + if error != 0: + self.log("%s" % self.packetHandler.getRxPacketError(error), level='error') + return True + return False + + def calibrate_to_center(self): + """ + Move the servo to the center of its range. + """ + if not self.range: + return + center_deg = (self.range[0] + self.range[1]) / 2 + center_raw = degrees_to_raw(center_deg, self.min_deg, self.max_deg, self.min_raw, self.max_raw) + self.move_to_raw(center_raw) + self.log(f"Moved servo {self.servo_id} to center position {center_deg} degrees (raw: {center_raw})") diff --git a/src/modules/actuators/bus_servo/tests/test_bus_servo.py b/src/modules/actuators/bus_servo/tests/test_bus_servo.py index 9e90d3dc..bb291fd2 100644 --- a/src/modules/actuators/bus_servo/tests/test_bus_servo.py +++ b/src/modules/actuators/bus_servo/tests/test_bus_servo.py @@ -1,28 +1,262 @@ import sys import unittest -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock +from modules.actuators.bus_servo.bus_servo import Servo +from modules.actuators.bus_servo.libraries.simulation_backend import SimulationBusServo +from modules.actuators.bus_servo.libraries.factory import BusServoFactory +from modules.actuators.bus_servo.libraries.waveshare_backend import WaveshareBusServo, _WaveshareConnectionManager -# Mock the SDK modules -mock_st_sdk = MagicMock() -mock_sc_sdk = MagicMock() -sys.modules['modules.actuators.bus_servo.STservo_sdk'] = mock_st_sdk -sys.modules['modules.actuators.bus_servo.SCservo_sdk'] = mock_sc_sdk -from modules.actuators.bus_servo.bus_servo import Servo +class TestWaveshareBusServo(unittest.TestCase): + def setUp(self): + # Mock the dynamic import of STservo_sdk + sys.modules['numpy'] = MagicMock() + self.mock_packet = MagicMock() + mock_sts = MagicMock(return_value=self.mock_packet) + self.mock_port_handler_instance = MagicMock() + mock_PortHandler = MagicMock(return_value=self.mock_port_handler_instance) + sys.modules['modules.actuators.bus_servo.libraries.waveshare.STservo_sdk'] = MagicMock( + sts=mock_sts, + PortHandler=mock_PortHandler + ) + self.servo = WaveshareBusServo(1, 'ST3215', '/dev/ttyUSB0', range=[0, 180]) + self.servo.packetHandler = self.mock_packet + self.servo.speed = 10 + self.servo.acceleration = 5 + self.servo.range = [0, 180] + self.servo.log = MagicMock() + # Set up default return values for SDK methods that return (comm_result, error) tuples + self.mock_packet.WritePosEx.return_value = (0, 0) + self.mock_packet.WheelMode.return_value = (0, 0) + self.mock_packet.WriteSpec.return_value = (0, 0) -class TestBusServo(unittest.TestCase): + def tearDown(self): + sys.modules.pop('modules.actuators.bus_servo.libraries.waveshare.STservo_sdk', None) + sys.modules.pop('numpy', None) + # Clear the connection manager so each test starts with a clean slate + _WaveshareConnectionManager.reset() + + def test_move_to(self): + self.servo.move_to(90, unit='degrees') + self.servo.move_to(1.57, unit='radians') + self.servo.move_to(2048, unit='raw') + # Should call move_to_raw + # No exception means pass + + def test_get_position(self): + self.servo.get_position_raw = MagicMock(return_value=2048) + pos_deg = self.servo.get_position(unit='degrees') + pos_rad = self.servo.get_position(unit='radians') + pos_raw = self.servo.get_position(unit='raw') + self.assertIsInstance(pos_deg, float) + self.assertIsInstance(pos_rad, float) + self.assertEqual(pos_raw, 2048) + + def test_set_speed(self): + self.servo.set_speed(20, unit='degrees') + + def test_attach_detach(self): + self.servo.attach() + self.servo.detach() + + def test_exit(self): + self.servo.exit() + + def test_move_to_raw(self): + self.servo.move_to_raw(1234) + + def test_get_speed(self): + self.mock_packet.ReadPosSpeed.return_value = (0, 10, 0, 0) + self.servo.model = 'ST3215' + self.servo.handle_errors = MagicMock(return_value=False) + speed = self.servo.get_speed() + self.assertEqual(speed, 10) + + def test_get_moving(self): + self.mock_packet.ReadMoving.return_value = (1, 0, 0) + self.servo.model = 'ST3215' + self.servo.handle_errors = MagicMock(return_value=False) + moving = self.servo.get_moving() + self.assertEqual(moving, 1) + + def test_enable_continuous(self): + self.mock_packet.WheelMode.return_value = (0, 0) + self.servo.model = 'ST3215' + self.servo.handle_errors = MagicMock(return_value=False) + self.servo.enable_continuous() + + def test_turn_wheel(self): + self.mock_packet.WriteSpec.return_value = (0, 0) + self.servo.model = 'ST3215' + self.servo.handle_errors = MagicMock(return_value=False) + self.servo.turn_wheel(50) + + def test_handle_errors(self): + self.servo.packetHandler.getTxRxResult = MagicMock(return_value='err') + self.servo.packetHandler.getRxPacketError = MagicMock(return_value='err') + self.servo.log = MagicMock() + import sys + sys.modules['modules.actuators.bus_servo.libraries.waveshare_backend'].COMM_SUCCESS = 0 + self.assertTrue(self.servo.handle_errors(1, 0)) + self.assertTrue(self.servo.handle_errors(0, 1)) + self.assertFalse(self.servo.handle_errors(0, 0)) + + def test_calibrate_to_center(self): + self.servo.handle_errors = MagicMock(return_value=False) + self.servo.range = [0, 180] + self.servo.model = 'ST3215' + self.servo.calibrate_to_center() + + def test_exit_closes_port_when_last_reference_released(self): + # Port should be closed only after all servo instances release the connection. + self.servo.exit() + self.mock_port_handler_instance.closePort.assert_called_once() + + def test_shared_connection_port_not_closed_until_all_released(self): + # Create a second servo on the same port/baudrate; they should share one connection. + servo2 = WaveshareBusServo(2, 'ST3215', '/dev/ttyUSB0', range=[0, 180]) + key = ('/dev/ttyUSB0', 1000000, 'ST') + self.assertEqual(_WaveshareConnectionManager._connections[key]['ref_count'], 2) + + # Release first servo – port must still be open. + self.servo.exit() + self.mock_port_handler_instance.closePort.assert_not_called() + self.assertEqual(_WaveshareConnectionManager._connections[key]['ref_count'], 1) - def test_init(self): - servo = Servo(name='test', id=1, range=[0, 4095], model='ST3215') + # Release second servo – port must now be closed. + servo2.exit() + self.mock_port_handler_instance.closePort.assert_called_once() + self.assertNotIn(key, _WaveshareConnectionManager._connections) + + def test_shared_connection_reuses_handlers(self): + # Two servos on the same bus must share the exact same portHandler and packetHandler. + servo2 = WaveshareBusServo(2, 'ST3215', '/dev/ttyUSB0', range=[0, 180]) + self.assertIs(self.servo.portHandler, servo2.portHandler) + self.assertIs(self.servo.packetHandler, servo2.packetHandler) + +class TestRustypotBusServo(unittest.TestCase): + def setUp(self): + # Mock numpy before importing backend, since it may not be installed in test env. + sys.modules['numpy'] = MagicMock( + deg2rad=lambda x: x, + rad2deg=lambda x: x, + ) + # Mock the dynamic import of rustypot + self.mock_controller = MagicMock() + mock_rustypot = MagicMock() + mock_rustypot.Sts3215PyController.return_value = self.mock_controller + mock_rustypot.Scs0009PyController.return_value = self.mock_controller + sys.modules['rustypot'] = mock_rustypot + from modules.actuators.bus_servo.libraries.rustypot_backend import RustypotBusServo + self.servo = RustypotBusServo(1, 'ST3215', '/dev/ttyUSB0', range=[0, 180]) + self.servo.controller = self.mock_controller + def tearDown(self): + sys.modules.pop('rustypot', None) + sys.modules.pop('numpy', None) + + def test_move_to(self): + self.servo.move_to(90, unit='degrees') + self.servo.move_to(1.57, unit='radians') + self.servo.move_to(1.0, unit='raw') + self.mock_controller.write_goal_position.assert_called() + + def test_get_position(self): + self.mock_controller.read_present_position.return_value = 1.57 + pos_deg = self.servo.get_position(unit='degrees') + pos_rad = self.servo.get_position(unit='radians') + pos_raw = self.servo.get_position(unit='raw') + self.assertIsInstance(pos_deg, float) + self.assertEqual(pos_rad, 1.57) + self.assertEqual(pos_raw, 1.57) + + def test_set_speed(self): + self.servo.set_speed(20, unit='degrees') + + def test_attach_detach(self): + self.servo.attach() + self.servo.detach() + self.mock_controller.write_torque_enable.assert_called() + + def test_exit(self): + self.servo.exit() + self.mock_controller.write_torque_enable.assert_called_with(1, False) + + def test_move_to_raw(self): + self.servo.move_to_raw(1.23) + self.mock_controller.write_goal_position.assert_called_with(1, 1.23) + + def test_get_position_raw(self): + self.mock_controller.read_present_position.return_value = 1.23 + self.assertEqual(self.servo.get_position_raw(), 1.23) + + def test_get_speed(self): + self.mock_controller.read_present_speed.return_value = 1.0 + speed = self.servo.get_speed(unit='degrees') + self.assertIsInstance(speed, float) + + def test_get_moving(self): + self.mock_controller.read_present_speed.return_value = 0.0 + self.assertFalse(self.servo.get_moving()) + self.mock_controller.read_present_speed.return_value = 1.0 + self.assertTrue(self.servo.get_moving()) + + def test_enable_continuous(self): + with self.assertRaises(NotImplementedError): + self.servo.enable_continuous() + + def test_turn_wheel(self): + with self.assertRaises(NotImplementedError): + self.servo.turn_wheel(10) + + def test_handle_errors(self): + self.assertFalse(self.servo.handle_errors(0, 0)) + + def test_calibrate_to_center(self): + self.servo.move_to = MagicMock() + self.servo.range = [0, 180] + self.servo.calibrate_to_center() + self.servo.move_to.assert_called() +class TestBusServo(unittest.TestCase): + def test_init_simulation_backend(self): + servo = Servo(name='test', id=1, range=[0, 180], model='ST3215', backend='simulation') self.assertEqual(servo.identifier, 'test') self.assertEqual(servo.index, 1) - self.assertEqual(servo.range, [0, 4095]) + self.assertEqual(servo.range, [0, 180]) self.assertEqual(servo.model, 'ST3215') + # Should use simulation backend + self.assertIsInstance(servo.backend_servo, SimulationBusServo) + + def test_move_to_degrees(self): + servo = Servo(name='test', id=1, range=[0, 180], model='ST3215', backend='simulation') + # Should log the move and update position + servo.move(90, unit='degrees') + self.assertEqual(servo.backend_servo.position, 90) + + def test_factory_simulation(self): + sim = BusServoFactory.create( + backend='simulation', + model='ST3215', + servo_id=1, + port='sim', + range=[0, 180] + ) + self.assertIsInstance(sim, SimulationBusServo) + sim.move_to(45, unit='degrees') + self.assertEqual(sim.position, 45) - def test_init_defaults(self): - servo = Servo(name='test', id=1, range=[0, 1023]) - self.assertEqual(servo.baudrate, 1000000) - self.assertEqual(servo.port, '/dev/ttyAMA0') + def test_simulation_methods(self): + sim = SimulationBusServo(1, 'ST3215', 'sim', range=[0, 180]) + sim.set_speed(10) + self.assertEqual(sim.speed, 10) + sim.detach() + self.assertFalse(sim.torque_enabled) + sim.attach() + self.assertTrue(sim.torque_enabled) + sim.move_to_raw(100) + self.assertEqual(sim.position, 100) + self.assertFalse(sim.get_moving()) + sim.calibrate_to_center() + self.assertEqual(sim.position, 90) if __name__ == '__main__': unittest.main() diff --git a/src/modules/actuators/piservo/tests/test_piservo.py b/src/modules/actuators/piservo/tests/test_piservo.py index f4baba4b..01224ee9 100644 --- a/src/modules/actuators/piservo/tests/test_piservo.py +++ b/src/modules/actuators/piservo/tests/test_piservo.py @@ -1,4 +1,6 @@ import sys +from unittest.mock import MagicMock +sys.modules['yaml'] = MagicMock() import unittest from unittest.mock import MagicMock, patch diff --git a/src/modules/actuators/servo/tests/test_servo.py b/src/modules/actuators/servo/tests/test_servo.py index 5d7f1520..314815da 100644 --- a/src/modules/actuators/servo/tests/test_servo.py +++ b/src/modules/actuators/servo/tests/test_servo.py @@ -1,4 +1,6 @@ import sys +from unittest.mock import MagicMock +sys.modules['yaml'] = MagicMock() import unittest from unittest.mock import MagicMock, patch diff --git a/src/modules/actuators/tests/bus_servo_test.py b/src/modules/actuators/tests/bus_servo_test.py index e6e723ef..80f55376 100644 --- a/src/modules/actuators/tests/bus_servo_test.py +++ b/src/modules/actuators/tests/bus_servo_test.py @@ -11,17 +11,15 @@ def _make_servo(model='ST', speed=300, acceleration=50, range=(0, 4095)): The messaging service is wired up so subscribe/publish work via simple dict-based stubs (avoiding the real pub/sub library). """ - # Patch hardware imports so no serial port is needed - with patch.dict('sys.modules', { - 'modules.actuators.bus_servo.STservo_sdk': MagicMock(), - 'modules.actuators.bus_servo.SCservo_sdk': MagicMock(), - }): - from modules.actuators.bus_servo import servo as servo_mod + from modules.actuators.bus_servo import bus_servo as servo_mod - # Patch COMM_SUCCESS constant used by handle_errors - servo_mod.COMM_SUCCESS = 0 + # Create a mock backend servo (replaces WaveshareBusServo, etc.) + mock_backend = MagicMock() + mock_backend.get_moving.return_value = 0 + mock_backend.get_position.return_value = 100 - # Build a minimal kwargs dict + # Patch BusServoFactory.create so no hardware SDK is imported + with patch.object(servo_mod.BusServoFactory, 'create', return_value=mock_backend): kwargs = { 'name': 'test_servo', 'model': model, @@ -37,41 +35,21 @@ def _make_servo(model='ST', speed=300, acceleration=50, range=(0, 4095)): 'start': None, 'poses': [], } - - # Patch PortHandler so openPort / setBaudRate succeed - port_handler = MagicMock() - port_handler.openPort.return_value = True - port_handler.setBaudRate.return_value = True - servo_mod.PortHandler = MagicMock(return_value=port_handler) - - # Patch packet handler constructors - packet_handler = MagicMock() - servo_mod.sts = MagicMock(return_value=packet_handler) - servo_mod.PacketHandler = MagicMock(return_value=packet_handler) - - # WritePosEx returns (comm_result, error) – both 0 = success - packet_handler.WritePosEx.return_value = (0, 0) - packet_handler.ReadMoving.return_value = (0, 0, 0) - packet_handler.ReadPosSpeed.return_value = (100, 0, 0, 0) - sv = servo_mod.Servo(**kwargs) - sv.pos = 100 # Set a known current position - # Wire up a mock messaging service - messaging_service = MagicMock() - sv._messaging_service = messaging_service - # Call setup_messaging manually (skipping hardware init side-effects) - # We do NOT set sv.messaging_service via property to avoid re-running setup + sv.pos = 100 # Set a known current position - return sv, packet_handler + # Wire up a mock messaging service + messaging_service = MagicMock() + sv._messaging_service = messaging_service + + return sv, mock_backend class TestBusServoQueue(unittest.TestCase): def setUp(self): self.sv, self.ph = _make_servo() - # Make handle_errors always return False (no error) - self.sv.handle_errors = MagicMock(return_value=False) # ------------------------------------------------------------------ # move (queues the request) @@ -166,30 +144,22 @@ def test_process_queue_uses_per_item_speed_acceleration(self): self.sv._do_move.assert_called_once_with(400, 50, 5) # ------------------------------------------------------------------ - # _do_move – no blocking sleep, speed/acceleration params + # _do_move – no blocking sleep, delegates to backend # ------------------------------------------------------------------ def test_move_does_not_block(self): """_do_move() must not call time.sleep (blocking removed).""" - self.sv.is_moving = MagicMock(return_value=True) with patch('time.sleep') as mock_sleep: - # Even when is_moving returns True, _do_move() should not sleep self.sv._do_move(500) mock_sleep.assert_not_called() def test_move_accepts_speed_acceleration_params(self): - """_do_move() should use supplied speed/acceleration, not instance defaults.""" - self.sv.is_moving = MagicMock(return_value=False) + """_do_move() should delegate the move to the backend servo.""" self.sv._do_move(500, speed=10, acceleration=2) - self.ph.WritePosEx.assert_called_once_with( - self.sv.index, 500, 10, 2 - ) + self.ph.move_to.assert_called_once_with(500, unit='degrees') def test_move_uses_instance_defaults_when_not_supplied(self): - self.sv.is_moving = MagicMock(return_value=False) self.sv._do_move(500) - self.ph.WritePosEx.assert_called_once_with( - self.sv.index, 500, self.sv.speed, self.sv.acceleration - ) + self.ph.move_to.assert_called_once_with(500, unit='degrees') # ------------------------------------------------------------------ # setup_messaging subscriptions diff --git a/src/modules/actuators/tests/servo_test.py b/src/modules/actuators/tests/servo_test.py index 610af3c9..91f1985a 100644 --- a/src/modules/actuators/tests/servo_test.py +++ b/src/modules/actuators/tests/servo_test.py @@ -1,28 +1,39 @@ -from unittest import TestCase, mock -from modules.mocks import mock_pigpio -from modules.mocks import mock_arduino_serial -from modules.actuators.servo import Servo -import pytest +import sys +import unittest +from unittest import mock +from unittest.mock import MagicMock, patch -@mock.patch('modules.actuators.servo.pub', return_value=mock.Mock()) -class ServoTest(TestCase): +# Mock hardware dependencies before importing Servo +sys.modules['yaml'] = MagicMock() +sys.modules['pigpio'] = MagicMock() +sys.modules['modules.network.arduinoserial'] = MagicMock() +sys.modules['modules.network.arduinoserial.arduinoserial'] = MagicMock() - def test_init(self, mock_pub): +from modules.actuators.servo.servo import Servo + + +class ServoTest(unittest.TestCase): + + def _make_servo(self, **kwargs): + sv = Servo(**kwargs) + sv._messaging_service = MagicMock() + return sv + + def test_init(self): # Pin 1, default values - sv = Servo(1, 'test', (20000, 40000)) + sv = Servo(pin=1, name='test', range=(20000, 40000), serial=True) assert sv.pos == 30000 assert sv.range == (20000, 40000) assert sv.pin == 1 # Override defaults - sv = Servo(10, 'test', (10, 40), start_pos=50) + sv = Servo(pin=10, name='test', range=(10, 40), start=50, serial=True) assert sv.pos == 25 assert sv.range == (10, 40) assert sv.pin == 10 - - def test_move(self, mock_pub): - sv = Servo(1, 'test', (0, 200), start_pos=50) + def test_move(self): + sv = self._make_servo(pin=1, name='test', range=(0, 200), start=50, serial=None) # test absolute values sv.move(10) assert sv.pos == 20 @@ -41,47 +52,45 @@ def test_move(self, mock_pub): assert sv.pos == 100 # test out of range values - with pytest.raises(ValueError) as ex: + with self.assertRaises(ValueError) as cm: sv.move(-10, False) - assert "out of range" in str(ex.value) - with pytest.raises(ValueError) as ex: + assert "out of range" in str(cm.exception) + with self.assertRaises(ValueError) as cm: sv.move(101, False) - assert "out of range" in str(ex.value) + assert "out of range" in str(cm.exception) sv.move(-10) assert sv.pos == 0 sv.move(181) assert sv.pos == 200 - - def test_move_relative(self, mock_pub): - sv = Servo(1, 'test', (0, 200), start_pos=50) - # test absolute values + def test_move_relative(self): + sv = self._make_servo(pin=1, name='test', range=(0, 200), start=50, serial=None) + # test relative values sv.move_relative(10) assert sv.pos == 120 sv.move_relative(-20) assert sv.pos == 80 # test out of range values - with pytest.raises(ValueError) as ex: + with self.assertRaises(ValueError) as cm: sv.move_relative(-50, False) - assert "out of range" in str(ex.value) - with pytest.raises(ValueError) as ex: + assert "out of range" in str(cm.exception) + with self.assertRaises(ValueError) as cm: sv.move_relative(101, False) - assert "out of range" in str(ex.value) + assert "out of range" in str(cm.exception) sv.move_relative(-50) assert sv.pos == 0 sv.move_relative(101) assert sv.pos == 200 - - def test_buffer(self, mock_pub): - sv = Servo(1, 'test', (0, 2000), start_pos=50) + def test_buffer(self): + sv = self._make_servo(pin=1, name='test', range=(0, 2000), start=50, serial=None) sv.move(100) assert sv.pos == 2000 - sv2 = Servo(1, 'test', (0, 2000), start_pos=50, buffer=100) + sv2 = self._make_servo(pin=1, name='test', range=(0, 2000), start=50, buffer=100, serial=None) sv2.move(100) assert sv2.pos == 2000 @@ -92,8 +101,10 @@ def test_buffer(self, mock_pub): assert len(sequence) == 1 sequence = sv2.calculate_move(100, 200) - # [(100, 0.1), (101.5, 0.1), (103.75, 0.1), (107.125, 0.1), (112.1875, 0.1), (119.78125, 0.1), (131.171875, 0.1), (148.2578125, 0.1), (173.88671875, 0.1), (200, 0.1)] assert len(sequence) == 10 assert sequence[0][0] == 100 assert sequence[9][0] == 200 + +if __name__ == '__main__': + unittest.main() diff --git a/src/modules/network/rtlsdr/tests/test_rtlsdr.py b/src/modules/network/rtlsdr/tests/test_rtlsdr.py index 99103aeb..d51ef24e 100644 --- a/src/modules/network/rtlsdr/tests/test_rtlsdr.py +++ b/src/modules/network/rtlsdr/tests/test_rtlsdr.py @@ -1,3 +1,6 @@ +import sys +from unittest.mock import MagicMock +sys.modules['requests'] = MagicMock() import unittest from unittest.mock import MagicMock, patch from modules.network.rtlsdr.rtlsdr import RTLSDR diff --git a/src/modules/personality/personality.py b/src/modules/personality/personality.py index 3ca15cdb..eb19afe6 100644 --- a/src/modules/personality/personality.py +++ b/src/modules/personality/personality.py @@ -125,9 +125,9 @@ def chicken_head(self): print(f"Pitch: {pitch}, Angle to zero yaw: {zero_yaw}") if abs(pitch) > 5: - self.servos['neck_tilt'].move_degrees(pitch) + self.servos['neck_tilt'].move_relative(pitch) if abs(zero_yaw) > 5: - self.servos['neck_pan'].move_degrees(zero_yaw) + self.servos['neck_pan'].move_relative(zero_yaw) def one_leg_balance(self): """ Use body IMU data to move legs into a one legged stance by lifting one leg and adjusting the other leg and body to maintain balance. """ @@ -146,18 +146,18 @@ def one_leg_balance(self): print(f"Current body roll: {roll}") # This should just show one leg extend under the body, and the other knee bending. For demo only if roll > 0: - # self.servos['leg_l_knee'].move_degrees(-90) - self.servos['leg_l_tilt'].move_degrees(roll) + # self.servos['leg_l_knee'].move_relative(-90) + self.servos['leg_l_tilt'].move_relative(roll) self.servos['leg_r_tilt'].move(self.servos['leg_r_tilt'].start) - # self.servos['leg_r_knee'].move_degrees(90) + # self.servos['leg_r_knee'].move_relative(90) pass else: - self.servos['leg_r_tilt'].move_degrees(roll) + self.servos['leg_r_tilt'].move_relative(roll) # self.servos['leg_l_tilt'].calibrate_to_center() self.servos['leg_l_tilt'].move(self.servos['leg_l_tilt'].start) - # self.servos['leg_r_knee'].move_degrees(-90) - # self.servos['leg_r_tilt'].move_degrees(-pitch) - # self.servos['leg_l_knee'].move_degrees(90) + # self.servos['leg_r_knee'].move_relative(-90) + # self.servos['leg_r_tilt'].move_relative(-pitch) + # self.servos['leg_l_knee'].move_relative(90) def balance(self): """Use head and body IMU data to maintain balance by adjusting leg servos.""" @@ -171,8 +171,8 @@ def balance(self): if abs(pitch) < 2: return # No need to adjust for small angles # print(f"Angle to move: {pitch}") - self.servos['leg_l_hip'].move_degrees(-pitch) - self.servos['leg_r_hip'].move_degrees(pitch) + self.servos['leg_l_hip'].move_relative(-pitch) + self.servos['leg_r_hip'].move_relative(pitch) def handle_user_message(self, user_id=None, message=None): print(f"Received message from user {user_id}: {message}") @@ -217,9 +217,9 @@ def cycle_display(self): def loop_10(self): # loop 3 times: # for i in range(3): - # self.servos['leg_l_ankle'].move(1000, speed=0) - # self.servos['leg_l_ankle'].move(1500, speed=0) - # self.servos['leg_l_ankle'].move_degrees(50) + # self.servos['leg_l_ankle'].move_relative(1000, speed=0) + # self.servos['leg_l_ankle'].move_relative(1500, speed=0) + # self.servos['leg_l_ankle'].move_relative(50) # self.scan_vision() # self.output_current_pose() # self.publish('servo/pose', pose_name='wave_1') # For testing pose movement @@ -454,7 +454,7 @@ def track_match(self, match): pan_angle = int(((center_pos_x - (camera_size[0] / 2)) / (camera_size[0] / 2)) * 40) # Scale to -40 to 40 degrees if abs(pan_angle) > track_threshold[0]: # Only move if the angle is greater than 5 degrees to avoid jitter self.log(f"Moving neck pan to {pan_angle} degrees based on detected object position") - self.servos['neck_pan'].move_degrees(pan_angle) + self.servos['neck_pan'].move_relative(pan_angle) # Move tilt servo based on center_pos_y position relative to camera_size # Focus on the top of the bounding box in the Y axis @@ -462,7 +462,7 @@ def track_match(self, match): tilt_angle = int(((top_section_y - (camera_size[1] / 2)) / (camera_size[1] / 2)) * 40) # Scale to -40 to 40 degrees if abs(tilt_angle) > track_threshold[1]: # Only move if the angle is greater than 5 degrees to avoid jitter self.log(f"Moving neck tilt to {tilt_angle} degrees based on detected object position") - self.servos['neck_tilt'].move_degrees(-tilt_angle) + self.servos['neck_tilt'].move_relative(-tilt_angle) # Motion: Updates the last motion time def update_motion_time(self, value): diff --git a/src/modules/personality/tests/test_personality.py b/src/modules/personality/tests/test_personality.py index 4d0acd61..a5696ea9 100644 --- a/src/modules/personality/tests/test_personality.py +++ b/src/modules/personality/tests/test_personality.py @@ -1,3 +1,6 @@ +import sys +from unittest.mock import MagicMock +sys.modules['yaml'] = MagicMock() import unittest from unittest.mock import MagicMock, patch from modules.personality.personality import Personality diff --git a/src/tests/test_base_module.py b/src/tests/test_base_module.py index 5865baf1..8ab55a5a 100644 --- a/src/tests/test_base_module.py +++ b/src/tests/test_base_module.py @@ -1,3 +1,7 @@ +import sys +from unittest.mock import MagicMock +sys.modules['yaml'] = MagicMock() +sys.modules['requests'] = MagicMock() import unittest from unittest import mock from modules.base_module import BaseModule diff --git a/src/tests/test_module_loader.py b/src/tests/test_module_loader.py index 0cb1f60f..cb108803 100644 --- a/src/tests/test_module_loader.py +++ b/src/tests/test_module_loader.py @@ -1,3 +1,7 @@ +import sys +from unittest.mock import MagicMock +sys.modules['yaml'] = MagicMock() +sys.modules['requests'] = MagicMock() import unittest from unittest.mock import Mock, patch, MagicMock diff --git a/test.sh b/test.sh index b88a7e89..c0262b48 100755 --- a/test.sh +++ b/test.sh @@ -1,2 +1,4 @@ -PYTHONPATH=src python3 -m unittest discover -s src/tests -p "test_*.py" -t src -PYTHONPATH=src python3 -m unittest discover -s src/modules -p "test_*.py" -t src +PYTHONPATH=src python3 -m unittest discover -s src/tests -p "test_*.py" -t src -b +PYTHONPATH=src python3 -m unittest discover -s src/tests -p "*_test.py" -t src -b +PYTHONPATH=src python3 -m unittest discover -s src/modules -p "test_*.py" -t src -b +PYTHONPATH=src python3 -m unittest discover -s src/modules -p "*_test.py" -t src -b