Skip to content
Open
2 changes: 2 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,8 @@ New Features in 0.4.0
a jumper.
- AndroidFastbootDriver now supports booting/flashing images preconfigured in
the environment configuration.
- CommandProtocol now supports a start_process() method to run a process on the
target asynchronously.

Bug fixes in 0.4.0
~~~~~~~~~~~~~~~~~~
Expand Down
9 changes: 0 additions & 9 deletions doc/development.rst
Original file line number Diff line number Diff line change
Expand Up @@ -783,12 +783,3 @@ By writing these events to a file (or sqlite database) as a trace, we can
collect data over multiple runs for later analysis.
This would become more useful by passing recognized events (stack traces,
crashes, ...) and benchmark results via the Step infrastructure.

CommandProtocol Support for Background Processes
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Currently the CommandProtocol does not support long running
processes well.
An implementation should start a new process,
return a handle and forbid running other processes in the foreground.
The handle can be used to retrieve output from a command.
68 changes: 45 additions & 23 deletions labgrid/driver/bareboxdriver.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import shlex
from contextlib import contextmanager

import attr
from pexpect import TIMEOUT

from ..exceptions import CommandProcessBusy
from ..factory import target_factory
from ..protocol import CommandProtocol, ConsoleProtocol, LinuxBootProtocol
from ..step import step
from ..util import gen_marker, Timeout, re_vt100
from ..util import gen_marker, Timeout, re_vt100, ConsoleMarkerProcess
from .common import Driver
from .commandmixin import CommandMixin

Expand Down Expand Up @@ -44,6 +46,7 @@ def __attrs_post_init__(self):
self._status = 0
# barebox' default log level, used as fallback if no log level can be saved
self.saved_log_level = 7
self._process = None

def on_activate(self):
"""Activate the BareboxDriver
Expand All @@ -59,6 +62,34 @@ def on_deactivate(self):
Simply sets the internal status to 0
"""
self._status = 0
assert not self._process, "Deactivating while a command process is running is not allowed"

@contextmanager
def _start_process(self, cmd: str, *, adjust_log_level: bool = True):
if self._process is not None:
raise CommandProcessBusy()

# FIXME: use codec, decodeerrors
marker = gen_marker()
# hide marker from expect
hidden_marker = f'"{marker[:4]}""{marker[4:]}"'
# generate command with marker and log level adjustment
cmp_command = f'echo -o /cmd {shlex.quote(cmd)}; echo {hidden_marker};'
if self.saved_log_level and adjust_log_level:
cmp_command += f' global.loglevel={self.saved_log_level};'
cmp_command += f' sh /cmd; echo {hidden_marker} $?;'
if self.saved_log_level and adjust_log_level:
cmp_command += ' global.loglevel=0;'

self.console.sendline(cmp_command)
self.console.expect(marker)

with ConsoleMarkerProcess(self.console, marker, self.prompt) as p:
self._process = p
try:
yield p
finally:
self._process = None

@Driver.check_active
@step(args=['cmd'])
Expand All @@ -76,32 +107,23 @@ def _run(self, cmd: str, *, timeout: int = 30, adjust_log_level: bool = True, co
Returns:
Tuple[List[str],List[str], int]: if successful, None otherwise
"""
# FIXME: use codec, decodeerrors
marker = gen_marker()
# hide marker from expect
hidden_marker = f'"{marker[:4]}""{marker[4:]}"'
# generate command with marker and log level adjustment
cmp_command = f'echo -o /cmd {shlex.quote(cmd)}; echo {hidden_marker};'
if self.saved_log_level and adjust_log_level:
cmp_command += f' global.loglevel={self.saved_log_level};'
cmp_command += f' sh /cmd; echo {hidden_marker} $?;'
if self.saved_log_level and adjust_log_level:
cmp_command += ' global.loglevel=0;'

if self._status == 1:
self.console.sendline(cmp_command)
_, _, match, _ = self.console.expect(
rf'{marker}(.*){marker}\s+(\d+)\s+.*{self.prompt}',
timeout=timeout)
# Remove VT100 Codes and split by newline
data = re_vt100.sub('', match.group(1).decode('utf-8')).split('\r\n')[1:-1]
self.logger.debug("Received Data: %s", data)
# Get exit code
exitcode = int(match.group(2))
return (data, [], exitcode)
with self._start_process(cmd, adjust_log_level=adjust_log_level) as p:
output = p.read_to_end(timeout=timeout)
# Remove VT100 Codes and split by newline
data = re_vt100.sub('', output.decode('utf-8')).split('\r\n')[1:-1]
self.logger.debug("Received Data: %s", data)
return (data, [], p.exitcode)

return None

@Driver.check_active
@step(args=['cmd'])
@contextmanager
def start_process(self, cmd: str):
with self._start_process(cmd) as p:
yield p

@Driver.check_active
@step()
def reset(self):
Expand Down
8 changes: 8 additions & 0 deletions labgrid/driver/consoleexpectmixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ def sendline(self, line):
def sendcontrol(self, char):
self._expect.sendcontrol(char)

@Driver.check_active
def expect_no_step(self, pattern, timeout=-1):
"""
Expect a pattern without logging the output.
"""
index = self._expect.expect(pattern, timeout=timeout)
return index, self._expect.before, self._expect.match, self._expect.after

@Driver.check_active
@step(args=['pattern'], result=True)
def expect(self, pattern, timeout=-1):
Expand Down
61 changes: 42 additions & 19 deletions labgrid/driver/shelldriver.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@
import re
import shlex
import ipaddress
from contextlib import contextmanager

import attr
from pexpect import TIMEOUT
import xmodem

from ..exceptions import CommandProcessBusy
from ..factory import target_factory
from ..protocol import CommandProtocol, ConsoleProtocol, FileTransferProtocol
from ..step import step
from ..util import gen_marker, Timeout, re_vt100
from ..util import gen_marker, Timeout, re_vt100, ConsoleMarkerProcess
from .commandmixin import CommandMixin
from .common import Driver
from .exception import ExecutionError
Expand Down Expand Up @@ -61,6 +63,7 @@ def __attrs_post_init__(self):

self._xmodem_cached_rx_cmd = ""
self._xmodem_cached_sx_cmd = ""
self._process = None

def on_activate(self):
if self._status == 0:
Expand All @@ -76,38 +79,58 @@ def on_activate(self):

def on_deactivate(self):
self._status = 0
assert not self._process, "Deactivating while a command process is running is not allowed"

def _run(self, cmd, *, timeout=30.0, codec="utf-8", decodeerrors="strict"):
"""
Runs the specified cmd on the shell and returns the output.
@contextmanager
def _start_process(self, cmd: str):
if self._process is not None:
raise CommandProcessBusy()

Arguments:
cmd - cmd to run on the shell
"""
# FIXME: Handle pexpect Timeout
self._check_prompt()
marker = gen_marker()

# hide marker from expect
cmp_command = f'''MARKER='{marker[:4]}''{marker[4:]}' run {shlex.quote(cmd)}'''
self.console.sendline(cmp_command)
_, _, match, _ = self.console.expect(
rf'{marker}(.*){marker}\s+(\d+)\s+{self.prompt}',
timeout=timeout
)
# Remove VT100 Codes, split by newline and remove surrounding newline
data = re_vt100.sub('', match.group(1).decode(codec, decodeerrors)).split('\r\n')
if data and not data[-1]:
del data[-1]
self.logger.debug("Received Data: %s", data)
# Get exit code
exitcode = int(match.group(2))
return (data, [], exitcode)
self.console.expect(marker)
with ConsoleMarkerProcess(self.console, marker, self.prompt) as p:
self._process = p
try:
yield p
finally:
self._process = None

def _run(self, cmd, *, timeout=30.0, codec="utf-8", decodeerrors="strict"):
"""
Runs the specified cmd on the shell and returns the output.

Arguments:
cmd - cmd to run on the shell
"""
with self._start_process(cmd) as p:
output = p.read_to_end(timeout=timeout)

# Remove VT100 Codes, split by newline and remove surrounding newline
data = re_vt100.sub('', output.decode(codec, decodeerrors)).split('\r\n')
if data and not data[-1]:
del data[-1]

self.logger.debug("Received Data: %s", data)
return (data, [], p.exitcode)

@Driver.check_active
@step(args=['cmd'], result=True)
def run(self, cmd, timeout=30.0, codec="utf-8", decodeerrors="strict"):
return self._run(cmd, timeout=timeout, codec=codec, decodeerrors=decodeerrors)

@Driver.check_active
@step(args=['cmd'])
@contextmanager
def start_process(self, cmd: str):
with self._start_process(cmd) as p:
yield p

@step()
def _await_login(self):
"""Awaits the login prompt and logs the user in"""
Expand Down
87 changes: 85 additions & 2 deletions labgrid/driver/sshdriver.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@
import tempfile
import time
from functools import cached_property
from contextlib import contextmanager

import attr
import pexpect

from ..factory import target_factory
from ..protocol import CommandProtocol, FileTransferProtocol
from ..protocol import CommandProtocol, CommandProcessProtocol, FileTransferProtocol
from .commandmixin import CommandMixin
from .common import Driver
from ..step import step
Expand All @@ -22,6 +24,56 @@
from ..util.proxy import proxymanager
from ..util.timeout import Timeout
from ..util.ssh import get_ssh_connect_timeout
from ..util.readmixin import ReadMixIn


class SSHDriverProcess(CommandProcessProtocol, ReadMixIn):
def __init__(self, sub):
self._sub = sub

@property
def exitcode(self):
if self._sub.isalive():
return None

if self._sub.exitstatus is None:
return -self._sub.signalstatus
return self._sub.exitstatus

def read(self, size=1, timeout=-1):
return self._sub.read_nonblocking(size, timeout)

@step(args=['data'])
def write(self, data):
self._sub.write(data)

@step(result=True)
def poll(self):
return self.exitcode

@step(result=True)
def stop(self):
self._sub.close(True)

@step(args=['pattern', 'timeout'], result=True)
def expect(self, pattern, *, timeout=-1):
index = self._sub.expect(pattern, timeout=timeout)
return index, self._sub.before, self._sub.match, self._sub.after

@step(result=True)
def wait(self):
return self._sub.wait()

@step(args=['char'])
def sendcontrol(self, char):
self._sub.sendcontrol(char)

def __enter__(self):
return self

def __exit__(self, typ, value, traceback):
self.stop()
return False


@target_factory.reg_driver
Expand All @@ -45,6 +97,7 @@ def __attrs_post_init__(self):
self._scp = self._get_tool("scp")
self._sshfs = self._get_tool("sshfs")
self._rsync = self._get_tool("rsync")
self._processes = []

def _get_tool(self, name):
if self.target.env:
Expand Down Expand Up @@ -80,6 +133,7 @@ def on_activate(self):
self._start_keepalive()

def on_deactivate(self):
assert not self._processes, "Deactivating while a command process is running is not allowed"
try:
self._stop_keepalive()
finally:
Expand Down Expand Up @@ -242,6 +296,35 @@ def _run(self, cmd, codec="utf-8", decodeerrors="strict", timeout=None):
stderr.pop()
return (stdout, stderr, sub.returncode)

@Driver.check_active
@step(args=['cmd'])
@contextmanager
def start_process(self, cmd: str):
if not self._check_keepalive():
raise ExecutionError("Keepalive no longer running")

cmd = f"stty -echo; {cmd}" # Disable input echo from ssh
complete_cmd = ["ssh", "-o", "LogLevel=QUIET", "-x", *self.ssh_prefix,
"-p", str(self.networkservice.port), "-l", self.networkservice.username,
self.networkservice.address, "-tt", "--", '/bin/sh -c {}'.format(shlex.quote(cmd)),
]
self.logger.debug("Sending command: %s", complete_cmd)

try:
sub = pexpect.spawn(complete_cmd[0], complete_cmd[1:])
sub.setecho(False) # Disable input echo from pexpect
except:
raise ExecutionError(
"error executing command: {}".format(complete_cmd)
)

with SSHDriverProcess(sub) as p:
self._processes.append(p)
try:
yield p
finally:
self._processes.remove(p)

def interact(self, cmd=None):
assert cmd is None or isinstance(cmd, list)

Expand Down Expand Up @@ -377,7 +460,7 @@ def scp(self, *, src, dst):
"-o", f"ControlPath={self.control.replace('%', '%%')}",
src, dst,
]

if self.explicit_sftp_mode and self._scp_supports_explicit_sftp_mode():
complete_cmd.insert(1, "-s")
if self.explicit_scp_mode and self._scp_supports_explicit_scp_mode():
Expand Down
Loading
Loading