From 8d096d3a24dc0f0b64eb7da3e5076cfbbe807fd2 Mon Sep 17 00:00:00 2001 From: David Lechner Date: Fri, 13 Feb 2026 20:43:39 -0600 Subject: [PATCH] resource: add ManagedGPIO resources/agent Add support for GPIOs via the gpio-manager D-Bus service. Some kernels are starting to ship with the gpio sysfs interface disabled, so an alternative is needed. The gpio-manager service will allow the pin state to persist even after the agent exits, which is something that the character device interface cannot do but is needed for some use cases. Signed-off-by: David Lechner --- doc/configuration.rst | 66 ++++++- examples/managed-gpio/import-gpio.yaml | 9 + examples/managed-gpio/managed_gpio.py | 28 +++ examples/managed-gpio/managed_gpio_remote.py | 25 +++ labgrid/driver/gpiodriver.py | 44 +++-- labgrid/remote/client.py | 14 +- labgrid/remote/exporter.py | 33 +++- labgrid/resource/__init__.py | 3 +- labgrid/resource/base.py | 11 ++ labgrid/resource/remote.py | 13 ++ labgrid/resource/suggest.py | 4 +- labgrid/resource/udev.py | 28 +++ labgrid/util/agents/managed_gpio.py | 186 +++++++++++++++++++ pyproject.toml | 3 +- tests/test_agent.py | 3 +- 15 files changed, 448 insertions(+), 22 deletions(-) create mode 100644 examples/managed-gpio/import-gpio.yaml create mode 100644 examples/managed-gpio/managed_gpio.py create mode 100644 examples/managed-gpio/managed_gpio_remote.py create mode 100644 labgrid/util/agents/managed_gpio.py diff --git a/doc/configuration.rst b/doc/configuration.rst index 50df7588a..d49215111 100644 --- a/doc/configuration.rst +++ b/doc/configuration.rst @@ -608,6 +608,34 @@ NetworkHIDRelay A :any:`NetworkHIDRelay` describes an `HIDRelay`_ resource available on a remote computer. +ManagedGPIO ++++++++++++ + +A :any:`ManagedGPIO` resource describes a GPIO line that is managed by the +`gpio-manager `__ +daemon. + +.. note:: + The ``gpio-manager`` daemon needs to be running on the same system as the + gpiochip and configured to manage the specified GPIO line. See the + `gpiocli request `__ + documentation for details. + +.. code-block:: yaml + + ManagedGPIO: + chip: /dev/gpiochip0 + pin: 0 + +Arguments: + - chip (str | int): path to the gpiochip device, e.g. ``/dev/gpiochip0`` or + the gpiochip number, e.g. ``0``. + - pin (str | int): gpio pin name or offset within the gpiochip, e.g. ``0`` + +Used by: + - `GpioDigitalOutputDriver`_ + + SysfsGPIO +++++++++ @@ -624,11 +652,42 @@ Arguments: Used by: - `GpioDigitalOutputDriver`_ +NetworkManagedGPIO +++++++++++++++++++ +A :any:`NetworkManagedGPIO` describes a `ManagedGPIO`_ resource available on a +remote computer. + NetworkSysfsGPIO ++++++++++++++++ A :any:`NetworkSysfsGPIO` describes a `SysfsGPIO`_ resource available on a remote computer. +MatchedManagedGPIO +++++++++++++++++++ +A :any:`MatchedManagedGPIO` describes a GPIO line, like a `ManagedGPIO`_. +The gpiochip is identified by matching udev properties. This allows +identification through hot-plugging or rebooting for controllers like +USB based gpiochips. + +.. code-block:: yaml + + MatchedManagedGPIO: + match: + '@SUBSYSTEM': 'usb' + '@ID_SERIAL_SHORT': 'D38EJ8LF' + pin: 0 + +The example would search for a USB gpiochip with the key ``ID_SERIAL_SHORT`` +and the value ``D38EJ8LF`` and use the pin 0 of this device. +The ``ID_SERIAL_SHORT`` property is set by the usb_id builtin helper program. + +Arguments: + - match (dict): key and value pairs for a udev match, see `udev Matching`_ + - pin (str | int): gpio pin name or offset within the matched gpiochip. + +Used by: + - `GpioDigitalOutputDriver`_ + MatchedSysfsGPIO ++++++++++++++++ A :any:`MatchedSysfsGPIO` describes a GPIO line, like a `SysfsGPIO`_. @@ -2357,13 +2416,16 @@ GpioDigitalOutputDriver ~~~~~~~~~~~~~~~~~~~~~~~ The :any:`GpioDigitalOutputDriver` writes a digital signal to a GPIO line. -This driver configures GPIO lines via -`the sysfs kernel interface `__. +This driver configures GPIO lines via `gpio-manager `__ +or `the legacy sysfs kernel interface `__. While the driver automatically exports the GPIO, it does not configure it in any other way than as an output. Binds to: gpio: + - `ManagedGPIO`_ + - `MatchedManagedGPIO`_ + - `NetworkManagedGPIO`_ - `SysfsGPIO`_ - `MatchedSysfsGPIO`_ - `NetworkSysfsGPIO`_ diff --git a/examples/managed-gpio/import-gpio.yaml b/examples/managed-gpio/import-gpio.yaml new file mode 100644 index 000000000..4ba7b223f --- /dev/null +++ b/examples/managed-gpio/import-gpio.yaml @@ -0,0 +1,9 @@ +targets: + main: + resources: + RemotePlace: + name: gpio + drivers: + GpioDigitalOutputDriver: {} +options: + coordinator_address: 'labgrid:20408' diff --git a/examples/managed-gpio/managed_gpio.py b/examples/managed-gpio/managed_gpio.py new file mode 100644 index 000000000..2022ef218 --- /dev/null +++ b/examples/managed-gpio/managed_gpio.py @@ -0,0 +1,28 @@ +import logging +import time + +from labgrid import Target +from labgrid.logging import basicConfig, StepLogger +from labgrid.driver import GpioDigitalOutputDriver +from labgrid.resource import ManagedGPIO + +# enable info logging +basicConfig(level=logging.INFO) + +# show labgrid steps on the console +StepLogger.start() + +t = Target("main") +r = ManagedGPIO(t, name=None, chip="/dev/gpiochip0", pin=0) +d = GpioDigitalOutputDriver(t, name=None) + +p = t.get_driver("DigitalOutputProtocol") +print(t.resources) +p.set(True) +print(p.get()) +time.sleep(2) +p.set(False) +print(p.get()) +time.sleep(2) +p.set(True) +print(p.get()) diff --git a/examples/managed-gpio/managed_gpio_remote.py b/examples/managed-gpio/managed_gpio_remote.py new file mode 100644 index 000000000..4b16b8466 --- /dev/null +++ b/examples/managed-gpio/managed_gpio_remote.py @@ -0,0 +1,25 @@ +import logging +import time + +from labgrid import Environment +from labgrid.logging import basicConfig, StepLogger + +# enable info logging +basicConfig(level=logging.INFO) + +# show labgrid steps on the console +StepLogger.start() + +e = Environment("import-gpio.yaml") +t = e.get_target() + +p = t.get_driver("DigitalOutputProtocol") +print(t.resources) +p.set(True) +print(p.get()) +time.sleep(2) +p.set(False) +print(p.get()) +time.sleep(2) +p.set(True) +print(p.get()) diff --git a/labgrid/driver/gpiodriver.py b/labgrid/driver/gpiodriver.py index 1d987761a..b23c7927a 100644 --- a/labgrid/driver/gpiodriver.py +++ b/labgrid/driver/gpiodriver.py @@ -1,9 +1,13 @@ """All GPIO-related drivers""" +from typing import Union + import attr from ..factory import target_factory from ..protocol import DigitalOutputProtocol -from ..resource.remote import NetworkSysfsGPIO +from ..resource.base import ManagedGPIO, SysfsGPIO +from ..resource.remote import NetworkManagedGPIO, NetworkSysfsGPIO +from ..resource.udev import MatchedManagedGPIO, MatchedSysfsGPIO from ..step import step from .common import Driver from ..util.agentwrapper import AgentWrapper @@ -12,9 +16,17 @@ @target_factory.reg_driver @attr.s(eq=False) class GpioDigitalOutputDriver(Driver, DigitalOutputProtocol): + gpio: Union[ManagedGPIO, MatchedManagedGPIO, NetworkManagedGPIO, SysfsGPIO, MatchedSysfsGPIO, NetworkSysfsGPIO] bindings = { - "gpio": {"SysfsGPIO", "MatchedSysfsGPIO", "NetworkSysfsGPIO"}, + "gpio": { + "ManagedGPIO", + "MatchedManagedGPIO", + "NetworkManagedGPIO", + "SysfsGPIO", + "MatchedSysfsGPIO", + "NetworkSysfsGPIO", + }, } def __attrs_post_init__(self): @@ -22,12 +34,16 @@ def __attrs_post_init__(self): self.wrapper = None def on_activate(self): - if isinstance(self.gpio, NetworkSysfsGPIO): - host = self.gpio.host - else: - host = None + host = self.gpio.host if isinstance(self.gpio, (NetworkSysfsGPIO, NetworkManagedGPIO)) else None + self.wrapper = AgentWrapper(host) - self.proxy = self.wrapper.load('sysfsgpio') + + self.is_sysfs = isinstance(self.gpio, (SysfsGPIO, MatchedSysfsGPIO, NetworkSysfsGPIO)) + + if self.is_sysfs: + self.proxy = self.wrapper.load('sysfsgpio') + else: + self.proxy = self.wrapper.load('managed_gpio') def on_deactivate(self): self.wrapper.close() @@ -36,10 +52,16 @@ def on_deactivate(self): @Driver.check_active @step(args=['status']) - def set(self, status): - self.proxy.set(self.gpio.index, status) + def set(self, status: bool) -> None: + if self.is_sysfs: + self.proxy.set(self.gpio.index, status) + else: + self.proxy.set(self.gpio.chip, self.gpio.pin, status) @Driver.check_active @step(result=True) - def get(self): - return self.proxy.get(self.gpio.index) + def get(self) -> bool: + if self.is_sysfs: + return self.proxy.get(self.gpio.index) + + return self.proxy.get(self.gpio.chip, self.gpio.pin) diff --git a/labgrid/remote/client.py b/labgrid/remote/client.py index 76aa174bb..62f67cbd8 100755 --- a/labgrid/remote/client.py +++ b/labgrid/remote/client.py @@ -938,7 +938,7 @@ def power(self): name = self.args.name target = self._get_target(place) from ..resource.power import NetworkPowerPort, PDUDaemonPort - from ..resource.remote import NetworkUSBPowerPort, NetworkSiSPMPowerPort, NetworkSysfsGPIO + from ..resource.remote import NetworkUSBPowerPort, NetworkSiSPMPowerPort, NetworkManagedGPIO, NetworkSysfsGPIO from ..resource import TasmotaPowerPort, NetworkYKUSHPowerPort drv = None @@ -960,7 +960,7 @@ def power(self): drv = self._get_driver_or_new(target, "TasmotaPowerDriver", name=name) elif isinstance(resource, NetworkYKUSHPowerPort): drv = self._get_driver_or_new(target, "YKUSHPowerDriver", name=name) - elif isinstance(resource, NetworkSysfsGPIO): + elif isinstance(resource, (NetworkManagedGPIO, NetworkSysfsGPIO)): self._get_driver_or_new(target, "GpioDigitalOutputDriver", name=name) drv = self._get_driver_or_new(target, "DigitalOutputPowerDriver", name=name) if drv: @@ -980,7 +980,13 @@ def digital_io(self): name = self.args.name target = self._get_target(place) from ..resource import ModbusTCPCoil, OneWirePIO, HttpDigitalOutput, WaveshareModbusTCPCoil - from ..resource.remote import NetworkDeditecRelais8, NetworkSysfsGPIO, NetworkLXAIOBusPIO, NetworkHIDRelay + from ..resource.remote import ( + NetworkDeditecRelais8, + NetworkHIDRelay, + NetworkLXAIOBusPIO, + NetworkManagedGPIO, + NetworkSysfsGPIO, + ) drv = None try: @@ -999,7 +1005,7 @@ def digital_io(self): drv = self._get_driver_or_new(target, "HttpDigitalOutputDriver", name=name) elif isinstance(resource, NetworkDeditecRelais8): drv = self._get_driver_or_new(target, "DeditecRelaisDriver", name=name) - elif isinstance(resource, NetworkSysfsGPIO): + elif isinstance(resource, (NetworkManagedGPIO, NetworkSysfsGPIO)): drv = self._get_driver_or_new(target, "GpioDigitalOutputDriver", name=name) elif isinstance(resource, NetworkLXAIOBusPIO): drv = self._get_driver_or_new(target, "LXAIOBusPIODriver", name=name) diff --git a/labgrid/remote/exporter.py b/labgrid/remote/exporter.py index 0e48f1942..68511a06a 100755 --- a/labgrid/remote/exporter.py +++ b/labgrid/remote/exporter.py @@ -13,7 +13,7 @@ from urllib.parse import urlsplit import warnings from pathlib import Path -from typing import Dict, Type +from typing import Any, Dict, Type from socket import gethostname, getfqdn import attr @@ -639,6 +639,37 @@ def _get_params(self): exports["SNMPEthernetPort"] = EthernetPortExport +@attr.s(eq=False) +class ManagedGPIOExport(ResourceExport): + """ResourceExport for GPIO lines accessed via gpio-manager D-Bus service""" + + def __attrs_post_init__(self): + super().__attrs_post_init__() + + if self.cls == "ManagedGPIO": + from ..resource.base import ManagedGPIO + + self.local = ManagedGPIO(target=None, name=None, **self.local_params) + elif self.cls == "MatchedManagedGPIO": + from ..resource.udev import MatchedManagedGPIO + + self.local = MatchedManagedGPIO(target=None, name=None, **self.local_params) + + self.data["cls"] = "NetworkManagedGPIO" + + def _get_params(self) -> dict[str, Any]: + """Helper function to return parameters""" + return { + "host": self.host, + "chip": self.local.chip, + "pin": self.local.pin, + } + + +exports["ManagedGPIO"] = ManagedGPIOExport +exports["MatchedManagedGPIO"] = ManagedGPIOExport + + @attr.s(eq=False) class GPIOSysFSExport(ResourceExport): _gpio_sysfs_path_prefix = "/sys/class/gpio" diff --git a/labgrid/resource/__init__.py b/labgrid/resource/__init__.py index f5d77e711..f4dc7d604 100644 --- a/labgrid/resource/__init__.py +++ b/labgrid/resource/__init__.py @@ -1,4 +1,4 @@ -from .base import SerialPort, NetworkInterface, EthernetPort, SysfsGPIO +from .base import SerialPort, NetworkInterface, EthernetPort, ManagedGPIO, SysfsGPIO from .ethernetport import SNMPEthernetPort from .serialport import RawSerialPort, NetworkSerialPort from .modbus import ModbusTCPCoil, WaveshareModbusTCPCoil @@ -15,6 +15,7 @@ HIDRelay, IMXUSBLoader, LXAUSBMux, + MatchedManagedGPIO, MatchedSysfsGPIO, MXSUSBLoader, RKUSBLoader, diff --git a/labgrid/resource/base.py b/labgrid/resource/base.py index d8cdb984c..750244a97 100644 --- a/labgrid/resource/base.py +++ b/labgrid/resource/base.py @@ -36,6 +36,17 @@ class EthernetPort(Resource): interface = attr.ib(default=None) +@target_factory.reg_resource +@attr.s(eq=False) +class ManagedGPIO(Resource): + """The basic ManagedGPIO contains an index + + Args: + chip (str): path to gpiochip device. + pin (str | int): name or index of target gpio line.""" + chip = attr.ib(default=None, validator=attr.validators.instance_of(str)) + pin = attr.ib(default=None, validator=attr.validators.instance_of((str, int))) + @target_factory.reg_resource @attr.s(eq=False) class SysfsGPIO(Resource): diff --git a/labgrid/resource/remote.py b/labgrid/resource/remote.py index d2a63d5e9..05382cfcb 100644 --- a/labgrid/resource/remote.py +++ b/labgrid/resource/remote.py @@ -344,6 +344,19 @@ def __attrs_post_init__(self): self.timeout = 10.0 super().__attrs_post_init__() +@target_factory.reg_resource +@attr.s(eq=False) +class NetworkManagedGPIO(NetworkResource, ManagedResource): + manager_cls = RemotePlaceManager + + """The NetworkManagedGPIO describes a remotely accessible gpio line""" + chip = attr.ib(validator=attr.validators.optional(attr.validators.instance_of(str))) + pin = attr.ib(validator=attr.validators.optional(attr.validators.instance_of((str, int)))) + + def __attrs_post_init__(self): + self.timeout = 10.0 + super().__attrs_post_init__() + @target_factory.reg_resource @attr.s(eq=False) diff --git a/labgrid/resource/suggest.py b/labgrid/resource/suggest.py index 0aa93033d..41de8ec84 100644 --- a/labgrid/resource/suggest.py +++ b/labgrid/resource/suggest.py @@ -24,6 +24,7 @@ HIDRelay, USBDebugger, USBPowerPort, + MatchedManagedGPIO, MatchedSysfsGPIO ) from ..util import dump @@ -59,6 +60,7 @@ def __init__(self, args): self.resources.append(HIDRelay(**args)) self.resources.append(USBDebugger(**args)) self.resources.append(USBPowerPort(**args, index=0)) + self.resources.append(MatchedManagedGPIO(**args, pin=0)) self.resources.append(MatchedSysfsGPIO(**args, pin=0)) def suggest_callback(self, resource, meta, suggestions): @@ -88,7 +90,7 @@ def suggest_callback(self, resource, meta, suggestions): )) if cls == 'USBPowerPort': print(' index: ?') - if cls == 'MatchedSysfsGPIO': + if cls in ('MatchedManagedGPIO', 'MatchedSysfsGPIO'): print(' pin: ?') print(" ---") print() diff --git a/labgrid/resource/udev.py b/labgrid/resource/udev.py index 8e10d09eb..9a27e4d65 100644 --- a/labgrid/resource/udev.py +++ b/labgrid/resource/udev.py @@ -822,6 +822,34 @@ def filter_match(self, device): return super().filter_match(device) +@target_factory.reg_resource +@attr.s(eq=False) +class MatchedManagedGPIO(USBResource): + """The MatchedManagedGPIO described a ManagedGPIO matched by Udev + + Args: + pin (str | int): gpio pin name or offset within the matched gpiochip.""" + pin = attr.ib(default=None, validator=attr.validators.instance_of((str, int))) + chip = None + + def __attrs_post_init__(self): + self.match['SUBSYSTEM'] = 'gpio' + super().__attrs_post_init__() + + def filter_match(self, device): + # Match only the char device + if device.properties.get('DEVNAME') is None: + return False + + return super().filter_match(device) + + def update(self): + super().update() + if self.device is not None: + self.chip = self.device.properties.get('DEVNAME') + else: + self.chip = None + @target_factory.reg_resource @attr.s(eq=False) class MatchedSysfsGPIO(USBResource): diff --git a/labgrid/util/agents/managed_gpio.py b/labgrid/util/agents/managed_gpio.py new file mode 100644 index 000000000..5cf29248c --- /dev/null +++ b/labgrid/util/agents/managed_gpio.py @@ -0,0 +1,186 @@ +""" +This module implements switching GPIOs via gpio-manager D-Bus service. + +Takes chip' and 'pin' as parameters which are the path to the gpiochip device +and the pin name/number respectively. + +""" + +import logging +from collections.abc import Callable +from pathlib import Path +from typing import Any, Protocol, Union, cast + +from jeepney import DBusAddress, MessageGenerator, Properties, new_method_call +from jeepney.io.blocking import Proxy, open_dbus_connection + + +class PropertiesProxy(Protocol): + def get_all(self) -> tuple[dict[str, Any]]: ... + + +class ObjectManagerProxy(Protocol): + def GetManagedObjects(self) -> tuple[dict[str, dict[str, dict[str, Any]]]]: ... + + +class ObjectManager(MessageGenerator): + interface = "org.freedesktop.DBus.ObjectManager" + + def __init__(self, object_path: str, bus_name: str = "io.gpiod1") -> None: + super().__init__(object_path=object_path, bus_name=bus_name) + + def GetManagedObjects(self): + return new_method_call(self, "GetManagedObjects") + + +class ChipProxy(Protocol): + def RequestLines( + self, + line_config: tuple[list[tuple[list[int], dict[str, tuple[str, Any]]]], list[int]], + request_config: dict[str, tuple[str, Any]], + ) -> tuple[str]: ... + + +class Chip(MessageGenerator): + interface = "io.gpiod1.Chip" + + def __init__(self, object_path: str, bus_name: str = "io.gpiod1"): + super().__init__(object_path=object_path, bus_name=bus_name) + + def RequestLines(self, line_config: Any, request_config: Any): + return new_method_call(self, "RequestLines", "(a(aua{sv})ai)a{sv}", (line_config, request_config)) + + +class RequestProxy(Protocol): + def Release(self) -> None: ... + def ReconfigureLines( + self, line_config: tuple[list[tuple[list[int], dict[str, tuple[str, Any]]]], list[int]] + ) -> None: ... + def GetValues(self, offsets: list[int]) -> tuple[list[int]]: ... + def SetValues(self, values: dict[int, int]) -> None: ... + + +class Request(MessageGenerator): + interface = "io.gpiod1.Request" + + def __init__(self, object_path: str, bus_name: str = "io.gpiod1") -> None: + super().__init__(object_path=object_path, bus_name=bus_name) + + def Release(self): + return new_method_call(self, "Release") + + def ReconfigureLines(self, line_config: Any): + return new_method_call(self, "ReconfigureLines", "(a(aua{sv})ai)", (line_config,)) + + def GetValues(self, offsets: Any): + return new_method_call(self, "GetValues", "au", (offsets,)) + + def SetValues(self, values: Any): + return new_method_call(self, "SetValues", "a{ui}", (values,)) + + +class GpioDigitalOutput: + def __init__(self, chip: str, pin: Union[str, int]) -> None: + self._logger = logging.getLogger("Device: ") + + # If chip is int, assume it's the gpiochip number and construct the name + # from that. Otherwise, resolve the path in case it is a symlink and + # get the real name from that. + chip = f"gpiochip{chip}" if isinstance(chip, int) else str(Path(chip).resolve().name) + + # Then connect to D-Bus and look for a matching request. + + self._system_bus = open_dbus_connection(bus="SYSTEM") + + requests_obj = cast(ObjectManagerProxy, Proxy(ObjectManager("/io/gpiod1/requests"), self._system_bus)) + (requests,) = requests_obj.GetManagedObjects() + + for req_path, props in requests.items(): + req = props["io.gpiod1.Request"] + + if Path(req["ChipPath"][1]).name != chip: + continue + + for line_path in req["LinePaths"][1]: + line_obj = cast( + PropertiesProxy, + Proxy(Properties(DBusAddress(line_path, "io.gpiod1", "io.gpiod1.Line")), self._system_bus), + ) + (line,) = line_obj.get_all() + + if line["Offset"][1] == pin or line["Name"][1] == pin: + self._offset: int = line["Offset"][1] + self._req = cast(RequestProxy, Proxy(Request(req_path), self._system_bus)) + break + + else: + continue + + break + + else: + # If we didn't find a match, the make the request ourselves. + self._logger.debug("Requesting GPIO %r on chip %r via gpio-manager.", pin, chip) + + chips_obj = cast(ObjectManagerProxy, Proxy(ObjectManager("/io/gpiod1/chips"), self._system_bus)) + (chips,) = chips_obj.GetManagedObjects() + + for chip_path, props in chips.items(): + chip_info = props["io.gpiod1.Chip"] + + if chip_info["Name"][1] != chip: + continue + + if isinstance(pin, str): + raise NotImplementedError("Pin name lookup not implemented, only pin index.") + + chip_obj = cast(ChipProxy, Proxy(Chip(chip_path), self._system_bus)) + (req_path,) = chip_obj.RequestLines(([([pin], {})], []), {}) + + self._offset = pin + self._req = cast(RequestProxy, Proxy(Request(req_path), self._system_bus)) + + break + else: + raise ValueError(f"Chip {chip!r} not found.") + + def __del__(self): + if self._system_bus: + self._system_bus.close() + self._system_bus = None + + def get(self) -> bool: + return bool(self._req.GetValues([self._offset])[0][0]) + + def set(self, status: bool) -> None: + # Have to call ReconfigureLines instead of SetValue in case the line is + # currently configured as input (SetValue fails rather than changing the direction). + self._req.ReconfigureLines(([([self._offset], {"direction": ("s", "output")})], [status])) + + +_gpios: dict[tuple[str, Union[str, int]], GpioDigitalOutput] = {} + + +def _get_gpio_line(chip: str, pin: Union[str, int]) -> GpioDigitalOutput: + real_chip = str(Path(chip).resolve()) + + if (real_chip, pin) not in _gpios: + _gpios[(real_chip, pin)] = GpioDigitalOutput(real_chip, pin) + + return _gpios[(real_chip, pin)] + + +def handle_set(chip: str, pin: Union[str, int], status: bool) -> None: + gpio_line = _get_gpio_line(chip, pin) + gpio_line.set(status) + + +def handle_get(chip: str, pin: Union[str, int]) -> bool: + gpio_line = _get_gpio_line(chip, pin) + return gpio_line.get() + + +methods: dict[str, Callable[..., Any]] = { + "set": handle_set, + "get": handle_get, +} diff --git a/pyproject.toml b/pyproject.toml index 01f56da33..9d35ea573 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -55,6 +55,7 @@ dynamic = ["version"] # via setuptools_scm "Bug Tracker" = "https://github.com/labgrid-project/labgrid/issues" [project.optional-dependencies] +dbus = ["jeepney>=0.8.0"] doc = [ "sphinx_rtd_theme>=1.0.0", "Sphinx>=2.0.0,<9.0.0", @@ -79,7 +80,7 @@ vxi11 = ["python-vxi11>=0.9"] xena = ["xenavalkyrie>=3.0.1"] deb = ["labgrid[modbus,onewire,snmp]"] dev = [ - "labgrid[doc,docker,graph,kasa,modbus,modbusrtu,mqtt,onewire,pyvisa,snmp,vxi11]", + "labgrid[dbus,doc,docker,graph,kasa,modbus,modbusrtu,mqtt,onewire,pyvisa,snmp,vxi11]", # additional dev dependencies "psutil>=5.8.0", diff --git a/tests/test_agent.py b/tests/test_agent.py index 00c8a04e8..0a701bf45 100644 --- a/tests/test_agent.py +++ b/tests/test_agent.py @@ -98,6 +98,7 @@ def test_local(): @pytest.mark.parametrize('module_name', [ 'deditec_relais8', + 'managed_gpio', 'sysfsgpio', 'usb_hid_relay' ]) @@ -112,4 +113,4 @@ def test_all_modules(module_name: str) -> None: def test_import_modules(): import labgrid.util.agents import labgrid.util.agents.dummy - from labgrid.util.agents import deditec_relais8, sysfsgpio + from labgrid.util.agents import deditec_relais8, managed_gpio, sysfsgpio