diff --git a/kos-py/pykos/services/actuator.py b/kos-py/pykos/services/actuator.py index c079656..8f45d87 100644 --- a/kos-py/pykos/services/actuator.py +++ b/kos-py/pykos/services/actuator.py @@ -7,17 +7,48 @@ from google.protobuf.any_pb2 import Any as AnyPb2 from kos_protos import actuator_pb2, actuator_pb2_grpc, common_pb2 -from kos_protos.actuator_pb2 import CalibrateActuatorMetadata +from kos_protos.actuator_pb2 import ( + CalibrateActuatorMetadata, + CommandActuatorsRequest, + CommandActuatorsResponse, + ConfigureActuatorRequest, + GetActuatorsStateRequest, + GetActuatorsStateResponse, +) class ActuatorCommand(TypedDict): + """Command parameters for an actuator. + + Fields: + actuator_id: The ID of the actuator to command + position: Optional target position in degrees + velocity: Optional target velocity in degrees/second + torque: Optional target torque in Nm + """ + actuator_id: int position: NotRequired[float] velocity: NotRequired[float] torque: NotRequired[float] -class ConfigureActuatorRequest(TypedDict): +class ConfigureActuatorParams(TypedDict): + """Configuration parameters for an actuator. + + Fields: + actuator_id: The ID of the actuator to configure + kp: Optional proportional gain for position control + kd: Optional derivative gain for position control + ki: Optional integral gain for position control + max_torque: Optional maximum torque limit + protective_torque: Optional protective torque threshold + protection_time: Optional protection activation time + torque_enabled: Optional flag to enable/disable torque + new_actuator_id: Optional new ID to assign to the actuator + zero_position: Optional flag to set current position as zero + """ + actuator_id: int kp: NotRequired[float] kd: NotRequired[float] @@ -30,10 +61,6 @@ class ConfigureActuatorRequest(TypedDict): zero_position: NotRequired[bool] -class ActuatorStateRequest(TypedDict): - actuator_ids: list[int] - - class CalibrationStatus: Calibrating = "calibrating" Calibrated = "calibrated" @@ -50,8 +77,10 @@ def decode_metadata(self, metadata_any: AnyPb2) -> None: metadata = CalibrateActuatorMetadata() if metadata_any.Is(CalibrateActuatorMetadata.DESCRIPTOR): metadata_any.Unpack(metadata) - self.actuator_id = metadata.actuator_id - self.status = metadata.status if metadata.HasField("status") else None + if metadata.HasField("actuator_id"): + self.actuator_id = metadata.actuator_id + if metadata.HasField("status"): + self.status = metadata.status def __str__(self) -> str: return f"CalibrationMetadata(actuator_id={self.actuator_id}, status={self.status})" @@ -68,88 +97,91 @@ def __init__(self, channel: grpc.Channel) -> None: def calibrate(self, actuator_id: int) -> CalibrationMetadata: """Calibrate an actuator. + Args: + actuator_id: The ID of the actuator to calibrate + Returns: - Operation: The operation for the calibration. + CalibrationMetadata object containing the actuator ID and calibration status """ - response = self.stub.CalibrateActuator(actuator_pb2.CalibrateActuatorRequest(actuator_id=actuator_id)) - metadata = CalibrationMetadata(response.metadata) - return metadata + request = actuator_pb2.CalibrateActuatorRequest(actuator_id=actuator_id) + operation = self.stub.CalibrateActuator(request) + return CalibrationMetadata(operation.metadata) def get_calibration_status(self, actuator_id: int) -> str | None: - response = self.operations_stub.GetOperation( - operations_pb2.GetOperationRequest(name=f"operations/calibrate_actuator/{actuator_id}") - ) - metadata = CalibrationMetadata(response.metadata) - return metadata.status + """Get the calibration status of an actuator. - def command_actuators(self, commands: list[ActuatorCommand]) -> actuator_pb2.CommandActuatorsResponse: - """Command multiple actuators at once. + Args: + actuator_id: The ID of the actuator to check - Example: - >>> command_actuators([ - ... {"actuator_id": 1, "position": 90.0, "velocity": 100.0, "torque": 1.0}, - ... {"actuator_id": 2, "position": 180.0}, - ... ]) + Returns: + The calibration status string or None if not found + """ + request = operations_pb2.GetOperationRequest(name=str(actuator_id)) + operation = self.operations_stub.GetOperation(request) + return CalibrationMetadata(operation.metadata).status if operation.metadata else None + + def command_actuators(self, commands: list[ActuatorCommand]) -> CommandActuatorsResponse: + """Send commands to multiple actuators. Args: - commands: List of dictionaries containing actuator commands. - Each dict should have 'actuator_id' and optionally 'position', - 'velocity', and 'torque'. + commands: List of dictionaries specifying commands for each actuator. + Each dictionary must have 'actuator_id' and may include: + - position: Target position in degrees + - velocity: Target velocity in degrees/second + - torque: Target torque in Nm Returns: - List of ActionResult objects indicating success/failure for each command. + CommandActuatorsResponse containing results for each command """ - actuator_commands = [actuator_pb2.ActuatorCommand(**cmd) for cmd in commands] - request = actuator_pb2.CommandActuatorsRequest(commands=actuator_commands) + proto_commands = [actuator_pb2.ActuatorCommand(**cmd) for cmd in commands] + request = CommandActuatorsRequest(commands=proto_commands) return self.stub.CommandActuators(request) - def configure_actuator(self, **kwargs: Unpack[ConfigureActuatorRequest]) -> common_pb2.ActionResult: - """Configure an actuator's parameters. + def configure_actuator(self, **kwargs: Unpack[ConfigureActuatorParams]) -> common_pb2.ActionResponse: + """Configure an actuator with the specified parameters. Example: >>> configure_actuator( ... actuator_id=1, - ... kp=1.0, - ... kd=0.1, - ... ki=0.01, - ... max_torque=100.0, - ... protective_torque=None, - ... protection_time=None, + ... kp=10.0, + ... kd=1.0, + ... ki=0.1, + ... max_torque=2.0, + ... protective_torque=1.5, + ... protection_time=0.5, ... torque_enabled=True, - ... new_actuator_id=None, + ... new_actuator_id=2, ... zero_position=True ... ) - >>> configure_actuator( - ... actuator_id=2, - ... kp=1.0, - ... kd=0.1, - ... torque_enabled=True, - ... ) - Args: - actuator_id: ID of the actuator to configure - **kwargs: Configuration parameters that may include: - kp, kd, ki, max_torque, protective_torque, - protection_time, torque_enabled, new_actuator_id + **kwargs: Configuration parameters that must include: + actuator_id: The ID of the actuator to configure + And may optionally include: + kp: Proportional gain for position control + kd: Derivative gain for position control + ki: Integral gain for position control + max_torque: Maximum torque limit + protective_torque: Protective torque threshold + protection_time: Protection activation time + torque_enabled: Flag to enable/disable torque + new_actuator_id: New ID to assign to the actuator + zero_position: Flag to set current position as zero Returns: - ActionResponse indicating success/failure + ActionResponse indicating if the configuration was successful """ - request = actuator_pb2.ConfigureActuatorRequest(**kwargs) + request = ConfigureActuatorRequest(**kwargs) return self.stub.ConfigureActuator(request) - def get_actuators_state(self, actuator_ids: list[int] | None = None) -> actuator_pb2.GetActuatorsStateResponse: - """Get the state of multiple actuators. - - Example: - >>> get_actuators_state([1, 2]) + def get_actuators_state(self, actuator_ids: list[int] | None = None) -> GetActuatorsStateResponse: + """Get the current state of specified actuators. Args: - actuator_ids: List of actuator IDs to query. If None, gets state of all actuators. + actuator_ids: Optional list of actuator IDs to query. If None, queries all actuators. Returns: - List of ActuatorStateResponse objects containing the state information + GetActuatorsStateResponse containing states for each queried actuator """ - request = actuator_pb2.GetActuatorsStateRequest(actuator_ids=actuator_ids or []) + request = GetActuatorsStateRequest(actuator_ids=actuator_ids or []) return self.stub.GetActuatorsState(request) diff --git a/kos-py/pykos/services/imu.py b/kos-py/pykos/services/imu.py index 4eb85e6..613e89e 100644 --- a/kos-py/pykos/services/imu.py +++ b/kos-py/pykos/services/imu.py @@ -9,10 +9,68 @@ from google.protobuf.empty_pb2 import Empty from kos_protos import common_pb2, imu_pb2, imu_pb2_grpc -from kos_protos.imu_pb2 import CalibrateIMUMetadata +from kos_protos.imu_pb2 import ( + CalibrateIMUMetadata, + IMUAdvancedValuesResponse, + IMUValuesResponse, + QuaternionResponse, +) + + +class EulerAnglesResponse(TypedDict): + """TypedDict containing orientation in Euler angles. + + A dictionary type containing the IMU's orientation expressed as Euler angles + in the roll-pitch-yaw convention. + + Fields: + roll: Rotation around X-axis in radians + pitch: Rotation around Y-axis in radians + yaw: Rotation around Z-axis in radians + error: Optional error information if the calculation failed + """ + + roll: float + pitch: float + yaw: float + error: NotRequired[common_pb2.Error | None] + + +class ZeroIMUResponse(TypedDict): + """TypedDict containing response from IMU zeroing operation. + + A dictionary type indicating whether the IMU zeroing operation succeeded. + Zeroing sets the current orientation as the reference orientation. + + Fields: + success: Whether the zeroing operation completed successfully + error: Optional error information if the operation failed + """ + + success: bool + error: NotRequired[common_pb2.Error | None] + + +class ActionResponse(TypedDict): + success: bool + error: NotRequired[common_pb2.Error] + + +class CalibrateIMUResponse(TypedDict): + name: str + metadata: AnyPb2 class ZeroIMURequest(TypedDict): + """Parameters for zeroing the IMU. + + Fields: + max_retries: Optional maximum number of retries + max_angular_error: Optional maximum angular error during zeroing + max_velocity: Optional maximum velocity during zeroing + max_acceleration: Optional maximum acceleration during zeroing + """ + max_retries: NotRequired[int] max_angular_error: NotRequired[float] max_velocity: NotRequired[float] @@ -56,19 +114,26 @@ def __init__(self, channel: grpc.Channel) -> None: self.stub = imu_pb2_grpc.IMUServiceStub(channel) self.operations_stub = operations_pb2_grpc.OperationsStub(channel) - def get_imu_values(self) -> imu_pb2.IMUValuesResponse: + def get_imu_values(self) -> IMUValuesResponse: """Get the latest IMU sensor values. Returns: - ImuValuesResponse: The latest IMU sensor values. + IMUValuesResponse containing: + - accel_x/y/z: Acceleration values in m/s² + - gyro_x/y/z: Angular velocity values in rad/s + - mag_x/y/z: Optional magnetic field measurements + - error: Optional error information if the reading failed """ return self.stub.GetValues(Empty()) - def get_imu_advanced_values(self) -> imu_pb2.IMUAdvancedValuesResponse: + def get_imu_advanced_values(self) -> IMUAdvancedValuesResponse: """Get the latest IMU advanced values. Returns: - ImuAdvancedValuesResponse: The latest IMU advanced values. + IMUAdvancedValuesResponse containing: + - linear_acceleration_x/y/z: Gravity-compensated acceleration in m/s² + - angular_velocity_x/y/z: Filtered angular velocity in rad/s + - error: Optional error information if the processing failed """ return self.stub.GetAdvancedValues(Empty()) @@ -76,15 +141,22 @@ def get_euler_angles(self) -> imu_pb2.EulerAnglesResponse: """Get the latest Euler angles. Returns: - EulerAnglesResponse: The latest Euler angles. + EulerAnglesResponse containing: + - roll: Rotation around X-axis in radians + - pitch: Rotation around Y-axis in radians + - yaw: Rotation around Z-axis in radians + - error: Optional error information if the calculation failed """ return self.stub.GetEuler(Empty()) - def get_quaternion(self) -> imu_pb2.QuaternionResponse: + def get_quaternion(self) -> QuaternionResponse: """Get the latest quaternion. Returns: - QuaternionResponse: The latest quaternion. + QuaternionResponse containing: + - w: Scalar component + - x/y/z: Vector components + - error: Optional error information if the calculation failed """ return self.stub.GetQuaternion(Empty()) @@ -108,7 +180,7 @@ def zero(self, duration: float = 1.0, **kwargs: Unpack[ZeroIMURequest]) -> commo max_acceleration: Maximum acceleration during zeroing Returns: - ActionResponse: The response from the zero operation. + ActionResponse indicating if the zeroing operation was successful """ request = imu_pb2.ZeroIMURequest(duration=_duration_from_seconds(duration), **kwargs) return self.stub.Zero(request) @@ -120,6 +192,6 @@ def calibrate(self) -> imu_pb2.CalibrateIMUResponse: using get_calibration_status(). Returns: - CalibrationMetadata: Metadata about the calibration operation. + CalibrateIMUResponse containing operation name and metadata about the calibration. """ return self.stub.Calibrate(Empty()) diff --git a/kos-py/pykos/services/inference.py b/kos-py/pykos/services/inference.py index b36bb1f..c2a550e 100644 --- a/kos-py/pykos/services/inference.py +++ b/kos-py/pykos/services/inference.py @@ -5,12 +5,27 @@ import grpc from kos_protos import common_pb2, inference_pb2, inference_pb2_grpc +from kos_protos.inference_pb2 import ( + ForwardRequest, + ForwardResponse, + GetModelsInfoRequest, + GetModelsInfoResponse, + LoadModelsResponse, + ModelMetadata as ProtoModelMetadata, + ModelUids, + Tensor as ProtoTensor, + UploadModelResponse, +) class ModelMetadata(TypedDict): """Model metadata for uploading models. - All fields are optional and can be used to provide additional information about the model. + Fields: + model_name: Optional name of the model + model_description: Optional description of the model + model_version: Optional version of the model + model_author: Optional author of the model """ model_name: NotRequired[str | None] @@ -22,7 +37,7 @@ class ModelMetadata(TypedDict): class TensorDimension(TypedDict): """Information about a tensor dimension. - Args: + Fields: size: Size of this dimension name: Name of this dimension (e.g., "batch", "channels", "height") dynamic: Whether this dimension can vary (e.g., batch size) @@ -36,7 +51,7 @@ class TensorDimension(TypedDict): class Tensor(TypedDict): """A tensor containing data. - Args: + Fields: values: Tensor values in row-major order shape: List of dimension information """ @@ -45,181 +60,94 @@ class Tensor(TypedDict): shape: list[TensorDimension] -class ForwardResponse(TypedDict): - """Response from running model inference. - - Args: - outputs: Dictionary mapping tensor names to output tensors - error: Optional error information if inference failed - """ - - outputs: dict[str, Tensor] - error: NotRequired[common_pb2.Error | None] - - -class ModelInfo(TypedDict): - """Information about a model. - - Args: - uid: Model UID (assigned by server) - metadata: Model metadata - input_specs: Expected input tensor specifications - output_specs: Expected output tensor specifications - description: str - """ - - uid: str - metadata: ModelMetadata - input_specs: dict[str, Tensor] - output_specs: dict[str, Tensor] - description: str - - -class GetModelsInfoResponse(TypedDict): - """Response containing information about available models.""" - - models: list[ModelInfo] - error: NotRequired[common_pb2.Error | None] - - class InferenceServiceClient: - """Client for the InferenceService. + """Client for the inference service. - This service allows uploading models and running inference on them. + This client provides methods to interact with the inference service for + uploading, loading, and running machine learning models. """ def __init__(self, channel: grpc.Channel) -> None: """Initialize the inference service client. Args: - channel: gRPC channel to use for communication. + channel: gRPC channel for communication with the service """ self.stub = inference_pb2_grpc.InferenceServiceStub(channel) - def upload_model( - self, model_data: bytes, metadata: ModelMetadata | None = None - ) -> inference_pb2.UploadModelResponse: - """Upload a model to the robot. - - Example: - >>> client.upload_model(model_data, - ... metadata={"model_name": "MyModel", - ... "model_description": "A model for inference", - ... "model_version": "1.0.0", - ... "model_author": "John Doe"}) + def upload_model(self, model_data: bytes, metadata: ModelMetadata | None = None) -> UploadModelResponse: + """Upload a model to the inference service. Args: - model_data: The binary model data to upload. - metadata: Optional metadata about the model that can include: - model_name: Name of the model - model_description: Description of the model - model_version: Version of the model - model_author: Author of the model + model_data: The serialized model data + metadata: Optional metadata about the model Returns: - UploadModelResponse containing the model UID and any error information. + UploadModelResponse containing: + - uid: Unique identifier assigned to the model + - error: Optional error information """ proto_metadata = None if metadata is not None: - proto_metadata = inference_pb2.ModelMetadata(**metadata) + proto_metadata = ProtoModelMetadata(**metadata) request = inference_pb2.UploadModelRequest(model=model_data, metadata=proto_metadata) return self.stub.UploadModel(request) - def load_models(self, uids: list[str]) -> inference_pb2.LoadModelsResponse: - """Load models from the robot's filesystem. + def load_models(self, uids: list[str]) -> LoadModelsResponse: + """Load models into memory. Args: - uids: List of model UIDs to load. + uids: List of model UIDs to load Returns: - LoadModelsResponse containing information about the loaded models. + LoadModelsResponse containing: + - models: List of loaded model information + - result: Success/failure status """ - request = inference_pb2.ModelUids(uids=uids) + request = ModelUids(uids=uids) return self.stub.LoadModels(request) def unload_models(self, uids: list[str]) -> common_pb2.ActionResponse: - """Unload models from the robot's filesystem. + """Unload models from memory. Args: - uids: List of model UIDs to unload. + uids: List of model UIDs to unload Returns: - ActionResponse indicating success/failure of the unload operation. + ActionResponse indicating if the unload was successful """ - request = inference_pb2.ModelUids(uids=uids) + request = ModelUids(uids=uids) return self.stub.UnloadModels(request) def get_models_info(self, model_uids: list[str] | None = None) -> GetModelsInfoResponse: """Get information about available models. Args: - model_uids: Optional list of specific model UIDs to get info for. - If None, returns info for all models. + model_uids: Optional list of specific model UIDs to query. + If None, returns info for all available models. Returns: GetModelsInfoResponse containing: - models: List of ModelInfo objects - error: Optional error information if fetching failed + - models: List of ModelInfo objects with model details + - error: Optional error information """ if model_uids is not None: - request = inference_pb2.GetModelsInfoRequest(model_uids=inference_pb2.ModelUids(uids=model_uids)) + request = GetModelsInfoRequest(model_uids=ModelUids(uids=model_uids)) else: - request = inference_pb2.GetModelsInfoRequest(all=True) - - response = self.stub.GetModelsInfo(request) - - return GetModelsInfoResponse( - models=[ - ModelInfo( - uid=model.uid, - metadata=ModelMetadata( - model_name=model.metadata.model_name if model.metadata.HasField("model_name") else None, - model_description=( - model.metadata.model_description if model.metadata.HasField("model_description") else None - ), - model_version=( - model.metadata.model_version if model.metadata.HasField("model_version") else None - ), - model_author=model.metadata.model_author if model.metadata.HasField("model_author") else None, - ), - input_specs={ - name: Tensor( - values=list(tensor.values), - shape=[ - TensorDimension(size=dim.size, name=dim.name, dynamic=dim.dynamic) - for dim in tensor.shape - ], - ) - for name, tensor in model.input_specs.items() - }, - output_specs={ - name: Tensor( - values=list(tensor.values), - shape=[ - TensorDimension(size=dim.size, name=dim.name, dynamic=dim.dynamic) - for dim in tensor.shape - ], - ) - for name, tensor in model.output_specs.items() - }, - description=model.description, - ) - for model in response.models - ], - error=response.error if response.HasField("error") else None, - ) + request = GetModelsInfoRequest(all=True) + return self.stub.GetModelsInfo(request) def forward(self, model_uid: str, inputs: dict[str, Tensor]) -> ForwardResponse: - """Run inference using a specified model. + """Run inference on a model. Args: - model_uid: The UID of the model to use for inference. - inputs: Dictionary mapping tensor names to tensors. + model_uid: The UID of the model to use + inputs: Dictionary mapping input names to input tensors Returns: ForwardResponse containing: - outputs: Dictionary mapping tensor names to output tensors - error: Optional error information if inference failed + - outputs: Dictionary mapping tensor names to output tensors + - error: Optional error information """ tensor_inputs = {} for name, tensor in inputs.items(): @@ -227,18 +155,8 @@ def forward(self, model_uid: str, inputs: dict[str, Tensor]) -> ForwardResponse: inference_pb2.Tensor.Dimension(size=dim["size"], name=dim["name"], dynamic=dim["dynamic"]) for dim in tensor["shape"] ] - proto_tensor = inference_pb2.Tensor(values=tensor["values"], shape=shape) + proto_tensor = ProtoTensor(values=tensor["values"], shape=shape) tensor_inputs[name] = proto_tensor - response = self.stub.Forward(inference_pb2.ForwardRequest(model_uid=model_uid, inputs=tensor_inputs)) - - return ForwardResponse( - outputs={ - name: Tensor( - values=list(tensor.values), - shape=[TensorDimension(size=dim.size, name=dim.name, dynamic=dim.dynamic) for dim in tensor.shape], - ) - for name, tensor in response.outputs.items() - }, - error=response.error if response.HasField("error") else None, - ) + request = ForwardRequest(model_uid=model_uid, inputs=tensor_inputs) + return self.stub.Forward(request) diff --git a/kos-py/pykos/services/led_matrix.py b/kos-py/pykos/services/led_matrix.py index a3c7bae..a15ab77 100644 --- a/kos-py/pykos/services/led_matrix.py +++ b/kos-py/pykos/services/led_matrix.py @@ -5,13 +5,21 @@ import grpc from google.protobuf.empty_pb2 import Empty -from kos_protos import common_pb2, led_matrix_pb2, led_matrix_pb2_grpc +from kos_protos import common_pb2, led_matrix_pb2_grpc +from kos_protos.led_matrix_pb2 import ( + GetMatrixInfoResponse, + WriteBufferRequest, + WriteColorBufferRequest, +) class MatrixInfo(TypedDict): - """Information about the LED matrix. + """TypedDict containing LED matrix configuration details. - Args: + A dictionary type describing the physical layout and capabilities + of the LED matrix display. + + Fields: width: Width in pixels height: Height in pixels brightness_levels: Number of brightness levels supported (1 for binary on/off) @@ -31,7 +39,7 @@ class MatrixInfo(TypedDict): class ImageData(TypedDict): """Image data to be written to the LED matrix. - Args: + Fields: buffer: Raw image data bytes width: Image width in pixels height: Image height in pixels @@ -46,62 +54,70 @@ class ImageData(TypedDict): brightness: int +class ActionResponse(TypedDict): + """Response indicating success/failure of an action.""" + + success: bool + error: NotRequired[common_pb2.Error | None] + + class LEDMatrixServiceClient: - """Client for the LEDMatrixService. + """Client for the LED matrix service. - This service allows controlling an LED matrix display. + This client provides methods to interact with an LED matrix display, + including querying its capabilities and writing image data to it. """ def __init__(self, channel: grpc.Channel) -> None: """Initialize the LED matrix service client. Args: - channel: gRPC channel to use for communication. + channel: gRPC channel for communication with the service """ self.stub = led_matrix_pb2_grpc.LEDMatrixServiceStub(channel) - def get_matrix_info(self) -> MatrixInfo: - """Get information about the LED matrix including dimensions and capabilities. + def get_matrix_info(self) -> GetMatrixInfoResponse: + """Get information about the LED matrix display. Returns: - MatrixInfo containing: - width: Width in pixels - height: Height in pixels - brightness_levels: Number of brightness levels supported - color_capable: Whether the matrix supports color - bits_per_pixel: Number of bits used to represent each pixel - error: Optional error information + GetMatrixInfoResponse containing: + - width: Width in pixels + - height: Height in pixels + - brightness_levels: Number of brightness levels supported + - color_capable: Whether the matrix supports color + - bits_per_pixel: Number of bits used to represent each pixel + - error: Optional error information """ return self.stub.GetMatrixInfo(Empty()) def write_buffer(self, buffer: bytes) -> common_pb2.ActionResponse: - """Write binary on/off states to the LED matrix. + """Write raw buffer data to the LED matrix. - The buffer should be width * height / 8 bytes long, where each bit - represents one LED's on/off state. + This method is for writing pre-formatted data that matches the matrix's + native format. For writing color images, use write_color_buffer instead. Args: - buffer: Binary buffer containing LED states + buffer: Raw buffer data bytes in the matrix's native format Returns: - ActionResponse indicating success/failure of the write operation. + ActionResponse indicating if the write was successful """ - request = led_matrix_pb2.WriteBufferRequest(buffer=buffer) + request = WriteBufferRequest(buffer=buffer) return self.stub.WriteBuffer(request) def write_color_buffer(self, **kwargs: Unpack[ImageData]) -> common_pb2.ActionResponse: - """Write image data to the LED matrix. + """Write a color image buffer to the LED matrix. Args: - **kwargs: Image data containing the raw bytes, dimensions and format - buffer: Raw image data bytes - width: Image width in pixels - height: Image height in pixels - format: Pixel format specification (e.g. 'RGB888', 'BGR888', 'RGB565', 'MONO8') - brightness: Global brightness level (0-255) + **kwargs: Image data containing: + buffer: Raw image data bytes + width: Image width in pixels + height: Image height in pixels + format: Pixel format specification (e.g. 'RGB888', 'BGR888', 'RGB565', 'MONO8') + brightness: Global brightness level (0-255) Returns: - ActionResponse indicating success/failure of the write operation. + ActionResponse indicating if the write was successful """ - request = led_matrix_pb2.WriteColorBufferRequest(**kwargs) + request = WriteColorBufferRequest(**kwargs) return self.stub.WriteColorBuffer(request) diff --git a/kos-py/pykos/services/process_manager.py b/kos-py/pykos/services/process_manager.py index 69f2896..0ce11a7 100644 --- a/kos-py/pykos/services/process_manager.py +++ b/kos-py/pykos/services/process_manager.py @@ -3,30 +3,48 @@ import grpc from google.protobuf.empty_pb2 import Empty -from kos_protos import process_manager_pb2, process_manager_pb2_grpc -from kos_protos.process_manager_pb2 import KClipStartRequest +from kos_protos import process_manager_pb2_grpc +from kos_protos.process_manager_pb2 import ( + KClipStartRequest, + KClipStartResponse, + KClipStopResponse, +) class ProcessManagerServiceClient: + """Client for the process manager service. + + This client provides methods to manage KClip recordings and other processes. + """ + def __init__(self, channel: grpc.Channel) -> None: + """Initialize the process manager service client. + + Args: + channel: gRPC channel for communication with the service + """ self.stub = process_manager_pb2_grpc.ProcessManagerServiceStub(channel) - def start_kclip(self, action: str) -> process_manager_pb2.KClipStartResponse: - """Start KClip recording. + def start_kclip(self, action: str) -> KClipStartResponse: + """Start a new KClip recording. Args: - action: The action string for the KClip request + action: The action being recorded Returns: - The response from the server. + KClipStartResponse containing: + - clip_uuid: Unique identifier for the recording session + - error: Optional error information if start failed """ request = KClipStartRequest(action=action) return self.stub.StartKClip(request) - def stop_kclip(self, request: Empty = Empty()) -> process_manager_pb2.KClipStopResponse: - """Stop KClip recording. + def stop_kclip(self, request: Empty = Empty()) -> KClipStopResponse: + """Stop the current KClip recording. Returns: - The response from the server. + KClipStopResponse containing: + - clip_uuid: Identifier of the stopped recording session + - error: Optional error information if stop failed """ return self.stub.StopKClip(request) diff --git a/kos-py/pykos/services/sim.py b/kos-py/pykos/services/sim.py index 1200529..89e3920 100644 --- a/kos-py/pykos/services/sim.py +++ b/kos-py/pykos/services/sim.py @@ -5,56 +5,98 @@ import grpc from google.protobuf.empty_pb2 import Empty -from kos_protos import common_pb2, sim_pb2, sim_pb2_grpc +from kos_protos import common_pb2, sim_pb2_grpc +from kos_protos.sim_pb2 import ( + DefaultPosition, + GetParametersResponse, + ResetRequest, + SetParametersRequest, + SetPausedRequest, + SimulationParameters, + StepRequest, +) -class DefaultPosition(TypedDict): +class DefaultPositionInput(TypedDict): + """Initial simulation state. + + Fields: + qpos: List of joint positions in simulation units + """ + qpos: list[float] -class ResetRequest(TypedDict): - initial_state: NotRequired[DefaultPosition] +class ResetParams(TypedDict): + """Simulation reset parameters. + + Fields: + initial_state: Optional DefaultPosition to set initial joint positions + randomize: Optional flag to add randomization during reset + """ + + initial_state: NotRequired[DefaultPositionInput] randomize: NotRequired[bool] -class StepRequest(TypedDict): +class StepParams(TypedDict): + """Parameters for stepping simulation. + + Fields: + num_steps: Number of simulation steps to take + step_size: Optional duration of each step in seconds + """ + num_steps: int step_size: NotRequired[float] -class SimulationParameters(TypedDict): +class SimulationParams(TypedDict): + """Parameters for configuring simulation. + + Fields: + time_scale: Optional simulation time scale factor + gravity: Optional gravitational acceleration in m/s² + initial_state: Optional default joint positions + """ + time_scale: NotRequired[float] gravity: NotRequired[float] - initial_state: NotRequired[DefaultPosition] + initial_state: NotRequired[DefaultPositionInput] class SimServiceClient: + """Client for the simulation service. + + This client provides methods to control and configure the physics simulation, + including resetting, stepping, pausing, and parameter adjustment. + """ + def __init__(self, channel: grpc.Channel) -> None: + """Initialize the simulation service client. + + Args: + channel: gRPC channel for communication with the service + """ self.stub = sim_pb2_grpc.SimulationServiceStub(channel) - def reset(self, **kwargs: Unpack[ResetRequest]) -> common_pb2.ActionResponse: - """Reset the simulation to its initial state. + def reset(self, **kwargs: Unpack[ResetParams]) -> common_pb2.ActionResponse: + """Reset the simulation to a known state. Args: **kwargs: Reset parameters that may include: initial_state: DefaultPosition to reset to randomize: Whether to randomize the initial state - Example: - >>> client.reset( - ... initial_state={"qpos": [0.0, 0.0, 0.0]}, - ... randomize=True - ... ) - Returns: - ActionResponse indicating success/failure + ActionResponse indicating if the reset was successful """ initial_state = None if "initial_state" in kwargs: pos = kwargs["initial_state"] - initial_state = sim_pb2.DefaultPosition(qpos=pos["qpos"]) + initial_state = DefaultPosition(qpos=pos["qpos"]) - request = sim_pb2.ResetRequest(initial_state=initial_state, randomize=kwargs.get("randomize")) + request = ResetRequest(initial_state=initial_state, randomize=kwargs.get("randomize")) return self.stub.Reset(request) def set_paused(self, paused: bool) -> common_pb2.ActionResponse: @@ -64,58 +106,55 @@ def set_paused(self, paused: bool) -> common_pb2.ActionResponse: paused: True to pause, False to unpause Returns: - ActionResponse indicating success/failure + ActionResponse indicating if the pause state was set successfully """ - request = sim_pb2.SetPausedRequest(paused=paused) + request = SetPausedRequest(paused=paused) return self.stub.SetPaused(request) def step(self, num_steps: int, step_size: float | None = None) -> common_pb2.ActionResponse: - """Step the simulation forward. + """Step the simulation forward by a specified number of steps. Args: num_steps: Number of simulation steps to take - step_size: Optional time per step in seconds + step_size: Optional duration of each step in seconds Returns: - ActionResponse indicating success/failure + ActionResponse indicating if the stepping was successful """ - request = sim_pb2.StepRequest(num_steps=num_steps, step_size=step_size) + request = StepRequest(num_steps=num_steps, step_size=step_size) return self.stub.Step(request) - def set_parameters(self, **kwargs: Unpack[SimulationParameters]) -> common_pb2.ActionResponse: + def set_parameters(self, **kwargs: Unpack[SimulationParams]) -> common_pb2.ActionResponse: """Set simulation parameters. - Example: - >>> client.set_parameters( - ... time_scale=1.0, - ... gravity=9.81, - ... initial_state={"qpos": [0.0, 0.0, 0.0]} - ... ) - Args: **kwargs: Parameters that may include: - time_scale: Simulation time scale - gravity: Gravity constant - initial_state: Default position state + time_scale: Simulation time scale factor + gravity: Gravitational acceleration in m/s² + initial_state: Default joint positions Returns: - ActionResponse indicating success/failure + ActionResponse indicating if the parameters were set successfully """ initial_state = None if "initial_state" in kwargs: pos = kwargs["initial_state"] - initial_state = sim_pb2.DefaultPosition(qpos=pos["qpos"]) + initial_state = DefaultPosition(qpos=pos["qpos"]) - params = sim_pb2.SimulationParameters( - time_scale=kwargs.get("time_scale"), gravity=kwargs.get("gravity"), initial_state=initial_state + params = SimulationParameters( + time_scale=kwargs.get("time_scale"), + gravity=kwargs.get("gravity"), + initial_state=initial_state, ) - request = sim_pb2.SetParametersRequest(parameters=params) + request = SetParametersRequest(parameters=params) return self.stub.SetParameters(request) - def get_parameters(self) -> sim_pb2.GetParametersResponse: + def get_parameters(self) -> GetParametersResponse: """Get current simulation parameters. Returns: - GetParametersResponse containing current parameters and any error + GetParametersResponse containing: + - parameters: Current simulation parameters + - error: Optional error information """ return self.stub.GetParameters(Empty()) diff --git a/kos-py/pykos/services/sound.py b/kos-py/pykos/services/sound.py index d020484..e740d1b 100644 --- a/kos-py/pykos/services/sound.py +++ b/kos-py/pykos/services/sound.py @@ -1,21 +1,30 @@ """Sound service client.""" -from typing import Generator, Iterator, NotRequired, TypedDict, Unpack +from typing import Generator, Iterator, NotRequired, TypedDict import grpc from google.protobuf.empty_pb2 import Empty -from kos_protos import common_pb2, sound_pb2, sound_pb2_grpc +from kos_protos import common_pb2, sound_pb2_grpc +from kos_protos.sound_pb2 import ( + AudioConfig as ProtoAudioConfig, + GetAudioInfoResponse, + PlayAudioRequest, + RecordAudioRequest, +) class AudioCapability(TypedDict): - """Information about audio capabilities. + """TypedDict containing information about audio capabilities. - Args: - sample_rates: List of supported sample rates (e.g., 44100, 48000) - bit_depths: List of supported bit depths (e.g., 16, 24, 32) - channels: List of supported channel counts (e.g., 1, 2) - available: Whether this capability is available + A dictionary type describing the supported audio configurations + and current availability of the audio system. + + Fields: + sample_rates: List of supported sampling rates in Hz + bit_depths: List of supported bit depths + channels: List of supported channel counts + available: Whether the audio system is currently available """ sample_rates: list[int] @@ -41,7 +50,7 @@ class AudioInfo(TypedDict): class AudioConfig(TypedDict): """Audio configuration parameters. - Args: + Fields: sample_rate: Sample rate in Hz (e.g., 44100) bit_depth: Bit depth (e.g., 16) channels: Number of channels (1 for mono, 2 for stereo) @@ -52,94 +61,108 @@ class AudioConfig(TypedDict): channels: int +class ActionResponse(TypedDict): + """Response indicating success/failure of an action.""" + + success: bool + error: NotRequired[common_pb2.Error | None] + + class SoundServiceClient: - """Client for the SoundService. + """Client for the sound service. - This service allows playing audio through speakers and recording from microphones. + This client provides methods to interact with the audio system, + including playback and recording capabilities. """ def __init__(self, channel: grpc.Channel) -> None: """Initialize the sound service client. Args: - channel: gRPC channel to use for communication. + channel: gRPC channel for communication with the service """ self.stub = sound_pb2_grpc.SoundServiceStub(channel) - def get_audio_info(self) -> AudioInfo: - """Get information about audio capabilities. + def get_audio_info(self) -> GetAudioInfoResponse: + """Get information about audio system capabilities. Returns: - AudioInfo containing playback and recording capabilities. + GetAudioInfoResponse containing: + - playback: Playback capabilities (sample rates, bit depths, channels) + - recording: Recording capabilities (sample rates, bit depths, channels) + - error: Optional error information """ return self.stub.GetAudioInfo(Empty()) - def play_audio(self, audio_iterator: Iterator[bytes], **kwargs: Unpack[AudioConfig]) -> common_pb2.ActionResponse: - """Stream PCM audio data to the speaker. + def play_audio( + self, + audio_iterator: Iterator[bytes], + sample_rate: int, + bit_depth: int, + channels: int, + ) -> common_pb2.ActionResponse: + """Play audio data through the audio system. Args: - audio_iterator: Iterator yielding chunks of PCM audio data - **kwargs: Audio configuration parameters - sample_rate: Sample rate in Hz (e.g., 44100) - bit_depth: Bit depth (e.g., 16) - channels: Number of channels (1 for mono, 2 for stereo) + audio_iterator: Iterator yielding audio data chunks + sample_rate: Sample rate in Hz (e.g., 44100) + bit_depth: Bit depth (e.g., 16) + channels: Number of channels (1 for mono, 2 for stereo) Returns: - ActionResponse indicating success/failure of the playback operation. - - Example: - >>> config = AudioConfig(sample_rate=44100, bit_depth=16, channels=2) - >>> with open('audio.raw', 'rb') as f: - ... def chunks(): - ... while chunk := f.read(4096): - ... yield chunk - ... response = client.play_audio(chunks(), config) + ActionResponse indicating if the playback was successful """ - def request_iterator() -> Generator[sound_pb2.PlayAudioRequest, None, None]: + def request_iterator() -> Generator[PlayAudioRequest, None, None]: # First message includes config - yield sound_pb2.PlayAudioRequest( - config=sound_pb2.AudioConfig(**kwargs), + yield PlayAudioRequest( + config=ProtoAudioConfig( + sample_rate=sample_rate, + bit_depth=bit_depth, + channels=channels, + ) ) # Subsequent messages contain audio data for chunk in audio_iterator: - yield sound_pb2.PlayAudioRequest(audio_data=chunk) + yield PlayAudioRequest(audio_data=chunk) return self.stub.PlayAudio(request_iterator()) - def record_audio(self, duration_ms: int = 0, **kwargs: Unpack[AudioConfig]) -> Generator[bytes, None, None]: - """Record PCM audio data from the microphone. + def record_audio( + self, + duration_ms: int = 0, + sample_rate: int = 44100, + bit_depth: int = 16, + channels: int = 1, + ) -> Generator[bytes, None, None]: + """Record audio from the audio system. Args: - duration_ms: Recording duration in milliseconds (0 for continuous) - **kwargs: Audio configuration parameters - sample_rate: Sample rate in Hz (e.g., 44100) - bit_depth: Bit depth (e.g., 16) - channels: Number of channels (1 for mono, 2 for stereo) - - Yields: - Chunks of PCM audio data. - - Example: - >>> config = AudioConfig(sample_rate=44100, bit_depth=16, channels=1) - >>> with open('recording.raw', 'wb') as f: - ... for chunk in client.record_audio(duration_ms=5000, **config): - ... f.write(chunk) + duration_ms: Duration to record in milliseconds (0 for continuous) + sample_rate: Sample rate in Hz (default: 44100) + bit_depth: Bit depth (default: 16) + channels: Number of channels (default: 1 for mono) + + Returns: + Generator yielding recorded audio data chunks """ - request = sound_pb2.RecordAudioRequest( - config=sound_pb2.AudioConfig(**kwargs), + request = RecordAudioRequest( + config=ProtoAudioConfig( + sample_rate=sample_rate, + bit_depth=bit_depth, + channels=channels, + ), duration_ms=duration_ms, ) - for response in self.stub.RecordAudio(request): if response.HasField("error"): raise RuntimeError(f"Recording error: {response.error}") yield response.audio_data def stop_recording(self) -> common_pb2.ActionResponse: - """Stop an ongoing recording session. + """Stop the current audio recording. Returns: - ActionResponse indicating success/failure of the stop operation. + ActionResponse indicating if the recording was stopped successfully """ return self.stub.StopRecording(Empty()) diff --git a/kos-py/tests/test_pykos.py b/kos-py/tests/test_pykos.py index bd20f54..b8c6e70 100644 --- a/kos-py/tests/test_pykos.py +++ b/kos-py/tests/test_pykos.py @@ -4,6 +4,10 @@ import pytest import pykos +from kos_protos.common_pb2 import ActionResponse +from pykos.services.actuator import GetActuatorsStateResponse +from pykos.services.imu import IMUValuesResponse +from pykos.services.process_manager import KClipStartResponse, KClipStopResponse def test_dummy() -> None: @@ -16,28 +20,28 @@ def test_pykos() -> None: client = pykos.KOS("127.0.0.1") # Tests configuring the actuator. - actuator_response = client.actuator.configure_actuator(actuator_id=1) - assert actuator_response.success + actuator_response: ActionResponse = client.actuator.configure_actuator(actuator_id=1) + assert actuator_response["success"] # Tests getting the actuator state. - actuator_state = client.actuator.get_actuators_state(actuator_ids=[1]) - assert actuator_state.states[0].actuator_id == 1 + actuator_state: GetActuatorsStateResponse = client.actuator.get_actuators_state(actuator_ids=[1]) + assert actuator_state["states"][0]["actuator_id"] == 1 # Tests the IMU endpoints. - imu_response = client.imu.get_imu_values() - assert imu_response.accel_x is not None + imu_response: IMUValuesResponse = client.imu.get_imu_values() + assert imu_response["accel_x"] is not None client.imu.get_imu_advanced_values() client.imu.get_euler_angles() client.imu.get_quaternion() client.imu.calibrate() zero_response = client.imu.zero(duration=1.0, max_retries=1, max_angular_error=1.0) - assert zero_response.success + assert zero_response["success"] # Tests the K-Clip endpoints. - start_kclip_response = client.process_manager.start_kclip(action="start") - assert start_kclip_response.clip_uuid is not None - stop_kclip_response = client.process_manager.stop_kclip() - assert stop_kclip_response.clip_uuid is not None + start_kclip_response: KClipStartResponse = client.process_manager.start_kclip(action="start") + assert start_kclip_response["clip_uuid"] is not None + stop_kclip_response: KClipStopResponse = client.process_manager.stop_kclip() + assert stop_kclip_response["clip_uuid"] is not None def is_server_running(address: str) -> bool: