From 5ceb99c80972ff3b92b83e61c5679bc0d081315d Mon Sep 17 00:00:00 2001 From: Shikhar Mishra Date: Mon, 20 Jan 2025 01:50:36 +0530 Subject: [PATCH 1/4] refactor: replaced Any types with TypedDict definitions for all service methods --- kos-py/pykos/services/actuator.py | 39 ++++++++++--- kos-py/pykos/services/imu.py | 73 ++++++++++++++++++++---- kos-py/pykos/services/inference.py | 31 ++++++++-- kos-py/pykos/services/led_matrix.py | 11 +++- kos-py/pykos/services/process_manager.py | 27 +++++++-- kos-py/pykos/services/sim.py | 37 ++++++++++-- kos-py/pykos/services/sound.py | 20 ++++++- kos-py/tests/test_pykos.py | 3 +- 8 files changed, 199 insertions(+), 42 deletions(-) diff --git a/kos-py/pykos/services/actuator.py b/kos-py/pykos/services/actuator.py index c079656..28b6d45 100644 --- a/kos-py/pykos/services/actuator.py +++ b/kos-py/pykos/services/actuator.py @@ -1,6 +1,6 @@ """Actuator service client.""" -from typing import NotRequired, TypedDict, Unpack +from typing import List, NotRequired, TypedDict, Unpack import grpc from google.longrunning import operations_pb2, operations_pb2_grpc @@ -10,6 +10,31 @@ from kos_protos.actuator_pb2 import CalibrateActuatorMetadata +class ActionResult(TypedDict): + actuator_id: int + success: bool + error: NotRequired[common_pb2.Error] + + +class CommandActuatorsResponse(TypedDict): + results: List[ActionResult] + + +class ActuatorStateResponse(TypedDict): + actuator_id: int + online: bool + position: NotRequired[float] + velocity: NotRequired[float] + torque: NotRequired[float] + temperature: NotRequired[float] + voltage: NotRequired[float] + current: NotRequired[float] + + +class GetActuatorsStateResponse(TypedDict): + states: List[ActuatorStateResponse] + + class ActuatorCommand(TypedDict): actuator_id: int position: NotRequired[float] @@ -82,7 +107,7 @@ def get_calibration_status(self, actuator_id: int) -> str | None: metadata = CalibrationMetadata(response.metadata) return metadata.status - def command_actuators(self, commands: list[ActuatorCommand]) -> actuator_pb2.CommandActuatorsResponse: + def command_actuators(self, commands: list[ActuatorCommand]) -> CommandActuatorsResponse: """Command multiple actuators at once. Example: @@ -97,13 +122,13 @@ def command_actuators(self, commands: list[ActuatorCommand]) -> actuator_pb2.Com 'velocity', and 'torque'. Returns: - List of ActionResult objects indicating success/failure for each command. + CommandActuatorsResponse is a list of ActionResult objects indicating success/failure for each command. """ actuator_commands = [actuator_pb2.ActuatorCommand(**cmd) for cmd in commands] request = actuator_pb2.CommandActuatorsRequest(commands=actuator_commands) return self.stub.CommandActuators(request) - def configure_actuator(self, **kwargs: Unpack[ConfigureActuatorRequest]) -> common_pb2.ActionResult: + def configure_actuator(self, **kwargs: Unpack[ConfigureActuatorRequest]) -> ActionResult: """Configure an actuator's parameters. Example: @@ -134,12 +159,12 @@ def configure_actuator(self, **kwargs: Unpack[ConfigureActuatorRequest]) -> comm protection_time, torque_enabled, new_actuator_id Returns: - ActionResponse indicating success/failure + ActionResult conatining the actuator_id and a ActionResponse indicating success/failure """ request = actuator_pb2.ConfigureActuatorRequest(**kwargs) return self.stub.ConfigureActuator(request) - def get_actuators_state(self, actuator_ids: list[int] | None = None) -> actuator_pb2.GetActuatorsStateResponse: + def get_actuators_state(self, actuator_ids: list[int] | None = None) -> GetActuatorsStateResponse: """Get the state of multiple actuators. Example: @@ -149,7 +174,7 @@ def get_actuators_state(self, actuator_ids: list[int] | None = None) -> actuator actuator_ids: List of actuator IDs to query. If None, gets state of all actuators. Returns: - List of ActuatorStateResponse objects containing the state information + GetActuatorsStateResponse containing a list of ActuatorStateResponse objects with state information """ request = actuator_pb2.GetActuatorsStateRequest(actuator_ids=actuator_ids or []) return self.stub.GetActuatorsState(request) diff --git a/kos-py/pykos/services/imu.py b/kos-py/pykos/services/imu.py index 4eb85e6..cd39e7e 100644 --- a/kos-py/pykos/services/imu.py +++ b/kos-py/pykos/services/imu.py @@ -12,6 +12,53 @@ from kos_protos.imu_pb2 import CalibrateIMUMetadata +class IMUValuesResponse(TypedDict): + acceleration_x: float + acceleration_y: float + acceleration_z: float + gyroscope_x: float + gyroscope_y: float + gyroscope_z: float + magnetometer_x: float + magnetometer_y: float + magnetometer_z: float + + +class IMUAdvancedValuesResponse(TypedDict): + linear_acceleration_x: float + linear_acceleration_y: float + linear_acceleration_z: float + gravity_x: float + gravity_y: float + gravity_z: float + rotation_rate_x: float + rotation_rate_y: float + rotation_rate_z: float + + +class EulerAnglesResponse(TypedDict): + roll: float + pitch: float + yaw: float + + +class QuaternionResponse(TypedDict): + w: float + x: float + y: float + z: float + + +class ActionResponse(TypedDict): + success: bool + error: NotRequired[common_pb2.Error] + + +class CalibrateIMUResponse(TypedDict): + name: str + metadata: AnyPb2 + + class ZeroIMURequest(TypedDict): max_retries: NotRequired[int] max_angular_error: NotRequired[float] @@ -56,39 +103,41 @@ def __init__(self, channel: grpc.Channel) -> None: self.stub = imu_pb2_grpc.IMUServiceStub(channel) self.operations_stub = operations_pb2_grpc.OperationsStub(channel) - def get_imu_values(self) -> imu_pb2.IMUValuesResponse: + def get_imu_values(self) -> IMUValuesResponse: """Get the latest IMU sensor values. Returns: - ImuValuesResponse: The latest IMU sensor values. + IMUValuesResponse: The latest IMU sensor values including acceleration, + gyroscope, and magnetometer readings on x, y, z axes. """ return self.stub.GetValues(Empty()) - def get_imu_advanced_values(self) -> imu_pb2.IMUAdvancedValuesResponse: + def get_imu_advanced_values(self) -> IMUAdvancedValuesResponse: """Get the latest IMU advanced values. Returns: - ImuAdvancedValuesResponse: The latest IMU advanced values. + IMUAdvancedValuesResponse: The latest IMU advanced values including linear acceleration, + gravity, and rotation rate on x, y, z axes. """ return self.stub.GetAdvancedValues(Empty()) - def get_euler_angles(self) -> imu_pb2.EulerAnglesResponse: + def get_euler_angles(self) -> EulerAnglesResponse: """Get the latest Euler angles. Returns: - EulerAnglesResponse: The latest Euler angles. + EulerAnglesResponse: The latest Euler angles (roll, pitch, yaw). """ return self.stub.GetEuler(Empty()) - def get_quaternion(self) -> imu_pb2.QuaternionResponse: + def get_quaternion(self) -> QuaternionResponse: """Get the latest quaternion. Returns: - QuaternionResponse: The latest quaternion. + QuaternionResponse: The latest quaternion values (w, x, y, z). """ return self.stub.GetQuaternion(Empty()) - def zero(self, duration: float = 1.0, **kwargs: Unpack[ZeroIMURequest]) -> common_pb2.ActionResponse: + def zero(self, duration: float = 1.0, **kwargs: Unpack[ZeroIMURequest]) -> ActionResponse: """Zero the IMU. Example: @@ -108,18 +157,18 @@ def zero(self, duration: float = 1.0, **kwargs: Unpack[ZeroIMURequest]) -> commo max_acceleration: Maximum acceleration during zeroing Returns: - ActionResponse: The response from the zero operation. + ActionResponse: Response indicating success/failure of the zero operation. """ request = imu_pb2.ZeroIMURequest(duration=_duration_from_seconds(duration), **kwargs) return self.stub.Zero(request) - def calibrate(self) -> imu_pb2.CalibrateIMUResponse: + def calibrate(self) -> CalibrateIMUResponse: """Calibrate the IMU. This starts a long-running calibration operation. The operation can be monitored using get_calibration_status(). Returns: - CalibrationMetadata: Metadata about the calibration operation. + CalibrateIMUResponse: Response containing operation name and metadata about the calibration. """ return self.stub.Calibrate(Empty()) diff --git a/kos-py/pykos/services/inference.py b/kos-py/pykos/services/inference.py index b36bb1f..79300e2 100644 --- a/kos-py/pykos/services/inference.py +++ b/kos-py/pykos/services/inference.py @@ -82,6 +82,27 @@ class GetModelsInfoResponse(TypedDict): error: NotRequired[common_pb2.Error | None] +class UploadModelResponse(TypedDict): + """Response from uploading a model.""" + + uid: str + error: NotRequired[common_pb2.Error | None] + + +class LoadModelsResponse(TypedDict): + """Response from loading models.""" + + success: bool + error: NotRequired[common_pb2.Error | None] + + +class ActionResponse(TypedDict): + """Response indicating success/failure of an action.""" + + success: bool + error: NotRequired[common_pb2.Error | None] + + class InferenceServiceClient: """Client for the InferenceService. @@ -96,9 +117,7 @@ def __init__(self, channel: grpc.Channel) -> None: """ self.stub = inference_pb2_grpc.InferenceServiceStub(channel) - def upload_model( - self, model_data: bytes, metadata: ModelMetadata | None = None - ) -> inference_pb2.UploadModelResponse: + def upload_model(self, model_data: bytes, metadata: ModelMetadata | None = None) -> UploadModelResponse: """Upload a model to the robot. Example: @@ -125,19 +144,19 @@ def upload_model( request = inference_pb2.UploadModelRequest(model=model_data, metadata=proto_metadata) return self.stub.UploadModel(request) - def load_models(self, uids: list[str]) -> inference_pb2.LoadModelsResponse: + def load_models(self, uids: list[str]) -> LoadModelsResponse: """Load models from the robot's filesystem. Args: uids: List of model UIDs to load. Returns: - LoadModelsResponse containing information about the loaded models. + LoadModelsResponse containing success status and any error information. """ request = inference_pb2.ModelUids(uids=uids) return self.stub.LoadModels(request) - def unload_models(self, uids: list[str]) -> common_pb2.ActionResponse: + def unload_models(self, uids: list[str]) -> ActionResponse: """Unload models from the robot's filesystem. Args: diff --git a/kos-py/pykos/services/led_matrix.py b/kos-py/pykos/services/led_matrix.py index a3c7bae..62ee976 100644 --- a/kos-py/pykos/services/led_matrix.py +++ b/kos-py/pykos/services/led_matrix.py @@ -46,6 +46,13 @@ class ImageData(TypedDict): brightness: int +class ActionResponse(TypedDict): + """Response indicating success/failure of an action.""" + + success: bool + error: NotRequired[common_pb2.Error | None] + + class LEDMatrixServiceClient: """Client for the LEDMatrixService. @@ -74,7 +81,7 @@ def get_matrix_info(self) -> MatrixInfo: """ return self.stub.GetMatrixInfo(Empty()) - def write_buffer(self, buffer: bytes) -> common_pb2.ActionResponse: + def write_buffer(self, buffer: bytes) -> ActionResponse: """Write binary on/off states to the LED matrix. The buffer should be width * height / 8 bytes long, where each bit @@ -89,7 +96,7 @@ def write_buffer(self, buffer: bytes) -> common_pb2.ActionResponse: request = led_matrix_pb2.WriteBufferRequest(buffer=buffer) return self.stub.WriteBuffer(request) - def write_color_buffer(self, **kwargs: Unpack[ImageData]) -> common_pb2.ActionResponse: + def write_color_buffer(self, **kwargs: Unpack[ImageData]) -> ActionResponse: """Write image data to the LED matrix. Args: diff --git a/kos-py/pykos/services/process_manager.py b/kos-py/pykos/services/process_manager.py index 69f2896..a6f87d0 100644 --- a/kos-py/pykos/services/process_manager.py +++ b/kos-py/pykos/services/process_manager.py @@ -1,32 +1,49 @@ """Process manager service client.""" +from typing import NotRequired, TypedDict + import grpc from google.protobuf.empty_pb2 import Empty -from kos_protos import process_manager_pb2, process_manager_pb2_grpc +from kos_protos import common_pb2, process_manager_pb2_grpc from kos_protos.process_manager_pb2 import KClipStartRequest +class KClipStartResponse(TypedDict): + """Response from starting KClip recording.""" + + success: bool + error: NotRequired[common_pb2.Error | None] + + +class KClipStopResponse(TypedDict): + """Response from stopping KClip recording.""" + + success: bool + error: NotRequired[common_pb2.Error | None] + clip_path: NotRequired[str | None] + + class ProcessManagerServiceClient: def __init__(self, channel: grpc.Channel) -> None: self.stub = process_manager_pb2_grpc.ProcessManagerServiceStub(channel) - def start_kclip(self, action: str) -> process_manager_pb2.KClipStartResponse: + def start_kclip(self, action: str) -> KClipStartResponse: """Start KClip recording. Args: action: The action string for the KClip request Returns: - The response from the server. + KClipStartResponse containing success status and any error information. """ request = KClipStartRequest(action=action) return self.stub.StartKClip(request) - def stop_kclip(self, request: Empty = Empty()) -> process_manager_pb2.KClipStopResponse: + def stop_kclip(self, request: Empty = Empty()) -> KClipStopResponse: """Stop KClip recording. Returns: - The response from the server. + KClipStopResponse containing success status, clip path, and any error information. """ return self.stub.StopKClip(request) diff --git a/kos-py/pykos/services/sim.py b/kos-py/pykos/services/sim.py index 1200529..6233a6d 100644 --- a/kos-py/pykos/services/sim.py +++ b/kos-py/pykos/services/sim.py @@ -9,30 +9,54 @@ class DefaultPosition(TypedDict): + """Default position state for simulation.""" + qpos: list[float] class ResetRequest(TypedDict): + """Request parameters for resetting simulation.""" + initial_state: NotRequired[DefaultPosition] randomize: NotRequired[bool] class StepRequest(TypedDict): + """Request parameters for stepping simulation.""" + num_steps: int step_size: NotRequired[float] class SimulationParameters(TypedDict): + """Parameters for configuring simulation.""" + time_scale: NotRequired[float] gravity: NotRequired[float] initial_state: NotRequired[DefaultPosition] +class ActionResponse(TypedDict): + """Response indicating success/failure of an action.""" + + success: bool + error: NotRequired[common_pb2.Error | None] + + +class GetParametersResponse(TypedDict): + """Response containing current simulation parameters.""" + + time_scale: float + gravity: float + initial_state: DefaultPosition + error: NotRequired[common_pb2.Error | None] + + class SimServiceClient: def __init__(self, channel: grpc.Channel) -> None: self.stub = sim_pb2_grpc.SimulationServiceStub(channel) - def reset(self, **kwargs: Unpack[ResetRequest]) -> common_pb2.ActionResponse: + def reset(self, **kwargs: Unpack[ResetRequest]) -> ActionResponse: """Reset the simulation to its initial state. Args: @@ -57,7 +81,7 @@ def reset(self, **kwargs: Unpack[ResetRequest]) -> common_pb2.ActionResponse: request = sim_pb2.ResetRequest(initial_state=initial_state, randomize=kwargs.get("randomize")) return self.stub.Reset(request) - def set_paused(self, paused: bool) -> common_pb2.ActionResponse: + def set_paused(self, paused: bool) -> ActionResponse: """Pause or unpause the simulation. Args: @@ -69,7 +93,7 @@ def set_paused(self, paused: bool) -> common_pb2.ActionResponse: request = sim_pb2.SetPausedRequest(paused=paused) return self.stub.SetPaused(request) - def step(self, num_steps: int, step_size: float | None = None) -> common_pb2.ActionResponse: + def step(self, num_steps: int, step_size: float | None = None) -> ActionResponse: """Step the simulation forward. Args: @@ -82,7 +106,7 @@ def step(self, num_steps: int, step_size: float | None = None) -> common_pb2.Act request = sim_pb2.StepRequest(num_steps=num_steps, step_size=step_size) return self.stub.Step(request) - def set_parameters(self, **kwargs: Unpack[SimulationParameters]) -> common_pb2.ActionResponse: + def set_parameters(self, **kwargs: Unpack[SimulationParameters]) -> ActionResponse: """Set simulation parameters. Example: @@ -112,10 +136,11 @@ def set_parameters(self, **kwargs: Unpack[SimulationParameters]) -> common_pb2.A request = sim_pb2.SetParametersRequest(parameters=params) return self.stub.SetParameters(request) - def get_parameters(self) -> sim_pb2.GetParametersResponse: + def get_parameters(self) -> GetParametersResponse: """Get current simulation parameters. Returns: - GetParametersResponse containing current parameters and any error + GetParametersResponse containing current parameters (time_scale, gravity, initial_state) + and any error information. """ return self.stub.GetParameters(Empty()) diff --git a/kos-py/pykos/services/sound.py b/kos-py/pykos/services/sound.py index d020484..ee38dcf 100644 --- a/kos-py/pykos/services/sound.py +++ b/kos-py/pykos/services/sound.py @@ -52,6 +52,20 @@ class AudioConfig(TypedDict): channels: int +class ActionResponse(TypedDict): + """Response indicating success/failure of an action.""" + + success: bool + error: NotRequired[common_pb2.Error | None] + + +class RecordAudioResponse(TypedDict): + """Response containing recorded audio data.""" + + audio_data: bytes + error: NotRequired[common_pb2.Error | None] + + class SoundServiceClient: """Client for the SoundService. @@ -74,7 +88,7 @@ def get_audio_info(self) -> AudioInfo: """ return self.stub.GetAudioInfo(Empty()) - def play_audio(self, audio_iterator: Iterator[bytes], **kwargs: Unpack[AudioConfig]) -> common_pb2.ActionResponse: + def play_audio(self, audio_iterator: Iterator[bytes], **kwargs: Unpack[AudioConfig]) -> ActionResponse: """Stream PCM audio data to the speaker. Args: @@ -93,7 +107,7 @@ def play_audio(self, audio_iterator: Iterator[bytes], **kwargs: Unpack[AudioConf ... def chunks(): ... while chunk := f.read(4096): ... yield chunk - ... response = client.play_audio(chunks(), config) + ... response = client.play_audio(chunks(), **config) """ def request_iterator() -> Generator[sound_pb2.PlayAudioRequest, None, None]: @@ -136,7 +150,7 @@ def record_audio(self, duration_ms: int = 0, **kwargs: Unpack[AudioConfig]) -> G raise RuntimeError(f"Recording error: {response.error}") yield response.audio_data - def stop_recording(self) -> common_pb2.ActionResponse: + def stop_recording(self) -> ActionResponse: """Stop an ongoing recording session. Returns: diff --git a/kos-py/tests/test_pykos.py b/kos-py/tests/test_pykos.py index bd20f54..4ece108 100644 --- a/kos-py/tests/test_pykos.py +++ b/kos-py/tests/test_pykos.py @@ -30,7 +30,8 @@ def test_pykos() -> None: client.imu.get_euler_angles() client.imu.get_quaternion() client.imu.calibrate() - zero_response = client.imu.zero(duration=1.0, max_retries=1, max_angular_error=1.0) + zero_response = client.imu.zero( + duration=1.0, max_retries=1, max_angular_error=1.0) assert zero_response.success # Tests the K-Clip endpoints. From 090e45d30b566960dc28a369622dccb038cdc2e9 Mon Sep 17 00:00:00 2001 From: Shikhar Mishra Date: Tue, 21 Jan 2025 09:56:53 +0530 Subject: [PATCH 2/4] Resolved failing static-checks for TypedDict definitions --- kos-py/pykos/services/imu.py | 21 ++++++++++-------- kos-py/pykos/services/process_manager.py | 3 ++- kos-py/tests/test_pykos.py | 28 +++++++++++++----------- 3 files changed, 29 insertions(+), 23 deletions(-) diff --git a/kos-py/pykos/services/imu.py b/kos-py/pykos/services/imu.py index cd39e7e..8b3bba6 100644 --- a/kos-py/pykos/services/imu.py +++ b/kos-py/pykos/services/imu.py @@ -13,15 +13,18 @@ class IMUValuesResponse(TypedDict): - acceleration_x: float - acceleration_y: float - acceleration_z: float - gyroscope_x: float - gyroscope_y: float - gyroscope_z: float - magnetometer_x: float - magnetometer_y: float - magnetometer_z: float + """Response containing IMU sensor values.""" + + accel_x: float + accel_y: float + accel_z: float + gyro_x: float + gyro_y: float + gyro_z: float + mag_x: NotRequired[float | None] + mag_y: NotRequired[float | None] + mag_z: NotRequired[float | None] + error: NotRequired[common_pb2.Error | None] class IMUAdvancedValuesResponse(TypedDict): diff --git a/kos-py/pykos/services/process_manager.py b/kos-py/pykos/services/process_manager.py index a6f87d0..5015bb2 100644 --- a/kos-py/pykos/services/process_manager.py +++ b/kos-py/pykos/services/process_manager.py @@ -13,6 +13,7 @@ class KClipStartResponse(TypedDict): """Response from starting KClip recording.""" success: bool + clip_uuid: str error: NotRequired[common_pb2.Error | None] @@ -20,8 +21,8 @@ class KClipStopResponse(TypedDict): """Response from stopping KClip recording.""" success: bool + clip_uuid: str error: NotRequired[common_pb2.Error | None] - clip_path: NotRequired[str | None] class ProcessManagerServiceClient: diff --git a/kos-py/tests/test_pykos.py b/kos-py/tests/test_pykos.py index 4ece108..a69773f 100644 --- a/kos-py/tests/test_pykos.py +++ b/kos-py/tests/test_pykos.py @@ -4,6 +4,9 @@ import pytest import pykos +from pykos.services.actuator import ActionResult, GetActuatorsStateResponse +from pykos.services.imu import IMUValuesResponse +from pykos.services.process_manager import KClipStartResponse, KClipStopResponse def test_dummy() -> None: @@ -16,29 +19,28 @@ def test_pykos() -> None: client = pykos.KOS("127.0.0.1") # Tests configuring the actuator. - actuator_response = client.actuator.configure_actuator(actuator_id=1) - assert actuator_response.success + actuator_response: ActionResult = client.actuator.configure_actuator(actuator_id=1) + assert actuator_response["success"] # Tests getting the actuator state. - actuator_state = client.actuator.get_actuators_state(actuator_ids=[1]) - assert actuator_state.states[0].actuator_id == 1 + actuator_state: GetActuatorsStateResponse = client.actuator.get_actuators_state(actuator_ids=[1]) + assert actuator_state["states"][0]["actuator_id"] == 1 # Tests the IMU endpoints. - imu_response = client.imu.get_imu_values() - assert imu_response.accel_x is not None + imu_response: IMUValuesResponse = client.imu.get_imu_values() + assert imu_response["accel_x"] is not None client.imu.get_imu_advanced_values() client.imu.get_euler_angles() client.imu.get_quaternion() client.imu.calibrate() - zero_response = client.imu.zero( - duration=1.0, max_retries=1, max_angular_error=1.0) - assert zero_response.success + zero_response = client.imu.zero(duration=1.0, max_retries=1, max_angular_error=1.0) + assert zero_response["success"] # Tests the K-Clip endpoints. - start_kclip_response = client.process_manager.start_kclip(action="start") - assert start_kclip_response.clip_uuid is not None - stop_kclip_response = client.process_manager.stop_kclip() - assert stop_kclip_response.clip_uuid is not None + start_kclip_response: KClipStartResponse = client.process_manager.start_kclip(action="start") + assert start_kclip_response["clip_uuid"] is not None + stop_kclip_response: KClipStopResponse = client.process_manager.stop_kclip() + assert stop_kclip_response["clip_uuid"] is not None def is_server_running(address: str) -> bool: From 5e114f06c0e2095f24f40bb8277c6ba279d6ca7d Mon Sep 17 00:00:00 2001 From: Shikhar Mishra Date: Tue, 21 Jan 2025 22:28:09 +0530 Subject: [PATCH 3/4] Added more descriptive docstrings --- kos-py/pykos/services/actuator.py | 90 ++++++++++++++++-- kos-py/pykos/services/imu.py | 112 ++++++++++++++++++++--- kos-py/pykos/services/inference.py | 34 ++++--- kos-py/pykos/services/led_matrix.py | 39 ++++---- kos-py/pykos/services/process_manager.py | 32 ++++++- kos-py/pykos/services/sim.py | 71 ++++++++++---- kos-py/pykos/services/sound.py | 65 +++++++------ 7 files changed, 339 insertions(+), 104 deletions(-) diff --git a/kos-py/pykos/services/actuator.py b/kos-py/pykos/services/actuator.py index 28b6d45..014483b 100644 --- a/kos-py/pykos/services/actuator.py +++ b/kos-py/pykos/services/actuator.py @@ -1,6 +1,6 @@ """Actuator service client.""" -from typing import List, NotRequired, TypedDict, Unpack +from typing import NotRequired, TypedDict, Unpack import grpc from google.longrunning import operations_pb2, operations_pb2_grpc @@ -11,16 +11,51 @@ class ActionResult(TypedDict): + """TypedDict containing the result of an actuator action. + + A dictionary type that includes the actuator ID and whether the action succeeded. + + Fields: + actuator_id: The ID of the actuator that performed the action + success: Whether the action completed successfully + error: Optional error information if the action failed + """ + actuator_id: int success: bool - error: NotRequired[common_pb2.Error] + error: NotRequired[common_pb2.Error | None] class CommandActuatorsResponse(TypedDict): - results: List[ActionResult] + """TypedDict containing response from actuator command execution. + + A dictionary type containing a list of ActionResult objects for each command sent. + + Fields: + results: List of ActionResult objects indicating success/failure for each command + error: Optional error information if the overall command failed + """ + + results: list[ActionResult] + error: NotRequired[common_pb2.Error | None] class ActuatorStateResponse(TypedDict): + """TypedDict containing the current state of an actuator. + + A dictionary type containing various measurements and states for a specific actuator. + + Fields: + actuator_id: The ID of the actuator + online: Whether the actuator is currently online + position: Optional current position of the actuator + velocity: Optional current velocity of the actuator + torque: Optional current torque of the actuator + temperature: Optional current temperature of the actuator + voltage: Optional current voltage of the actuator + current: Optional current draw of the actuator + """ + actuator_id: int online: bool position: NotRequired[float] @@ -32,10 +67,31 @@ class ActuatorStateResponse(TypedDict): class GetActuatorsStateResponse(TypedDict): - states: List[ActuatorStateResponse] + """TypedDict containing response for actuator state query. + + A dictionary type containing a list of actuator states. + + Fields: + states: List of ActuatorStateResponse objects for each queried actuator + error: Optional error information if the query failed + """ + + states: list[ActuatorStateResponse] + error: NotRequired[common_pb2.Error | None] class ActuatorCommand(TypedDict): + """TypedDict containing command parameters for an actuator. + + A dictionary type specifying various control parameters for an actuator. + + Fields: + actuator_id: The ID of the actuator to command + position: Optional target position to move to + velocity: Optional target velocity to maintain + torque: Optional target torque to apply + """ + actuator_id: int position: NotRequired[float] velocity: NotRequired[float] @@ -43,6 +99,23 @@ class ActuatorCommand(TypedDict): class ConfigureActuatorRequest(TypedDict): + """TypedDict containing configuration parameters for an actuator. + + A dictionary type specifying various configuration options for an actuator. + + Fields: + actuator_id: The ID of the actuator to configure + kp: Optional proportional gain for position control + kd: Optional derivative gain for position control + ki: Optional integral gain for position control + max_torque: Optional maximum torque limit + protective_torque: Optional protective torque threshold + protection_time: Optional protection activation time + torque_enabled: Optional flag to enable/disable torque + new_actuator_id: Optional new ID to assign to the actuator + zero_position: Optional flag to set current position as zero + """ + actuator_id: int kp: NotRequired[float] kd: NotRequired[float] @@ -122,7 +195,8 @@ def command_actuators(self, commands: list[ActuatorCommand]) -> CommandActuators 'velocity', and 'torque'. Returns: - CommandActuatorsResponse is a list of ActionResult objects indicating success/failure for each command. + CommandActuatorsResponse is a dictionary where the key 'results' corresponds to a list of + ActionResult objects indicating success/failure for each command. """ actuator_commands = [actuator_pb2.ActuatorCommand(**cmd) for cmd in commands] request = actuator_pb2.CommandActuatorsRequest(commands=actuator_commands) @@ -159,7 +233,8 @@ def configure_actuator(self, **kwargs: Unpack[ConfigureActuatorRequest]) -> Acti protection_time, torque_enabled, new_actuator_id Returns: - ActionResult conatining the actuator_id and a ActionResponse indicating success/failure + ActionResult is a dictionary containing the actuator_id and a success flag indicating + whether the configuration was successful. """ request = actuator_pb2.ConfigureActuatorRequest(**kwargs) return self.stub.ConfigureActuator(request) @@ -174,7 +249,8 @@ def get_actuators_state(self, actuator_ids: list[int] | None = None) -> GetActua actuator_ids: List of actuator IDs to query. If None, gets state of all actuators. Returns: - GetActuatorsStateResponse containing a list of ActuatorStateResponse objects with state information + GetActuatorsStateResponse is a dictionary where the key 'states' corresponds to a list of + ActuatorStateResponse objects containing the current state of each queried actuator. """ request = actuator_pb2.GetActuatorsStateRequest(actuator_ids=actuator_ids or []) return self.stub.GetActuatorsState(request) diff --git a/kos-py/pykos/services/imu.py b/kos-py/pykos/services/imu.py index 8b3bba6..8429668 100644 --- a/kos-py/pykos/services/imu.py +++ b/kos-py/pykos/services/imu.py @@ -13,7 +13,23 @@ class IMUValuesResponse(TypedDict): - """Response containing IMU sensor values.""" + """TypedDict containing basic IMU sensor measurements. + + A dictionary type containing raw accelerometer, gyroscope, and optional + magnetometer readings from the IMU sensor. + + Fields: + accel_x: Acceleration along X-axis in m/s² + accel_y: Acceleration along Y-axis in m/s² + accel_z: Acceleration along Z-axis in m/s² + gyro_x: Angular velocity around X-axis in rad/s + gyro_y: Angular velocity around Y-axis in rad/s + gyro_z: Angular velocity around Z-axis in rad/s + mag_x: Optional magnetic field strength along X-axis + mag_y: Optional magnetic field strength along Y-axis + mag_z: Optional magnetic field strength along Z-axis + error: Optional error information if the reading failed + """ accel_x: float accel_y: float @@ -28,28 +44,83 @@ class IMUValuesResponse(TypedDict): class IMUAdvancedValuesResponse(TypedDict): + """TypedDict containing processed IMU measurements. + + A dictionary type containing filtered and processed IMU readings, + including linear accelerations and angular velocities with gravity compensation. + + Fields: + linear_acceleration_x: Gravity-compensated acceleration along X-axis in m/s² + linear_acceleration_y: Gravity-compensated acceleration along Y-axis in m/s² + linear_acceleration_z: Gravity-compensated acceleration along Z-axis in m/s² + angular_velocity_x: Filtered angular velocity around X-axis in rad/s + angular_velocity_y: Filtered angular velocity around Y-axis in rad/s + angular_velocity_z: Filtered angular velocity around Z-axis in rad/s + error: Optional error information if the processing failed + """ + linear_acceleration_x: float linear_acceleration_y: float linear_acceleration_z: float - gravity_x: float - gravity_y: float - gravity_z: float - rotation_rate_x: float - rotation_rate_y: float - rotation_rate_z: float + angular_velocity_x: float + angular_velocity_y: float + angular_velocity_z: float + error: NotRequired[common_pb2.Error | None] class EulerAnglesResponse(TypedDict): + """TypedDict containing orientation in Euler angles. + + A dictionary type containing the IMU's orientation expressed as Euler angles + in the roll-pitch-yaw convention. + + Fields: + roll: Rotation around X-axis in radians + pitch: Rotation around Y-axis in radians + yaw: Rotation around Z-axis in radians + error: Optional error information if the calculation failed + """ + roll: float pitch: float yaw: float + error: NotRequired[common_pb2.Error | None] class QuaternionResponse(TypedDict): + """TypedDict containing orientation as quaternion. + + A dictionary type containing the IMU's orientation expressed as a unit quaternion, + which provides a singularity-free representation of orientation. + + Fields: + w: Scalar component of the quaternion + x: X component of the quaternion's vector part + y: Y component of the quaternion's vector part + z: Z component of the quaternion's vector part + error: Optional error information if the calculation failed + """ + w: float x: float y: float z: float + error: NotRequired[common_pb2.Error | None] + + +class ZeroIMUResponse(TypedDict): + """TypedDict containing response from IMU zeroing operation. + + A dictionary type indicating whether the IMU zeroing operation succeeded. + Zeroing sets the current orientation as the reference orientation. + + Fields: + success: Whether the zeroing operation completed successfully + error: Optional error information if the operation failed + """ + + success: bool + error: NotRequired[common_pb2.Error | None] class ActionResponse(TypedDict): @@ -110,8 +181,11 @@ def get_imu_values(self) -> IMUValuesResponse: """Get the latest IMU sensor values. Returns: - IMUValuesResponse: The latest IMU sensor values including acceleration, - gyroscope, and magnetometer readings on x, y, z axes. + IMUValuesResponse is a dictionary containing raw sensor measurements where: + - 'accel_x/y/z' contain acceleration values in m/s² + - 'gyro_x/y/z' contain angular velocity values in rad/s + - 'mag_x/y/z' optionally contain magnetic field measurements + - 'error' contains any error information if the reading failed """ return self.stub.GetValues(Empty()) @@ -119,8 +193,10 @@ def get_imu_advanced_values(self) -> IMUAdvancedValuesResponse: """Get the latest IMU advanced values. Returns: - IMUAdvancedValuesResponse: The latest IMU advanced values including linear acceleration, - gravity, and rotation rate on x, y, z axes. + IMUAdvancedValuesResponse is a dictionary containing processed measurements where: + - 'linear_acceleration_x/y/z' contain gravity-compensated acceleration in m/s² + - 'angular_velocity_x/y/z' contain filtered angular velocity in rad/s + - 'error' contains any error information if the processing failed """ return self.stub.GetAdvancedValues(Empty()) @@ -128,7 +204,11 @@ def get_euler_angles(self) -> EulerAnglesResponse: """Get the latest Euler angles. Returns: - EulerAnglesResponse: The latest Euler angles (roll, pitch, yaw). + EulerAnglesResponse is a dictionary containing orientation angles where: + - 'roll' contains rotation around X-axis in radians + - 'pitch' contains rotation around Y-axis in radians + - 'yaw' contains rotation around Z-axis in radians + - 'error' contains any error information if the calculation failed """ return self.stub.GetEuler(Empty()) @@ -136,7 +216,10 @@ def get_quaternion(self) -> QuaternionResponse: """Get the latest quaternion. Returns: - QuaternionResponse: The latest quaternion values (w, x, y, z). + QuaternionResponse is a dictionary containing orientation as quaternion where: + - 'w' contains the scalar component + - 'x/y/z' contain the vector components + - 'error' contains any error information if the calculation failed """ return self.stub.GetQuaternion(Empty()) @@ -160,7 +243,8 @@ def zero(self, duration: float = 1.0, **kwargs: Unpack[ZeroIMURequest]) -> Actio max_acceleration: Maximum acceleration during zeroing Returns: - ActionResponse: Response indicating success/failure of the zero operation. + ActionResponse is a dictionary where 'success' indicates if the zeroing operation + was successful, and 'error' contains any error information if the operation failed. """ request = imu_pb2.ZeroIMURequest(duration=_duration_from_seconds(duration), **kwargs) return self.stub.Zero(request) diff --git a/kos-py/pykos/services/inference.py b/kos-py/pykos/services/inference.py index 79300e2..1ee3323 100644 --- a/kos-py/pykos/services/inference.py +++ b/kos-py/pykos/services/inference.py @@ -8,9 +8,9 @@ class ModelMetadata(TypedDict): - """Model metadata for uploading models. + """TypedDict containing model metadata for uploading models. - All fields are optional and can be used to provide additional information about the model. + Contains optional fields providing additional information about the model. """ model_name: NotRequired[str | None] @@ -121,11 +121,11 @@ def upload_model(self, model_data: bytes, metadata: ModelMetadata | None = None) """Upload a model to the robot. Example: - >>> client.upload_model(model_data, - ... metadata={"model_name": "MyModel", - ... "model_description": "A model for inference", - ... "model_version": "1.0.0", - ... "model_author": "John Doe"}) + >>> client.upload_model(model_data, + ... metadata={"model_name": "MyModel", + ... "model_description": "A model for inference", + ... "model_version": "1.0.0", + ... "model_author": "John Doe"}) Args: model_data: The binary model data to upload. @@ -136,7 +136,9 @@ def upload_model(self, model_data: bytes, metadata: ModelMetadata | None = None) model_author: Author of the model Returns: - UploadModelResponse containing the model UID and any error information. + UploadModelResponse is a dictionary where 'uid' contains the unique identifier + assigned to the uploaded model, and 'error' contains any error information if + the upload failed. """ proto_metadata = None if metadata is not None: @@ -151,7 +153,9 @@ def load_models(self, uids: list[str]) -> LoadModelsResponse: uids: List of model UIDs to load. Returns: - LoadModelsResponse containing success status and any error information. + LoadModelsResponse is a dictionary where 'success' indicates if all models + were loaded successfully, and 'error' contains any error information if the + loading failed. """ request = inference_pb2.ModelUids(uids=uids) return self.stub.LoadModels(request) @@ -176,9 +180,9 @@ def get_models_info(self, model_uids: list[str] | None = None) -> GetModelsInfoR If None, returns info for all models. Returns: - GetModelsInfoResponse containing: - models: List of ModelInfo objects - error: Optional error information if fetching failed + GetModelsInfoResponse is a dictionary where 'models' contains a list of ModelInfo + objects with details about each model, and 'error' contains any error information + if the query failed. """ if model_uids is not None: request = inference_pb2.GetModelsInfoRequest(model_uids=inference_pb2.ModelUids(uids=model_uids)) @@ -236,9 +240,9 @@ def forward(self, model_uid: str, inputs: dict[str, Tensor]) -> ForwardResponse: inputs: Dictionary mapping tensor names to tensors. Returns: - ForwardResponse containing: - outputs: Dictionary mapping tensor names to output tensors - error: Optional error information if inference failed + ForwardResponse is a dictionary where 'outputs' contains a mapping of tensor + names to output tensors from the model inference, and 'error' contains any + error information if the inference failed. """ tensor_inputs = {} for name, tensor in inputs.items(): diff --git a/kos-py/pykos/services/led_matrix.py b/kos-py/pykos/services/led_matrix.py index 62ee976..664fea9 100644 --- a/kos-py/pykos/services/led_matrix.py +++ b/kos-py/pykos/services/led_matrix.py @@ -9,9 +9,12 @@ class MatrixInfo(TypedDict): - """Information about the LED matrix. + """TypedDict containing LED matrix configuration details. - Args: + A dictionary type describing the physical layout and capabilities + of the LED matrix display. + + Fields: width: Width in pixels height: Height in pixels brightness_levels: Number of brightness levels supported (1 for binary on/off) @@ -71,13 +74,13 @@ def get_matrix_info(self) -> MatrixInfo: """Get information about the LED matrix including dimensions and capabilities. Returns: - MatrixInfo containing: - width: Width in pixels - height: Height in pixels - brightness_levels: Number of brightness levels supported - color_capable: Whether the matrix supports color - bits_per_pixel: Number of bits used to represent each pixel - error: Optional error information + MatrixInfo is a dictionary containing LED matrix configuration where: + - 'width' contains the number of LEDs in horizontal dimension + - 'height' contains the number of LEDs in vertical dimension + - 'brightness_levels' contains the number of supported brightness levels + - 'color_capable' indicates whether the matrix supports color + - 'bits_per_pixel' contains the number of bits used per pixel + - 'error' contains any error information if the query failed """ return self.stub.GetMatrixInfo(Empty()) @@ -91,7 +94,8 @@ def write_buffer(self, buffer: bytes) -> ActionResponse: buffer: Binary buffer containing LED states Returns: - ActionResponse indicating success/failure of the write operation. + ActionResponse is a dictionary where 'success' indicates if the write operation + was successful, and 'error' contains any error information if the operation failed. """ request = led_matrix_pb2.WriteBufferRequest(buffer=buffer) return self.stub.WriteBuffer(request) @@ -100,15 +104,16 @@ def write_color_buffer(self, **kwargs: Unpack[ImageData]) -> ActionResponse: """Write image data to the LED matrix. Args: - **kwargs: Image data containing the raw bytes, dimensions and format - buffer: Raw image data bytes - width: Image width in pixels - height: Image height in pixels - format: Pixel format specification (e.g. 'RGB888', 'BGR888', 'RGB565', 'MONO8') - brightness: Global brightness level (0-255) + **kwargs: Image data containing: + buffer: Raw image data bytes + width: Image width in pixels + height: Image height in pixels + format: Pixel format specification (e.g. 'RGB888', 'BGR888', 'RGB565', 'MONO8') + brightness: Global brightness level (0-255) Returns: - ActionResponse indicating success/failure of the write operation. + ActionResponse is a dictionary where 'success' indicates if the write operation + was successful, and 'error' contains any error information if the operation failed. """ request = led_matrix_pb2.WriteColorBufferRequest(**kwargs) return self.stub.WriteColorBuffer(request) diff --git a/kos-py/pykos/services/process_manager.py b/kos-py/pykos/services/process_manager.py index 5015bb2..95a12bf 100644 --- a/kos-py/pykos/services/process_manager.py +++ b/kos-py/pykos/services/process_manager.py @@ -10,7 +10,16 @@ class KClipStartResponse(TypedDict): - """Response from starting KClip recording.""" + """TypedDict containing response from starting a KClip recording. + + A dictionary type containing information about a newly started + KClip recording session, including its unique identifier. + + Fields: + success: Whether the recording started successfully + clip_uuid: Unique identifier for the recording session + error: Optional error information if start failed + """ success: bool clip_uuid: str @@ -18,7 +27,16 @@ class KClipStartResponse(TypedDict): class KClipStopResponse(TypedDict): - """Response from stopping KClip recording.""" + """TypedDict containing response from stopping a KClip recording. + + A dictionary type containing information about the stopped + KClip recording session, including its identifier. + + Fields: + success: Whether the recording stopped successfully + clip_uuid: Identifier of the stopped recording session + error: Optional error information if stop failed + """ success: bool clip_uuid: str @@ -36,7 +54,10 @@ def start_kclip(self, action: str) -> KClipStartResponse: action: The action string for the KClip request Returns: - KClipStartResponse containing success status and any error information. + KClipStartResponse is a dictionary where: + - 'success' indicates if the recording started successfully + - 'clip_uuid' contains the unique identifier for the recording session + - 'error' contains any error information if the start failed """ request = KClipStartRequest(action=action) return self.stub.StartKClip(request) @@ -45,6 +66,9 @@ def stop_kclip(self, request: Empty = Empty()) -> KClipStopResponse: """Stop KClip recording. Returns: - KClipStopResponse containing success status, clip path, and any error information. + KClipStopResponse is a dictionary where: + - 'success' indicates if the recording stopped successfully + - 'clip_uuid' contains the identifier of the stopped recording session + - 'error' contains any error information if the stop failed """ return self.stub.StopKClip(request) diff --git a/kos-py/pykos/services/sim.py b/kos-py/pykos/services/sim.py index 6233a6d..9aa155f 100644 --- a/kos-py/pykos/services/sim.py +++ b/kos-py/pykos/services/sim.py @@ -9,13 +9,28 @@ class DefaultPosition(TypedDict): - """Default position state for simulation.""" + """TypedDict containing initial simulation state. + + A dictionary type specifying the initial joint positions for + resetting the simulation to a known state. + + Fields: + qpos: List of joint positions in simulation units + """ qpos: list[float] class ResetRequest(TypedDict): - """Request parameters for resetting simulation.""" + """TypedDict containing simulation reset parameters. + + A dictionary type specifying how the simulation should be reset, + including optional initial state and randomization settings. + + Fields: + initial_state: Optional DefaultPosition to set initial joint positions + randomize: Optional flag to add randomization during reset + """ initial_state: NotRequired[DefaultPosition] randomize: NotRequired[bool] @@ -59,19 +74,20 @@ def __init__(self, channel: grpc.Channel) -> None: def reset(self, **kwargs: Unpack[ResetRequest]) -> ActionResponse: """Reset the simulation to its initial state. - Args: - **kwargs: Reset parameters that may include: - initial_state: DefaultPosition to reset to - randomize: Whether to randomize the initial state - Example: - >>> client.reset( + >>> reset( ... initial_state={"qpos": [0.0, 0.0, 0.0]}, ... randomize=True ... ) + Args: + **kwargs: Reset parameters that may include: + initial_state: DefaultPosition to reset to + randomize: Whether to randomize the initial state + Returns: - ActionResponse indicating success/failure + ActionResponse is a dictionary where 'success' indicates if the reset operation + was successful, and 'error' contains any error information if the reset failed. """ initial_state = None if "initial_state" in kwargs: @@ -84,11 +100,16 @@ def reset(self, **kwargs: Unpack[ResetRequest]) -> ActionResponse: def set_paused(self, paused: bool) -> ActionResponse: """Pause or unpause the simulation. + Example: + >>> set_paused(True) # Pause simulation + >>> set_paused(False) # Resume simulation + Args: paused: True to pause, False to unpause Returns: - ActionResponse indicating success/failure + ActionResponse is a dictionary where 'success' indicates if the pause state + was set successfully, and 'error' contains any error information if the operation failed. """ request = sim_pb2.SetPausedRequest(paused=paused) return self.stub.SetPaused(request) @@ -96,12 +117,17 @@ def set_paused(self, paused: bool) -> ActionResponse: def step(self, num_steps: int, step_size: float | None = None) -> ActionResponse: """Step the simulation forward. + Example: + >>> step(num_steps=100, step_size=0.001) # Step forward 100 times with 1ms steps + >>> step(num_steps=50) # Step forward 50 times with default step size + Args: num_steps: Number of simulation steps to take step_size: Optional time per step in seconds Returns: - ActionResponse indicating success/failure + ActionResponse is a dictionary where 'success' indicates if the stepping operation + was successful, and 'error' contains any error information if the stepping failed. """ request = sim_pb2.StepRequest(num_steps=num_steps, step_size=step_size) return self.stub.Step(request) @@ -110,11 +136,11 @@ def set_parameters(self, **kwargs: Unpack[SimulationParameters]) -> ActionRespon """Set simulation parameters. Example: - >>> client.set_parameters( - ... time_scale=1.0, - ... gravity=9.81, - ... initial_state={"qpos": [0.0, 0.0, 0.0]} - ... ) + >>> set_parameters( + ... time_scale=1.0, + ... gravity=9.81, + ... initial_state={"qpos": [0.0, 0.0, 0.0]} + ... ) Args: **kwargs: Parameters that may include: @@ -123,7 +149,8 @@ def set_parameters(self, **kwargs: Unpack[SimulationParameters]) -> ActionRespon initial_state: Default position state Returns: - ActionResponse indicating success/failure + ActionResponse is a dictionary where 'success' indicates if the parameters were + set successfully, and 'error' contains any error information if the operation failed. """ initial_state = None if "initial_state" in kwargs: @@ -139,8 +166,14 @@ def set_parameters(self, **kwargs: Unpack[SimulationParameters]) -> ActionRespon def get_parameters(self) -> GetParametersResponse: """Get current simulation parameters. + Example: + >>> get_parameters() + Returns: - GetParametersResponse containing current parameters (time_scale, gravity, initial_state) - and any error information. + GetParametersResponse is a dictionary where: + - 'time_scale' contains the current simulation time scaling factor + - 'gravity' contains the current gravity constant value + - 'initial_state' contains the default position state as a DefaultPosition dictionary + - 'error' contains any error information if the query failed """ return self.stub.GetParameters(Empty()) diff --git a/kos-py/pykos/services/sound.py b/kos-py/pykos/services/sound.py index ee38dcf..97e571b 100644 --- a/kos-py/pykos/services/sound.py +++ b/kos-py/pykos/services/sound.py @@ -9,13 +9,16 @@ class AudioCapability(TypedDict): - """Information about audio capabilities. + """TypedDict containing information about audio capabilities. - Args: - sample_rates: List of supported sample rates (e.g., 44100, 48000) - bit_depths: List of supported bit depths (e.g., 16, 24, 32) - channels: List of supported channel counts (e.g., 1, 2) - available: Whether this capability is available + A dictionary type describing the supported audio configurations + and current availability of the audio system. + + Fields: + sample_rates: List of supported sampling rates in Hz + bit_depths: List of supported bit depths + channels: List of supported channel counts + available: Whether the audio system is currently available """ sample_rates: list[int] @@ -84,23 +87,16 @@ def get_audio_info(self) -> AudioInfo: """Get information about audio capabilities. Returns: - AudioInfo containing playback and recording capabilities. + AudioInfo is a dictionary where: + - 'playback' contains an AudioCapability dictionary describing playback capabilities: + - 'recording' contains an AudioCapability dictionary describing recording capabilities + - 'error' contains any error information if the query failed """ return self.stub.GetAudioInfo(Empty()) def play_audio(self, audio_iterator: Iterator[bytes], **kwargs: Unpack[AudioConfig]) -> ActionResponse: """Stream PCM audio data to the speaker. - Args: - audio_iterator: Iterator yielding chunks of PCM audio data - **kwargs: Audio configuration parameters - sample_rate: Sample rate in Hz (e.g., 44100) - bit_depth: Bit depth (e.g., 16) - channels: Number of channels (1 for mono, 2 for stereo) - - Returns: - ActionResponse indicating success/failure of the playback operation. - Example: >>> config = AudioConfig(sample_rate=44100, bit_depth=16, channels=2) >>> with open('audio.raw', 'rb') as f: @@ -108,6 +104,17 @@ def play_audio(self, audio_iterator: Iterator[bytes], **kwargs: Unpack[AudioConf ... while chunk := f.read(4096): ... yield chunk ... response = client.play_audio(chunks(), **config) + + Args: + audio_iterator: Iterator yielding chunks of PCM audio data + **kwargs: Audio configuration parameters: + sample_rate: Sample rate in Hz (e.g., 44100) + bit_depth: Bit depth (e.g., 16) + channels: Number of channels (1 for mono, 2 for stereo) + + Returns: + ActionResponse is a dictionary where 'success' indicates if the playback operation + was successful, and 'error' contains any error information if the operation failed. """ def request_iterator() -> Generator[sound_pb2.PlayAudioRequest, None, None]: @@ -124,21 +131,22 @@ def request_iterator() -> Generator[sound_pb2.PlayAudioRequest, None, None]: def record_audio(self, duration_ms: int = 0, **kwargs: Unpack[AudioConfig]) -> Generator[bytes, None, None]: """Record PCM audio data from the microphone. - Args: - duration_ms: Recording duration in milliseconds (0 for continuous) - **kwargs: Audio configuration parameters - sample_rate: Sample rate in Hz (e.g., 44100) - bit_depth: Bit depth (e.g., 16) - channels: Number of channels (1 for mono, 2 for stereo) - - Yields: - Chunks of PCM audio data. - Example: >>> config = AudioConfig(sample_rate=44100, bit_depth=16, channels=1) >>> with open('recording.raw', 'wb') as f: ... for chunk in client.record_audio(duration_ms=5000, **config): ... f.write(chunk) + + Args: + duration_ms: Recording duration in milliseconds (0 for continuous) + **kwargs: Audio configuration parameters: + sample_rate: Sample rate in Hz (e.g., 44100) + bit_depth: Bit depth (e.g., 16) + channels: Number of channels (1 for mono, 2 for stereo) + + Yields: + Chunks of PCM audio data as bytes. If an error occurs during recording, + a RuntimeError will be raised with the error details. """ request = sound_pb2.RecordAudioRequest( config=sound_pb2.AudioConfig(**kwargs), @@ -154,6 +162,7 @@ def stop_recording(self) -> ActionResponse: """Stop an ongoing recording session. Returns: - ActionResponse indicating success/failure of the stop operation. + ActionResponse is a dictionary where 'success' indicates if the recording was + stopped successfully, and 'error' contains any error information if the stop failed. """ return self.stub.StopRecording(Empty()) From 3b374105bca7d3b19ad5b6a86253a691ca024a89 Mon Sep 17 00:00:00 2001 From: Shikhar Mishra Date: Sun, 26 Jan 2025 16:12:09 +0530 Subject: [PATCH 4/4] Updated services to use protobuf return types --- kos-py/pykos/services/actuator.py | 213 ++++++++-------------- kos-py/pykos/services/imu.py | 140 ++++---------- kos-py/pykos/services/inference.py | 221 ++++++----------------- kos-py/pykos/services/led_matrix.py | 58 +++--- kos-py/pykos/services/process_manager.py | 70 +++---- kos-py/pykos/services/sim.py | 155 +++++++--------- kos-py/pykos/services/sound.py | 128 ++++++------- kos-py/tests/test_pykos.py | 5 +- 8 files changed, 357 insertions(+), 633 deletions(-) diff --git a/kos-py/pykos/services/actuator.py b/kos-py/pykos/services/actuator.py index 014483b..8f45d87 100644 --- a/kos-py/pykos/services/actuator.py +++ b/kos-py/pykos/services/actuator.py @@ -7,89 +7,24 @@ from google.protobuf.any_pb2 import Any as AnyPb2 from kos_protos import actuator_pb2, actuator_pb2_grpc, common_pb2 -from kos_protos.actuator_pb2 import CalibrateActuatorMetadata - - -class ActionResult(TypedDict): - """TypedDict containing the result of an actuator action. - - A dictionary type that includes the actuator ID and whether the action succeeded. - - Fields: - actuator_id: The ID of the actuator that performed the action - success: Whether the action completed successfully - error: Optional error information if the action failed - """ - - actuator_id: int - success: bool - error: NotRequired[common_pb2.Error | None] - - -class CommandActuatorsResponse(TypedDict): - """TypedDict containing response from actuator command execution. - - A dictionary type containing a list of ActionResult objects for each command sent. - - Fields: - results: List of ActionResult objects indicating success/failure for each command - error: Optional error information if the overall command failed - """ - - results: list[ActionResult] - error: NotRequired[common_pb2.Error | None] - - -class ActuatorStateResponse(TypedDict): - """TypedDict containing the current state of an actuator. - - A dictionary type containing various measurements and states for a specific actuator. - - Fields: - actuator_id: The ID of the actuator - online: Whether the actuator is currently online - position: Optional current position of the actuator - velocity: Optional current velocity of the actuator - torque: Optional current torque of the actuator - temperature: Optional current temperature of the actuator - voltage: Optional current voltage of the actuator - current: Optional current draw of the actuator - """ - - actuator_id: int - online: bool - position: NotRequired[float] - velocity: NotRequired[float] - torque: NotRequired[float] - temperature: NotRequired[float] - voltage: NotRequired[float] - current: NotRequired[float] - - -class GetActuatorsStateResponse(TypedDict): - """TypedDict containing response for actuator state query. - - A dictionary type containing a list of actuator states. - - Fields: - states: List of ActuatorStateResponse objects for each queried actuator - error: Optional error information if the query failed - """ - - states: list[ActuatorStateResponse] - error: NotRequired[common_pb2.Error | None] +from kos_protos.actuator_pb2 import ( + CalibrateActuatorMetadata, + CommandActuatorsRequest, + CommandActuatorsResponse, + ConfigureActuatorRequest, + GetActuatorsStateRequest, + GetActuatorsStateResponse, +) class ActuatorCommand(TypedDict): - """TypedDict containing command parameters for an actuator. - - A dictionary type specifying various control parameters for an actuator. + """Command parameters for an actuator. Fields: actuator_id: The ID of the actuator to command - position: Optional target position to move to - velocity: Optional target velocity to maintain - torque: Optional target torque to apply + position: Optional target position in degrees + velocity: Optional target velocity in degrees/second + torque: Optional target torque in Nm """ actuator_id: int @@ -98,10 +33,8 @@ class ActuatorCommand(TypedDict): torque: NotRequired[float] -class ConfigureActuatorRequest(TypedDict): - """TypedDict containing configuration parameters for an actuator. - - A dictionary type specifying various configuration options for an actuator. +class ConfigureActuatorParams(TypedDict): + """Configuration parameters for an actuator. Fields: actuator_id: The ID of the actuator to configure @@ -128,10 +61,6 @@ class ConfigureActuatorRequest(TypedDict): zero_position: NotRequired[bool] -class ActuatorStateRequest(TypedDict): - actuator_ids: list[int] - - class CalibrationStatus: Calibrating = "calibrating" Calibrated = "calibrated" @@ -148,8 +77,10 @@ def decode_metadata(self, metadata_any: AnyPb2) -> None: metadata = CalibrateActuatorMetadata() if metadata_any.Is(CalibrateActuatorMetadata.DESCRIPTOR): metadata_any.Unpack(metadata) - self.actuator_id = metadata.actuator_id - self.status = metadata.status if metadata.HasField("status") else None + if metadata.HasField("actuator_id"): + self.actuator_id = metadata.actuator_id + if metadata.HasField("status"): + self.status = metadata.status def __str__(self) -> str: return f"CalibrationMetadata(actuator_id={self.actuator_id}, status={self.status})" @@ -166,91 +97,91 @@ def __init__(self, channel: grpc.Channel) -> None: def calibrate(self, actuator_id: int) -> CalibrationMetadata: """Calibrate an actuator. + Args: + actuator_id: The ID of the actuator to calibrate + Returns: - Operation: The operation for the calibration. + CalibrationMetadata object containing the actuator ID and calibration status """ - response = self.stub.CalibrateActuator(actuator_pb2.CalibrateActuatorRequest(actuator_id=actuator_id)) - metadata = CalibrationMetadata(response.metadata) - return metadata + request = actuator_pb2.CalibrateActuatorRequest(actuator_id=actuator_id) + operation = self.stub.CalibrateActuator(request) + return CalibrationMetadata(operation.metadata) def get_calibration_status(self, actuator_id: int) -> str | None: - response = self.operations_stub.GetOperation( - operations_pb2.GetOperationRequest(name=f"operations/calibrate_actuator/{actuator_id}") - ) - metadata = CalibrationMetadata(response.metadata) - return metadata.status + """Get the calibration status of an actuator. - def command_actuators(self, commands: list[ActuatorCommand]) -> CommandActuatorsResponse: - """Command multiple actuators at once. + Args: + actuator_id: The ID of the actuator to check - Example: - >>> command_actuators([ - ... {"actuator_id": 1, "position": 90.0, "velocity": 100.0, "torque": 1.0}, - ... {"actuator_id": 2, "position": 180.0}, - ... ]) + Returns: + The calibration status string or None if not found + """ + request = operations_pb2.GetOperationRequest(name=str(actuator_id)) + operation = self.operations_stub.GetOperation(request) + return CalibrationMetadata(operation.metadata).status if operation.metadata else None + + def command_actuators(self, commands: list[ActuatorCommand]) -> CommandActuatorsResponse: + """Send commands to multiple actuators. Args: - commands: List of dictionaries containing actuator commands. - Each dict should have 'actuator_id' and optionally 'position', - 'velocity', and 'torque'. + commands: List of dictionaries specifying commands for each actuator. + Each dictionary must have 'actuator_id' and may include: + - position: Target position in degrees + - velocity: Target velocity in degrees/second + - torque: Target torque in Nm Returns: - CommandActuatorsResponse is a dictionary where the key 'results' corresponds to a list of - ActionResult objects indicating success/failure for each command. + CommandActuatorsResponse containing results for each command """ - actuator_commands = [actuator_pb2.ActuatorCommand(**cmd) for cmd in commands] - request = actuator_pb2.CommandActuatorsRequest(commands=actuator_commands) + proto_commands = [actuator_pb2.ActuatorCommand(**cmd) for cmd in commands] + request = CommandActuatorsRequest(commands=proto_commands) return self.stub.CommandActuators(request) - def configure_actuator(self, **kwargs: Unpack[ConfigureActuatorRequest]) -> ActionResult: - """Configure an actuator's parameters. + def configure_actuator(self, **kwargs: Unpack[ConfigureActuatorParams]) -> common_pb2.ActionResponse: + """Configure an actuator with the specified parameters. Example: >>> configure_actuator( ... actuator_id=1, - ... kp=1.0, - ... kd=0.1, - ... ki=0.01, - ... max_torque=100.0, - ... protective_torque=None, - ... protection_time=None, + ... kp=10.0, + ... kd=1.0, + ... ki=0.1, + ... max_torque=2.0, + ... protective_torque=1.5, + ... protection_time=0.5, ... torque_enabled=True, - ... new_actuator_id=None, + ... new_actuator_id=2, ... zero_position=True ... ) - >>> configure_actuator( - ... actuator_id=2, - ... kp=1.0, - ... kd=0.1, - ... torque_enabled=True, - ... ) - Args: - actuator_id: ID of the actuator to configure - **kwargs: Configuration parameters that may include: - kp, kd, ki, max_torque, protective_torque, - protection_time, torque_enabled, new_actuator_id + **kwargs: Configuration parameters that must include: + actuator_id: The ID of the actuator to configure + And may optionally include: + kp: Proportional gain for position control + kd: Derivative gain for position control + ki: Integral gain for position control + max_torque: Maximum torque limit + protective_torque: Protective torque threshold + protection_time: Protection activation time + torque_enabled: Flag to enable/disable torque + new_actuator_id: New ID to assign to the actuator + zero_position: Flag to set current position as zero Returns: - ActionResult is a dictionary containing the actuator_id and a success flag indicating - whether the configuration was successful. + ActionResponse indicating if the configuration was successful """ - request = actuator_pb2.ConfigureActuatorRequest(**kwargs) + request = ConfigureActuatorRequest(**kwargs) return self.stub.ConfigureActuator(request) def get_actuators_state(self, actuator_ids: list[int] | None = None) -> GetActuatorsStateResponse: - """Get the state of multiple actuators. - - Example: - >>> get_actuators_state([1, 2]) + """Get the current state of specified actuators. Args: - actuator_ids: List of actuator IDs to query. If None, gets state of all actuators. + actuator_ids: Optional list of actuator IDs to query. If None, queries all actuators. Returns: - GetActuatorsStateResponse is a dictionary where the key 'states' corresponds to a list of - ActuatorStateResponse objects containing the current state of each queried actuator. + GetActuatorsStateResponse containing states for each queried actuator """ - request = actuator_pb2.GetActuatorsStateRequest(actuator_ids=actuator_ids or []) + request = GetActuatorsStateRequest(actuator_ids=actuator_ids or []) return self.stub.GetActuatorsState(request) diff --git a/kos-py/pykos/services/imu.py b/kos-py/pykos/services/imu.py index 8429668..613e89e 100644 --- a/kos-py/pykos/services/imu.py +++ b/kos-py/pykos/services/imu.py @@ -9,63 +9,12 @@ from google.protobuf.empty_pb2 import Empty from kos_protos import common_pb2, imu_pb2, imu_pb2_grpc -from kos_protos.imu_pb2 import CalibrateIMUMetadata - - -class IMUValuesResponse(TypedDict): - """TypedDict containing basic IMU sensor measurements. - - A dictionary type containing raw accelerometer, gyroscope, and optional - magnetometer readings from the IMU sensor. - - Fields: - accel_x: Acceleration along X-axis in m/s² - accel_y: Acceleration along Y-axis in m/s² - accel_z: Acceleration along Z-axis in m/s² - gyro_x: Angular velocity around X-axis in rad/s - gyro_y: Angular velocity around Y-axis in rad/s - gyro_z: Angular velocity around Z-axis in rad/s - mag_x: Optional magnetic field strength along X-axis - mag_y: Optional magnetic field strength along Y-axis - mag_z: Optional magnetic field strength along Z-axis - error: Optional error information if the reading failed - """ - - accel_x: float - accel_y: float - accel_z: float - gyro_x: float - gyro_y: float - gyro_z: float - mag_x: NotRequired[float | None] - mag_y: NotRequired[float | None] - mag_z: NotRequired[float | None] - error: NotRequired[common_pb2.Error | None] - - -class IMUAdvancedValuesResponse(TypedDict): - """TypedDict containing processed IMU measurements. - - A dictionary type containing filtered and processed IMU readings, - including linear accelerations and angular velocities with gravity compensation. - - Fields: - linear_acceleration_x: Gravity-compensated acceleration along X-axis in m/s² - linear_acceleration_y: Gravity-compensated acceleration along Y-axis in m/s² - linear_acceleration_z: Gravity-compensated acceleration along Z-axis in m/s² - angular_velocity_x: Filtered angular velocity around X-axis in rad/s - angular_velocity_y: Filtered angular velocity around Y-axis in rad/s - angular_velocity_z: Filtered angular velocity around Z-axis in rad/s - error: Optional error information if the processing failed - """ - - linear_acceleration_x: float - linear_acceleration_y: float - linear_acceleration_z: float - angular_velocity_x: float - angular_velocity_y: float - angular_velocity_z: float - error: NotRequired[common_pb2.Error | None] +from kos_protos.imu_pb2 import ( + CalibrateIMUMetadata, + IMUAdvancedValuesResponse, + IMUValuesResponse, + QuaternionResponse, +) class EulerAnglesResponse(TypedDict): @@ -87,27 +36,6 @@ class EulerAnglesResponse(TypedDict): error: NotRequired[common_pb2.Error | None] -class QuaternionResponse(TypedDict): - """TypedDict containing orientation as quaternion. - - A dictionary type containing the IMU's orientation expressed as a unit quaternion, - which provides a singularity-free representation of orientation. - - Fields: - w: Scalar component of the quaternion - x: X component of the quaternion's vector part - y: Y component of the quaternion's vector part - z: Z component of the quaternion's vector part - error: Optional error information if the calculation failed - """ - - w: float - x: float - y: float - z: float - error: NotRequired[common_pb2.Error | None] - - class ZeroIMUResponse(TypedDict): """TypedDict containing response from IMU zeroing operation. @@ -134,6 +62,15 @@ class CalibrateIMUResponse(TypedDict): class ZeroIMURequest(TypedDict): + """Parameters for zeroing the IMU. + + Fields: + max_retries: Optional maximum number of retries + max_angular_error: Optional maximum angular error during zeroing + max_velocity: Optional maximum velocity during zeroing + max_acceleration: Optional maximum acceleration during zeroing + """ + max_retries: NotRequired[int] max_angular_error: NotRequired[float] max_velocity: NotRequired[float] @@ -181,11 +118,11 @@ def get_imu_values(self) -> IMUValuesResponse: """Get the latest IMU sensor values. Returns: - IMUValuesResponse is a dictionary containing raw sensor measurements where: - - 'accel_x/y/z' contain acceleration values in m/s² - - 'gyro_x/y/z' contain angular velocity values in rad/s - - 'mag_x/y/z' optionally contain magnetic field measurements - - 'error' contains any error information if the reading failed + IMUValuesResponse containing: + - accel_x/y/z: Acceleration values in m/s² + - gyro_x/y/z: Angular velocity values in rad/s + - mag_x/y/z: Optional magnetic field measurements + - error: Optional error information if the reading failed """ return self.stub.GetValues(Empty()) @@ -193,22 +130,22 @@ def get_imu_advanced_values(self) -> IMUAdvancedValuesResponse: """Get the latest IMU advanced values. Returns: - IMUAdvancedValuesResponse is a dictionary containing processed measurements where: - - 'linear_acceleration_x/y/z' contain gravity-compensated acceleration in m/s² - - 'angular_velocity_x/y/z' contain filtered angular velocity in rad/s - - 'error' contains any error information if the processing failed + IMUAdvancedValuesResponse containing: + - linear_acceleration_x/y/z: Gravity-compensated acceleration in m/s² + - angular_velocity_x/y/z: Filtered angular velocity in rad/s + - error: Optional error information if the processing failed """ return self.stub.GetAdvancedValues(Empty()) - def get_euler_angles(self) -> EulerAnglesResponse: + def get_euler_angles(self) -> imu_pb2.EulerAnglesResponse: """Get the latest Euler angles. Returns: - EulerAnglesResponse is a dictionary containing orientation angles where: - - 'roll' contains rotation around X-axis in radians - - 'pitch' contains rotation around Y-axis in radians - - 'yaw' contains rotation around Z-axis in radians - - 'error' contains any error information if the calculation failed + EulerAnglesResponse containing: + - roll: Rotation around X-axis in radians + - pitch: Rotation around Y-axis in radians + - yaw: Rotation around Z-axis in radians + - error: Optional error information if the calculation failed """ return self.stub.GetEuler(Empty()) @@ -216,14 +153,14 @@ def get_quaternion(self) -> QuaternionResponse: """Get the latest quaternion. Returns: - QuaternionResponse is a dictionary containing orientation as quaternion where: - - 'w' contains the scalar component - - 'x/y/z' contain the vector components - - 'error' contains any error information if the calculation failed + QuaternionResponse containing: + - w: Scalar component + - x/y/z: Vector components + - error: Optional error information if the calculation failed """ return self.stub.GetQuaternion(Empty()) - def zero(self, duration: float = 1.0, **kwargs: Unpack[ZeroIMURequest]) -> ActionResponse: + def zero(self, duration: float = 1.0, **kwargs: Unpack[ZeroIMURequest]) -> common_pb2.ActionResponse: """Zero the IMU. Example: @@ -243,19 +180,18 @@ def zero(self, duration: float = 1.0, **kwargs: Unpack[ZeroIMURequest]) -> Actio max_acceleration: Maximum acceleration during zeroing Returns: - ActionResponse is a dictionary where 'success' indicates if the zeroing operation - was successful, and 'error' contains any error information if the operation failed. + ActionResponse indicating if the zeroing operation was successful """ request = imu_pb2.ZeroIMURequest(duration=_duration_from_seconds(duration), **kwargs) return self.stub.Zero(request) - def calibrate(self) -> CalibrateIMUResponse: + def calibrate(self) -> imu_pb2.CalibrateIMUResponse: """Calibrate the IMU. This starts a long-running calibration operation. The operation can be monitored using get_calibration_status(). Returns: - CalibrateIMUResponse: Response containing operation name and metadata about the calibration. + CalibrateIMUResponse containing operation name and metadata about the calibration. """ return self.stub.Calibrate(Empty()) diff --git a/kos-py/pykos/services/inference.py b/kos-py/pykos/services/inference.py index 1ee3323..c2a550e 100644 --- a/kos-py/pykos/services/inference.py +++ b/kos-py/pykos/services/inference.py @@ -5,12 +5,27 @@ import grpc from kos_protos import common_pb2, inference_pb2, inference_pb2_grpc +from kos_protos.inference_pb2 import ( + ForwardRequest, + ForwardResponse, + GetModelsInfoRequest, + GetModelsInfoResponse, + LoadModelsResponse, + ModelMetadata as ProtoModelMetadata, + ModelUids, + Tensor as ProtoTensor, + UploadModelResponse, +) class ModelMetadata(TypedDict): - """TypedDict containing model metadata for uploading models. + """Model metadata for uploading models. - Contains optional fields providing additional information about the model. + Fields: + model_name: Optional name of the model + model_description: Optional description of the model + model_version: Optional version of the model + model_author: Optional author of the model """ model_name: NotRequired[str | None] @@ -22,7 +37,7 @@ class ModelMetadata(TypedDict): class TensorDimension(TypedDict): """Information about a tensor dimension. - Args: + Fields: size: Size of this dimension name: Name of this dimension (e.g., "batch", "channels", "height") dynamic: Whether this dimension can vary (e.g., batch size) @@ -36,7 +51,7 @@ class TensorDimension(TypedDict): class Tensor(TypedDict): """A tensor containing data. - Args: + Fields: values: Tensor values in row-major order shape: List of dimension information """ @@ -45,204 +60,94 @@ class Tensor(TypedDict): shape: list[TensorDimension] -class ForwardResponse(TypedDict): - """Response from running model inference. - - Args: - outputs: Dictionary mapping tensor names to output tensors - error: Optional error information if inference failed - """ - - outputs: dict[str, Tensor] - error: NotRequired[common_pb2.Error | None] - - -class ModelInfo(TypedDict): - """Information about a model. - - Args: - uid: Model UID (assigned by server) - metadata: Model metadata - input_specs: Expected input tensor specifications - output_specs: Expected output tensor specifications - description: str - """ - - uid: str - metadata: ModelMetadata - input_specs: dict[str, Tensor] - output_specs: dict[str, Tensor] - description: str - - -class GetModelsInfoResponse(TypedDict): - """Response containing information about available models.""" - - models: list[ModelInfo] - error: NotRequired[common_pb2.Error | None] - - -class UploadModelResponse(TypedDict): - """Response from uploading a model.""" - - uid: str - error: NotRequired[common_pb2.Error | None] - - -class LoadModelsResponse(TypedDict): - """Response from loading models.""" - - success: bool - error: NotRequired[common_pb2.Error | None] - - -class ActionResponse(TypedDict): - """Response indicating success/failure of an action.""" - - success: bool - error: NotRequired[common_pb2.Error | None] - - class InferenceServiceClient: - """Client for the InferenceService. + """Client for the inference service. - This service allows uploading models and running inference on them. + This client provides methods to interact with the inference service for + uploading, loading, and running machine learning models. """ def __init__(self, channel: grpc.Channel) -> None: """Initialize the inference service client. Args: - channel: gRPC channel to use for communication. + channel: gRPC channel for communication with the service """ self.stub = inference_pb2_grpc.InferenceServiceStub(channel) def upload_model(self, model_data: bytes, metadata: ModelMetadata | None = None) -> UploadModelResponse: - """Upload a model to the robot. - - Example: - >>> client.upload_model(model_data, - ... metadata={"model_name": "MyModel", - ... "model_description": "A model for inference", - ... "model_version": "1.0.0", - ... "model_author": "John Doe"}) + """Upload a model to the inference service. Args: - model_data: The binary model data to upload. - metadata: Optional metadata about the model that can include: - model_name: Name of the model - model_description: Description of the model - model_version: Version of the model - model_author: Author of the model + model_data: The serialized model data + metadata: Optional metadata about the model Returns: - UploadModelResponse is a dictionary where 'uid' contains the unique identifier - assigned to the uploaded model, and 'error' contains any error information if - the upload failed. + UploadModelResponse containing: + - uid: Unique identifier assigned to the model + - error: Optional error information """ proto_metadata = None if metadata is not None: - proto_metadata = inference_pb2.ModelMetadata(**metadata) + proto_metadata = ProtoModelMetadata(**metadata) request = inference_pb2.UploadModelRequest(model=model_data, metadata=proto_metadata) return self.stub.UploadModel(request) def load_models(self, uids: list[str]) -> LoadModelsResponse: - """Load models from the robot's filesystem. + """Load models into memory. Args: - uids: List of model UIDs to load. + uids: List of model UIDs to load Returns: - LoadModelsResponse is a dictionary where 'success' indicates if all models - were loaded successfully, and 'error' contains any error information if the - loading failed. + LoadModelsResponse containing: + - models: List of loaded model information + - result: Success/failure status """ - request = inference_pb2.ModelUids(uids=uids) + request = ModelUids(uids=uids) return self.stub.LoadModels(request) - def unload_models(self, uids: list[str]) -> ActionResponse: - """Unload models from the robot's filesystem. + def unload_models(self, uids: list[str]) -> common_pb2.ActionResponse: + """Unload models from memory. Args: - uids: List of model UIDs to unload. + uids: List of model UIDs to unload Returns: - ActionResponse indicating success/failure of the unload operation. + ActionResponse indicating if the unload was successful """ - request = inference_pb2.ModelUids(uids=uids) + request = ModelUids(uids=uids) return self.stub.UnloadModels(request) def get_models_info(self, model_uids: list[str] | None = None) -> GetModelsInfoResponse: """Get information about available models. Args: - model_uids: Optional list of specific model UIDs to get info for. - If None, returns info for all models. + model_uids: Optional list of specific model UIDs to query. + If None, returns info for all available models. Returns: - GetModelsInfoResponse is a dictionary where 'models' contains a list of ModelInfo - objects with details about each model, and 'error' contains any error information - if the query failed. + GetModelsInfoResponse containing: + - models: List of ModelInfo objects with model details + - error: Optional error information """ if model_uids is not None: - request = inference_pb2.GetModelsInfoRequest(model_uids=inference_pb2.ModelUids(uids=model_uids)) + request = GetModelsInfoRequest(model_uids=ModelUids(uids=model_uids)) else: - request = inference_pb2.GetModelsInfoRequest(all=True) - - response = self.stub.GetModelsInfo(request) - - return GetModelsInfoResponse( - models=[ - ModelInfo( - uid=model.uid, - metadata=ModelMetadata( - model_name=model.metadata.model_name if model.metadata.HasField("model_name") else None, - model_description=( - model.metadata.model_description if model.metadata.HasField("model_description") else None - ), - model_version=( - model.metadata.model_version if model.metadata.HasField("model_version") else None - ), - model_author=model.metadata.model_author if model.metadata.HasField("model_author") else None, - ), - input_specs={ - name: Tensor( - values=list(tensor.values), - shape=[ - TensorDimension(size=dim.size, name=dim.name, dynamic=dim.dynamic) - for dim in tensor.shape - ], - ) - for name, tensor in model.input_specs.items() - }, - output_specs={ - name: Tensor( - values=list(tensor.values), - shape=[ - TensorDimension(size=dim.size, name=dim.name, dynamic=dim.dynamic) - for dim in tensor.shape - ], - ) - for name, tensor in model.output_specs.items() - }, - description=model.description, - ) - for model in response.models - ], - error=response.error if response.HasField("error") else None, - ) + request = GetModelsInfoRequest(all=True) + return self.stub.GetModelsInfo(request) def forward(self, model_uid: str, inputs: dict[str, Tensor]) -> ForwardResponse: - """Run inference using a specified model. + """Run inference on a model. Args: - model_uid: The UID of the model to use for inference. - inputs: Dictionary mapping tensor names to tensors. + model_uid: The UID of the model to use + inputs: Dictionary mapping input names to input tensors Returns: - ForwardResponse is a dictionary where 'outputs' contains a mapping of tensor - names to output tensors from the model inference, and 'error' contains any - error information if the inference failed. + ForwardResponse containing: + - outputs: Dictionary mapping tensor names to output tensors + - error: Optional error information """ tensor_inputs = {} for name, tensor in inputs.items(): @@ -250,18 +155,8 @@ def forward(self, model_uid: str, inputs: dict[str, Tensor]) -> ForwardResponse: inference_pb2.Tensor.Dimension(size=dim["size"], name=dim["name"], dynamic=dim["dynamic"]) for dim in tensor["shape"] ] - proto_tensor = inference_pb2.Tensor(values=tensor["values"], shape=shape) + proto_tensor = ProtoTensor(values=tensor["values"], shape=shape) tensor_inputs[name] = proto_tensor - response = self.stub.Forward(inference_pb2.ForwardRequest(model_uid=model_uid, inputs=tensor_inputs)) - - return ForwardResponse( - outputs={ - name: Tensor( - values=list(tensor.values), - shape=[TensorDimension(size=dim.size, name=dim.name, dynamic=dim.dynamic) for dim in tensor.shape], - ) - for name, tensor in response.outputs.items() - }, - error=response.error if response.HasField("error") else None, - ) + request = ForwardRequest(model_uid=model_uid, inputs=tensor_inputs) + return self.stub.Forward(request) diff --git a/kos-py/pykos/services/led_matrix.py b/kos-py/pykos/services/led_matrix.py index 664fea9..a15ab77 100644 --- a/kos-py/pykos/services/led_matrix.py +++ b/kos-py/pykos/services/led_matrix.py @@ -5,7 +5,12 @@ import grpc from google.protobuf.empty_pb2 import Empty -from kos_protos import common_pb2, led_matrix_pb2, led_matrix_pb2_grpc +from kos_protos import common_pb2, led_matrix_pb2_grpc +from kos_protos.led_matrix_pb2 import ( + GetMatrixInfoResponse, + WriteBufferRequest, + WriteColorBufferRequest, +) class MatrixInfo(TypedDict): @@ -34,7 +39,7 @@ class MatrixInfo(TypedDict): class ImageData(TypedDict): """Image data to be written to the LED matrix. - Args: + Fields: buffer: Raw image data bytes width: Image width in pixels height: Image height in pixels @@ -57,51 +62,51 @@ class ActionResponse(TypedDict): class LEDMatrixServiceClient: - """Client for the LEDMatrixService. + """Client for the LED matrix service. - This service allows controlling an LED matrix display. + This client provides methods to interact with an LED matrix display, + including querying its capabilities and writing image data to it. """ def __init__(self, channel: grpc.Channel) -> None: """Initialize the LED matrix service client. Args: - channel: gRPC channel to use for communication. + channel: gRPC channel for communication with the service """ self.stub = led_matrix_pb2_grpc.LEDMatrixServiceStub(channel) - def get_matrix_info(self) -> MatrixInfo: - """Get information about the LED matrix including dimensions and capabilities. + def get_matrix_info(self) -> GetMatrixInfoResponse: + """Get information about the LED matrix display. Returns: - MatrixInfo is a dictionary containing LED matrix configuration where: - - 'width' contains the number of LEDs in horizontal dimension - - 'height' contains the number of LEDs in vertical dimension - - 'brightness_levels' contains the number of supported brightness levels - - 'color_capable' indicates whether the matrix supports color - - 'bits_per_pixel' contains the number of bits used per pixel - - 'error' contains any error information if the query failed + GetMatrixInfoResponse containing: + - width: Width in pixels + - height: Height in pixels + - brightness_levels: Number of brightness levels supported + - color_capable: Whether the matrix supports color + - bits_per_pixel: Number of bits used to represent each pixel + - error: Optional error information """ return self.stub.GetMatrixInfo(Empty()) - def write_buffer(self, buffer: bytes) -> ActionResponse: - """Write binary on/off states to the LED matrix. + def write_buffer(self, buffer: bytes) -> common_pb2.ActionResponse: + """Write raw buffer data to the LED matrix. - The buffer should be width * height / 8 bytes long, where each bit - represents one LED's on/off state. + This method is for writing pre-formatted data that matches the matrix's + native format. For writing color images, use write_color_buffer instead. Args: - buffer: Binary buffer containing LED states + buffer: Raw buffer data bytes in the matrix's native format Returns: - ActionResponse is a dictionary where 'success' indicates if the write operation - was successful, and 'error' contains any error information if the operation failed. + ActionResponse indicating if the write was successful """ - request = led_matrix_pb2.WriteBufferRequest(buffer=buffer) + request = WriteBufferRequest(buffer=buffer) return self.stub.WriteBuffer(request) - def write_color_buffer(self, **kwargs: Unpack[ImageData]) -> ActionResponse: - """Write image data to the LED matrix. + def write_color_buffer(self, **kwargs: Unpack[ImageData]) -> common_pb2.ActionResponse: + """Write a color image buffer to the LED matrix. Args: **kwargs: Image data containing: @@ -112,8 +117,7 @@ def write_color_buffer(self, **kwargs: Unpack[ImageData]) -> ActionResponse: brightness: Global brightness level (0-255) Returns: - ActionResponse is a dictionary where 'success' indicates if the write operation - was successful, and 'error' contains any error information if the operation failed. + ActionResponse indicating if the write was successful """ - request = led_matrix_pb2.WriteColorBufferRequest(**kwargs) + request = WriteColorBufferRequest(**kwargs) return self.stub.WriteColorBuffer(request) diff --git a/kos-py/pykos/services/process_manager.py b/kos-py/pykos/services/process_manager.py index 95a12bf..0ce11a7 100644 --- a/kos-py/pykos/services/process_manager.py +++ b/kos-py/pykos/services/process_manager.py @@ -1,74 +1,50 @@ """Process manager service client.""" -from typing import NotRequired, TypedDict - import grpc from google.protobuf.empty_pb2 import Empty -from kos_protos import common_pb2, process_manager_pb2_grpc -from kos_protos.process_manager_pb2 import KClipStartRequest - - -class KClipStartResponse(TypedDict): - """TypedDict containing response from starting a KClip recording. - - A dictionary type containing information about a newly started - KClip recording session, including its unique identifier. - - Fields: - success: Whether the recording started successfully - clip_uuid: Unique identifier for the recording session - error: Optional error information if start failed - """ - - success: bool - clip_uuid: str - error: NotRequired[common_pb2.Error | None] - +from kos_protos import process_manager_pb2_grpc +from kos_protos.process_manager_pb2 import ( + KClipStartRequest, + KClipStartResponse, + KClipStopResponse, +) -class KClipStopResponse(TypedDict): - """TypedDict containing response from stopping a KClip recording. - A dictionary type containing information about the stopped - KClip recording session, including its identifier. +class ProcessManagerServiceClient: + """Client for the process manager service. - Fields: - success: Whether the recording stopped successfully - clip_uuid: Identifier of the stopped recording session - error: Optional error information if stop failed + This client provides methods to manage KClip recordings and other processes. """ - success: bool - clip_uuid: str - error: NotRequired[common_pb2.Error | None] - - -class ProcessManagerServiceClient: def __init__(self, channel: grpc.Channel) -> None: + """Initialize the process manager service client. + + Args: + channel: gRPC channel for communication with the service + """ self.stub = process_manager_pb2_grpc.ProcessManagerServiceStub(channel) def start_kclip(self, action: str) -> KClipStartResponse: - """Start KClip recording. + """Start a new KClip recording. Args: - action: The action string for the KClip request + action: The action being recorded Returns: - KClipStartResponse is a dictionary where: - - 'success' indicates if the recording started successfully - - 'clip_uuid' contains the unique identifier for the recording session - - 'error' contains any error information if the start failed + KClipStartResponse containing: + - clip_uuid: Unique identifier for the recording session + - error: Optional error information if start failed """ request = KClipStartRequest(action=action) return self.stub.StartKClip(request) def stop_kclip(self, request: Empty = Empty()) -> KClipStopResponse: - """Stop KClip recording. + """Stop the current KClip recording. Returns: - KClipStopResponse is a dictionary where: - - 'success' indicates if the recording stopped successfully - - 'clip_uuid' contains the identifier of the stopped recording session - - 'error' contains any error information if the stop failed + KClipStopResponse containing: + - clip_uuid: Identifier of the stopped recording session + - error: Optional error information if stop failed """ return self.stub.StopKClip(request) diff --git a/kos-py/pykos/services/sim.py b/kos-py/pykos/services/sim.py index 9aa155f..89e3920 100644 --- a/kos-py/pykos/services/sim.py +++ b/kos-py/pykos/services/sim.py @@ -5,14 +5,20 @@ import grpc from google.protobuf.empty_pb2 import Empty -from kos_protos import common_pb2, sim_pb2, sim_pb2_grpc +from kos_protos import common_pb2, sim_pb2_grpc +from kos_protos.sim_pb2 import ( + DefaultPosition, + GetParametersResponse, + ResetRequest, + SetParametersRequest, + SetPausedRequest, + SimulationParameters, + StepRequest, +) -class DefaultPosition(TypedDict): - """TypedDict containing initial simulation state. - - A dictionary type specifying the initial joint positions for - resetting the simulation to a known state. +class DefaultPositionInput(TypedDict): + """Initial simulation state. Fields: qpos: List of joint positions in simulation units @@ -21,64 +27,61 @@ class DefaultPosition(TypedDict): qpos: list[float] -class ResetRequest(TypedDict): - """TypedDict containing simulation reset parameters. - - A dictionary type specifying how the simulation should be reset, - including optional initial state and randomization settings. +class ResetParams(TypedDict): + """Simulation reset parameters. Fields: initial_state: Optional DefaultPosition to set initial joint positions randomize: Optional flag to add randomization during reset """ - initial_state: NotRequired[DefaultPosition] + initial_state: NotRequired[DefaultPositionInput] randomize: NotRequired[bool] -class StepRequest(TypedDict): - """Request parameters for stepping simulation.""" +class StepParams(TypedDict): + """Parameters for stepping simulation. + + Fields: + num_steps: Number of simulation steps to take + step_size: Optional duration of each step in seconds + """ num_steps: int step_size: NotRequired[float] -class SimulationParameters(TypedDict): - """Parameters for configuring simulation.""" +class SimulationParams(TypedDict): + """Parameters for configuring simulation. + + Fields: + time_scale: Optional simulation time scale factor + gravity: Optional gravitational acceleration in m/s² + initial_state: Optional default joint positions + """ time_scale: NotRequired[float] gravity: NotRequired[float] - initial_state: NotRequired[DefaultPosition] - - -class ActionResponse(TypedDict): - """Response indicating success/failure of an action.""" - - success: bool - error: NotRequired[common_pb2.Error | None] - + initial_state: NotRequired[DefaultPositionInput] -class GetParametersResponse(TypedDict): - """Response containing current simulation parameters.""" - time_scale: float - gravity: float - initial_state: DefaultPosition - error: NotRequired[common_pb2.Error | None] +class SimServiceClient: + """Client for the simulation service. + This client provides methods to control and configure the physics simulation, + including resetting, stepping, pausing, and parameter adjustment. + """ -class SimServiceClient: def __init__(self, channel: grpc.Channel) -> None: - self.stub = sim_pb2_grpc.SimulationServiceStub(channel) + """Initialize the simulation service client. - def reset(self, **kwargs: Unpack[ResetRequest]) -> ActionResponse: - """Reset the simulation to its initial state. + Args: + channel: gRPC channel for communication with the service + """ + self.stub = sim_pb2_grpc.SimulationServiceStub(channel) - Example: - >>> reset( - ... initial_state={"qpos": [0.0, 0.0, 0.0]}, - ... randomize=True - ... ) + def reset(self, **kwargs: Unpack[ResetParams]) -> common_pb2.ActionResponse: + """Reset the simulation to a known state. Args: **kwargs: Reset parameters that may include: @@ -86,94 +89,72 @@ def reset(self, **kwargs: Unpack[ResetRequest]) -> ActionResponse: randomize: Whether to randomize the initial state Returns: - ActionResponse is a dictionary where 'success' indicates if the reset operation - was successful, and 'error' contains any error information if the reset failed. + ActionResponse indicating if the reset was successful """ initial_state = None if "initial_state" in kwargs: pos = kwargs["initial_state"] - initial_state = sim_pb2.DefaultPosition(qpos=pos["qpos"]) + initial_state = DefaultPosition(qpos=pos["qpos"]) - request = sim_pb2.ResetRequest(initial_state=initial_state, randomize=kwargs.get("randomize")) + request = ResetRequest(initial_state=initial_state, randomize=kwargs.get("randomize")) return self.stub.Reset(request) - def set_paused(self, paused: bool) -> ActionResponse: + def set_paused(self, paused: bool) -> common_pb2.ActionResponse: """Pause or unpause the simulation. - Example: - >>> set_paused(True) # Pause simulation - >>> set_paused(False) # Resume simulation - Args: paused: True to pause, False to unpause Returns: - ActionResponse is a dictionary where 'success' indicates if the pause state - was set successfully, and 'error' contains any error information if the operation failed. + ActionResponse indicating if the pause state was set successfully """ - request = sim_pb2.SetPausedRequest(paused=paused) + request = SetPausedRequest(paused=paused) return self.stub.SetPaused(request) - def step(self, num_steps: int, step_size: float | None = None) -> ActionResponse: - """Step the simulation forward. - - Example: - >>> step(num_steps=100, step_size=0.001) # Step forward 100 times with 1ms steps - >>> step(num_steps=50) # Step forward 50 times with default step size + def step(self, num_steps: int, step_size: float | None = None) -> common_pb2.ActionResponse: + """Step the simulation forward by a specified number of steps. Args: num_steps: Number of simulation steps to take - step_size: Optional time per step in seconds + step_size: Optional duration of each step in seconds Returns: - ActionResponse is a dictionary where 'success' indicates if the stepping operation - was successful, and 'error' contains any error information if the stepping failed. + ActionResponse indicating if the stepping was successful """ - request = sim_pb2.StepRequest(num_steps=num_steps, step_size=step_size) + request = StepRequest(num_steps=num_steps, step_size=step_size) return self.stub.Step(request) - def set_parameters(self, **kwargs: Unpack[SimulationParameters]) -> ActionResponse: + def set_parameters(self, **kwargs: Unpack[SimulationParams]) -> common_pb2.ActionResponse: """Set simulation parameters. - Example: - >>> set_parameters( - ... time_scale=1.0, - ... gravity=9.81, - ... initial_state={"qpos": [0.0, 0.0, 0.0]} - ... ) - Args: **kwargs: Parameters that may include: - time_scale: Simulation time scale - gravity: Gravity constant - initial_state: Default position state + time_scale: Simulation time scale factor + gravity: Gravitational acceleration in m/s² + initial_state: Default joint positions Returns: - ActionResponse is a dictionary where 'success' indicates if the parameters were - set successfully, and 'error' contains any error information if the operation failed. + ActionResponse indicating if the parameters were set successfully """ initial_state = None if "initial_state" in kwargs: pos = kwargs["initial_state"] - initial_state = sim_pb2.DefaultPosition(qpos=pos["qpos"]) + initial_state = DefaultPosition(qpos=pos["qpos"]) - params = sim_pb2.SimulationParameters( - time_scale=kwargs.get("time_scale"), gravity=kwargs.get("gravity"), initial_state=initial_state + params = SimulationParameters( + time_scale=kwargs.get("time_scale"), + gravity=kwargs.get("gravity"), + initial_state=initial_state, ) - request = sim_pb2.SetParametersRequest(parameters=params) + request = SetParametersRequest(parameters=params) return self.stub.SetParameters(request) def get_parameters(self) -> GetParametersResponse: """Get current simulation parameters. - Example: - >>> get_parameters() - Returns: - GetParametersResponse is a dictionary where: - - 'time_scale' contains the current simulation time scaling factor - - 'gravity' contains the current gravity constant value - - 'initial_state' contains the default position state as a DefaultPosition dictionary - - 'error' contains any error information if the query failed + GetParametersResponse containing: + - parameters: Current simulation parameters + - error: Optional error information """ return self.stub.GetParameters(Empty()) diff --git a/kos-py/pykos/services/sound.py b/kos-py/pykos/services/sound.py index 97e571b..e740d1b 100644 --- a/kos-py/pykos/services/sound.py +++ b/kos-py/pykos/services/sound.py @@ -1,11 +1,17 @@ """Sound service client.""" -from typing import Generator, Iterator, NotRequired, TypedDict, Unpack +from typing import Generator, Iterator, NotRequired, TypedDict import grpc from google.protobuf.empty_pb2 import Empty -from kos_protos import common_pb2, sound_pb2, sound_pb2_grpc +from kos_protos import common_pb2, sound_pb2_grpc +from kos_protos.sound_pb2 import ( + AudioConfig as ProtoAudioConfig, + GetAudioInfoResponse, + PlayAudioRequest, + RecordAudioRequest, +) class AudioCapability(TypedDict): @@ -44,7 +50,7 @@ class AudioInfo(TypedDict): class AudioConfig(TypedDict): """Audio configuration parameters. - Args: + Fields: sample_rate: Sample rate in Hz (e.g., 44100) bit_depth: Bit depth (e.g., 16) channels: Number of channels (1 for mono, 2 for stereo) @@ -62,107 +68,101 @@ class ActionResponse(TypedDict): error: NotRequired[common_pb2.Error | None] -class RecordAudioResponse(TypedDict): - """Response containing recorded audio data.""" - - audio_data: bytes - error: NotRequired[common_pb2.Error | None] - - class SoundServiceClient: - """Client for the SoundService. + """Client for the sound service. - This service allows playing audio through speakers and recording from microphones. + This client provides methods to interact with the audio system, + including playback and recording capabilities. """ def __init__(self, channel: grpc.Channel) -> None: """Initialize the sound service client. Args: - channel: gRPC channel to use for communication. + channel: gRPC channel for communication with the service """ self.stub = sound_pb2_grpc.SoundServiceStub(channel) - def get_audio_info(self) -> AudioInfo: - """Get information about audio capabilities. + def get_audio_info(self) -> GetAudioInfoResponse: + """Get information about audio system capabilities. Returns: - AudioInfo is a dictionary where: - - 'playback' contains an AudioCapability dictionary describing playback capabilities: - - 'recording' contains an AudioCapability dictionary describing recording capabilities - - 'error' contains any error information if the query failed + GetAudioInfoResponse containing: + - playback: Playback capabilities (sample rates, bit depths, channels) + - recording: Recording capabilities (sample rates, bit depths, channels) + - error: Optional error information """ return self.stub.GetAudioInfo(Empty()) - def play_audio(self, audio_iterator: Iterator[bytes], **kwargs: Unpack[AudioConfig]) -> ActionResponse: - """Stream PCM audio data to the speaker. - - Example: - >>> config = AudioConfig(sample_rate=44100, bit_depth=16, channels=2) - >>> with open('audio.raw', 'rb') as f: - ... def chunks(): - ... while chunk := f.read(4096): - ... yield chunk - ... response = client.play_audio(chunks(), **config) + def play_audio( + self, + audio_iterator: Iterator[bytes], + sample_rate: int, + bit_depth: int, + channels: int, + ) -> common_pb2.ActionResponse: + """Play audio data through the audio system. Args: - audio_iterator: Iterator yielding chunks of PCM audio data - **kwargs: Audio configuration parameters: - sample_rate: Sample rate in Hz (e.g., 44100) - bit_depth: Bit depth (e.g., 16) - channels: Number of channels (1 for mono, 2 for stereo) + audio_iterator: Iterator yielding audio data chunks + sample_rate: Sample rate in Hz (e.g., 44100) + bit_depth: Bit depth (e.g., 16) + channels: Number of channels (1 for mono, 2 for stereo) Returns: - ActionResponse is a dictionary where 'success' indicates if the playback operation - was successful, and 'error' contains any error information if the operation failed. + ActionResponse indicating if the playback was successful """ - def request_iterator() -> Generator[sound_pb2.PlayAudioRequest, None, None]: + def request_iterator() -> Generator[PlayAudioRequest, None, None]: # First message includes config - yield sound_pb2.PlayAudioRequest( - config=sound_pb2.AudioConfig(**kwargs), + yield PlayAudioRequest( + config=ProtoAudioConfig( + sample_rate=sample_rate, + bit_depth=bit_depth, + channels=channels, + ) ) # Subsequent messages contain audio data for chunk in audio_iterator: - yield sound_pb2.PlayAudioRequest(audio_data=chunk) + yield PlayAudioRequest(audio_data=chunk) return self.stub.PlayAudio(request_iterator()) - def record_audio(self, duration_ms: int = 0, **kwargs: Unpack[AudioConfig]) -> Generator[bytes, None, None]: - """Record PCM audio data from the microphone. - - Example: - >>> config = AudioConfig(sample_rate=44100, bit_depth=16, channels=1) - >>> with open('recording.raw', 'wb') as f: - ... for chunk in client.record_audio(duration_ms=5000, **config): - ... f.write(chunk) + def record_audio( + self, + duration_ms: int = 0, + sample_rate: int = 44100, + bit_depth: int = 16, + channels: int = 1, + ) -> Generator[bytes, None, None]: + """Record audio from the audio system. Args: - duration_ms: Recording duration in milliseconds (0 for continuous) - **kwargs: Audio configuration parameters: - sample_rate: Sample rate in Hz (e.g., 44100) - bit_depth: Bit depth (e.g., 16) - channels: Number of channels (1 for mono, 2 for stereo) - - Yields: - Chunks of PCM audio data as bytes. If an error occurs during recording, - a RuntimeError will be raised with the error details. + duration_ms: Duration to record in milliseconds (0 for continuous) + sample_rate: Sample rate in Hz (default: 44100) + bit_depth: Bit depth (default: 16) + channels: Number of channels (default: 1 for mono) + + Returns: + Generator yielding recorded audio data chunks """ - request = sound_pb2.RecordAudioRequest( - config=sound_pb2.AudioConfig(**kwargs), + request = RecordAudioRequest( + config=ProtoAudioConfig( + sample_rate=sample_rate, + bit_depth=bit_depth, + channels=channels, + ), duration_ms=duration_ms, ) - for response in self.stub.RecordAudio(request): if response.HasField("error"): raise RuntimeError(f"Recording error: {response.error}") yield response.audio_data - def stop_recording(self) -> ActionResponse: - """Stop an ongoing recording session. + def stop_recording(self) -> common_pb2.ActionResponse: + """Stop the current audio recording. Returns: - ActionResponse is a dictionary where 'success' indicates if the recording was - stopped successfully, and 'error' contains any error information if the stop failed. + ActionResponse indicating if the recording was stopped successfully """ return self.stub.StopRecording(Empty()) diff --git a/kos-py/tests/test_pykos.py b/kos-py/tests/test_pykos.py index a69773f..b8c6e70 100644 --- a/kos-py/tests/test_pykos.py +++ b/kos-py/tests/test_pykos.py @@ -4,7 +4,8 @@ import pytest import pykos -from pykos.services.actuator import ActionResult, GetActuatorsStateResponse +from kos_protos.common_pb2 import ActionResponse +from pykos.services.actuator import GetActuatorsStateResponse from pykos.services.imu import IMUValuesResponse from pykos.services.process_manager import KClipStartResponse, KClipStopResponse @@ -19,7 +20,7 @@ def test_pykos() -> None: client = pykos.KOS("127.0.0.1") # Tests configuring the actuator. - actuator_response: ActionResult = client.actuator.configure_actuator(actuator_id=1) + actuator_response: ActionResponse = client.actuator.configure_actuator(actuator_id=1) assert actuator_response["success"] # Tests getting the actuator state.