Skip to content
Commits on Source (18)
......@@ -4,3 +4,5 @@
/*.egg-info/
/rpmbuild/
/pytest-multihost-*.tar.gz
/.pytest_cache/
/.tox/
......@@ -26,8 +26,9 @@ upload-fedorahosted: tarball
scp ${TARBALLNAME} fedorahosted.org:${FEDORA_PROJECT}
upload-pypi:
python setup.py sdist upload
python setup.py bdist_wheel upload
rm -v dist/*
python3 setup.py sdist bdist_wheel
python3 -m twine upload dist/*
upload-fedorapeople: srpm
SRPMNAME=$$(ls rpmbuild/SRPMS); \
......
......@@ -4,8 +4,8 @@ A pytest plugin for multi-host testing.
Downloading
-----------
Release tarballs will be made available for download from Fedora Hosted:
https://fedorahosted.org/released/python-pytest-multihost/
Release tarballs will be made available for download from Pagure Releases:
https://pagure.io/releases/python-pytest-multihost/
The goal is to include this project in Fedora repositories. Until that happens,
you can use testing builds from COPR – see "Developer links" below.
......@@ -172,23 +172,36 @@ $ py.test --multihost-config=/path/to/configfile.yaml
To use YAML files, the PyYAML package is required. Without it only JSON files
can be used.
Encoding and bytes/text
-----------------------
When writing files or issuing commands, bytestrings are passed through
unchanged, and text strings (``unicode`` in Python 2) are encoded using
a configurable encoding (``utf-8`` by default).
When reading files, bytestrings are returned by default,
but an encoding can be given to get a test string.
For command output, separate ``stdout_bytes`` and ``stdout_text`` attributes
are provided.
The latter uses a configurable encoding (``utf-8` by default).
Contributing
------------
The project is happy to accept patches!
Please format your contribution using the FreeIPA `patch guidelines`_,
and send it to <freeipa-devel@redhat.com>.
Any development discussion is welcome there.
Someday the project might get its own list, but that seems premature now.
Please file any patches as Pull Requests on the project's `Pagure repo`_.
Any development discussion should be in Pagure Pull Requests and Issues.
Developer links
---------------
* Bug tracker: https://fedorahosted.org/python-pytest-multihost/report/3
* Code browser: https://git.fedorahosted.org/cgit/python-pytest-multihost
* git clone https://git.fedorahosted.org/git/python-pytest-multihost.git
* Bug tracker: https://pagure.io/python-pytest-multihost/issues
* Code browser: https://pagure.io/python-pytest-multihost/tree/master
* git clone https://pagure.io/python-pytest-multihost.git
* Unstable packages for Fedora: https://copr.fedoraproject.org/coprs/pviktori/pytest-plugins/
To release, update version in setup.py, add a Git tag like "v0.3",
......@@ -197,4 +210,4 @@ Running `make upload` will put the tarball to Fedora Hosted and PyPI,
and a SRPM on Fedorapeople, if you have the rights.
Running `make release` will upload and fire a COPR build.
.. _patch guidelines: http://www.freeipa.org/page/Contribute/Patch_Format
.. _Pagure repo: https://pagure.io/python-pytest-multihost
......@@ -25,7 +25,7 @@ class BaseHost(object):
See README for an overview of the core classes.
"""
transport_class = transport.SSHTransport
command_prelude = ''
command_prelude = b''
def __init__(self, domain, hostname, role, ip=None,
external_hostname=None, username=None, password=None,
......@@ -190,9 +190,9 @@ class BaseHost(object):
"""Shortcut for transport.get_file_contents"""
return self.transport.get_file_contents(filename, encoding=encoding)
def put_file_contents(self, filename, contents):
def put_file_contents(self, filename, contents, encoding='utf-8'):
"""Shortcut for transport.put_file_contents"""
self.transport.put_file_contents(filename, contents)
self.transport.put_file_contents(filename, contents, encoding=encoding)
def collect_log(self, filename):
"""Call all registered log collectors on the given filename"""
......@@ -201,7 +201,7 @@ class BaseHost(object):
def run_command(self, argv, set_env=True, stdin_text=None,
log_stdout=True, raiseonerr=True,
cwd=None):
cwd=None, bg=False, encoding='utf-8'):
"""Run the given command on this host
Returns a Command instance. The command will have already run in the
......@@ -218,43 +218,72 @@ class BaseHost(object):
:param raiseonerr: If true, an exception will be raised if the command
does not exit with return code 0
:param cwd: The working directory for the command
:param bg: If True, runs command in background.
In this case, either the result should be used in a ``with``
statement, or ``wait()`` should be called explicitly
when the command is finished.
:param encoding: Encoding for the resulting Command instance's
``stdout_text`` and ``stderr_text``, and for
``stdin_text``, ``argv``, etc. if they are not
bytestrings already.
"""
command = self.transport.start_shell(argv, log_stdout=log_stdout)
def encode(string):
if not isinstance(string, bytes):
return string.encode(encoding)
else:
return string
command = self.transport.start_shell(argv, log_stdout=log_stdout,
encoding=encoding)
# Set working directory
if cwd is None:
cwd = self.test_dir
command.stdin.write('cd %s\n' % shell_quote(cwd))
command.stdin.write(b'cd %s\n' % shell_quote(encode(cwd)))
# Set the environment
if set_env:
command.stdin.write('. %s\n' % shell_quote(self.env_sh_path))
quoted = shell_quote(encode(self.env_sh_path))
command.stdin.write(b'. %s\n' % quoted)
if self.command_prelude:
command.stdin.write(self.command_prelude)
command.stdin.write(encode(self.command_prelude))
if stdin_text:
command.stdin.write(b"echo -en ")
command.stdin.write(_echo_quote(encode(stdin_text)))
command.stdin.write(b" | ")
if isinstance(argv, basestring):
# Run a shell command given as a string
command.stdin.write('(')
command.stdin.write(argv)
command.stdin.write(')')
command.stdin.write(b'(')
command.stdin.write(encode(argv))
command.stdin.write(b')')
else:
# Run a command given as a popen-style list (no shell expansion)
for arg in argv:
command.stdin.write(shell_quote(arg))
command.stdin.write(' ')
command.stdin.write(shell_quote(encode(arg)))
command.stdin.write(b' ')
command.stdin.write(';exit\n')
if stdin_text:
command.stdin.write(stdin_text)
command.stdin.write(b'\nexit\n')
command.stdin.flush()
command.wait(raiseonerr=raiseonerr)
command.raiseonerr = raiseonerr
if not bg:
command.wait()
return command
def _echo_quote(bytestring):
"""Encode a bytestring for use with bash & "echo -en"
"""
bytestring = bytestring.replace(b"\\", br"\\")
bytestring = bytestring.replace(b"\0", br"\x00")
bytestring = bytestring.replace(b"'", br"'\''")
return b"'" + bytestring + b"'"
class Host(BaseHost):
"""A Unix host"""
command_prelude = 'set -e\n'
command_prelude = b'set -e\n'
class WinHost(BaseHost):
......
......@@ -30,6 +30,8 @@ except ImportError:
have_paramiko = False
DEFAULT = object()
class Transport(object):
"""Mechanism for communicating with remote hosts
......@@ -44,11 +46,19 @@ class Transport(object):
self._command_index = 0
def get_file_contents(self, filename, encoding=None):
"""Read the named remote file and return the contents as a string"""
"""Read the named remote file and return the contents
The string will be decoded using the given encoding;
if encoding is None (default), it will be returned as a bytestring.
"""
raise NotImplementedError('Transport.get_file_contents')
def put_file_contents(self, filename, contents):
"""Write the given string to the named remote file"""
def put_file_contents(self, filename, contents, encoding='utf-8'):
"""Write the given string (or bytestring) to the named remote file
The contents string will be encoded using the given encoding
(default: ``'utf-8'``), unless aleady a bytestring.
"""
raise NotImplementedError('Transport.put_file_contents')
def file_exists(self, filename):
......@@ -59,18 +69,22 @@ class Transport(object):
"""Make the named directory"""
raise NotImplementedError('Transport.mkdir')
def start_shell(self, argv, log_stdout=True):
def start_shell(self, argv, log_stdout=True, encoding=None):
"""Start a Shell
:param argv: The command this shell is intended to run (used for
logging only)
:param log_stdout: If false, the stdout will not be logged (useful when
binary output is expected)
:param encoding: Encoding for the resulting Command's ``stdout_text``
and ``stderr_text``.
Given a `shell` from this method, the caller can then use
``shell.stdin.write()`` to input any command(s), call ``shell.wait()``
to let the command run, and then inspect ``returncode``,
``stdout_text`` or ``stderr_text``.
Note that ``shell.stdin`` uses bytes I/O.
"""
raise NotImplementedError('Transport.start_shell')
......@@ -84,7 +98,7 @@ class Transport(object):
def get_file(self, remotepath, localpath):
"""Copy a file from the remote host to a local file"""
contents = self.get_file_contents(remotepath)
contents = self.get_file_contents(remotepath, encoding=None)
with open(localpath, 'wb') as local_file:
local_file.write(contents)
......@@ -92,7 +106,7 @@ class Transport(object):
"""Copy a local file to the remote host"""
with open(localpath, 'rb') as local_file:
contents = local_file.read()
self.put_file_contents(remotepath, contents)
self.put_file_contents(remotepath, contents, encoding=None)
def get_next_command_logger_name(self):
self._command_index += 1
......@@ -111,6 +125,27 @@ class Transport(object):
raise NotImplementedError('Transport.remove_file')
class _decoded_output_property(object):
"""Descriptor for on-demand decoding of a Command's output stream
"""
def __init__(self, name):
self.name = name
def __set_name__(self, cls, name):
# Sanity check (called only on Python 3.6+).
# This property expects to handle attributes named '<foo>_text'.
assert name == self.name + '_text'
def __get__(self, instance, cls=None):
if instance is None:
return self
else:
bytestring = getattr(instance, self.name + '_bytes')
decoded = bytestring.decode(instance.encoding)
setattr(instance, self.name + '_text', decoded)
return decoded
class Command(object):
"""A Popen-style object representing a remote command
......@@ -122,12 +157,22 @@ class Command(object):
To make sure reading doesn't stall after one buffer fills up, they are read
in parallel using threads.
After calling wait(), ``stdout_text`` and ``stderr_text`` attributes will
be strings containing the output, and ``returncode`` will contain the
After calling wait(), ``stdout_bytes`` and ``stderr_bytes`` attributes will
be bytestrings containing the output, and ``returncode`` will contain the
exit code.
The ``stdout_text`` and ``stdout_text`` will be the corresponding output
decoded using the given ``encoding`` (default: ``'utf-8'``).
These are decoded on-demand; do not access them if a command
produces binary output.
A Command may be used as a context manager (in the ``with`` statement).
Exiting the context will automatically call ``wait()``.
This raises an exception if the exit code is not 0, unless the
``raiseonerr`` attribute is set to false before exiting the context.
"""
def __init__(self, argv, logger_name=None, log_stdout=True,
get_logger=None):
get_logger=None, encoding='utf-8'):
self.returncode = None
self.argv = argv
self._done = False
......@@ -140,13 +185,24 @@ class Command(object):
get_logger = logging.getLogger
self.get_logger = get_logger
self.log = get_logger(self.logger_name)
self.encoding = encoding
self.raiseonerr = True
def wait(self, raiseonerr=True):
stdout_text = _decoded_output_property('stdout')
stderr_text = _decoded_output_property('stderr')
def wait(self, raiseonerr=DEFAULT):
"""Wait for the remote process to exit
Raises an excption if the exit code is not 0, unless raiseonerr is
Raises an exception if the exit code is not 0, unless ``raiseonerr`` is
true.
When ``raiseonerr`` is not specified as argument, the ``raiseonerr``
attribute is used.
"""
if raiseonerr is DEFAULT:
raiseonerr = self.raiseonerr
if self._done:
return self.returncode
......@@ -168,6 +224,12 @@ class Command(object):
"""
raise NotImplementedError()
def __enter__(self):
return self
def __exit__(self, *exc_info):
self.wait(raiseonerr=self.raiseonerr)
class ParamikoTransport(Transport):
"""Transport that uses the Paramiko SSH2 library"""
......@@ -220,16 +282,18 @@ class ParamikoTransport(Transport):
def get_file_contents(self, filename, encoding=None):
"""Read the named remote file and return the contents as a string"""
self.log.debug('READ %s', filename)
with self.sftp_open(filename) as f:
with self.sftp_open(filename, 'rb') as f:
result = f.read()
if encoding:
result = result.decode(encoding)
return result
def put_file_contents(self, filename, contents):
def put_file_contents(self, filename, contents, encoding=None):
"""Write the given string to the named remote file"""
self.log.info('WRITE %s', filename)
with self.sftp_open(filename, 'w') as f:
if encoding and not isinstance(contents, bytes):
contents = contents.encode(encoding)
with self.sftp_open(filename, 'wb') as f:
f.write(contents)
def file_exists(self, filename):
......@@ -248,13 +312,14 @@ class ParamikoTransport(Transport):
self.log.info('MKDIR %s', path)
self.sftp.mkdir(path)
def start_shell(self, argv, log_stdout=True):
def start_shell(self, argv, log_stdout=True, encoding='utf-8'):
logger_name = self.get_next_command_logger_name()
ssh = self._transport.open_channel('session')
self.log.info('RUN %s', argv)
return SSHCommand(ssh, argv, logger_name=logger_name,
log_stdout=log_stdout,
get_logger=self.host.config.get_logger)
get_logger=self.host.config.get_logger,
encoding=encoding)
def get_file(self, remotepath, localpath):
self.log.debug('GET %s', remotepath)
......@@ -322,12 +387,14 @@ class OpenSSHTransport(Transport):
return argv
def start_shell(self, argv, log_stdout=True):
def start_shell(self, argv, log_stdout=True, encoding='utf-8'):
self.log.info('RUN %s', argv)
command = self._run(['bash'], argv=argv, log_stdout=log_stdout)
command = self._run(['bash'], argv=argv, log_stdout=log_stdout,
encoding=encoding)
return command
def _run(self, command, log_stdout=True, argv=None, collect_output=True):
def _run(self, command, log_stdout=True, argv=None, collect_output=True,
encoding='utf-8'):
"""Run the given command on the remote host
:param command: Command to run (appended to the common SSH invocation)
......@@ -341,7 +408,8 @@ class OpenSSHTransport(Transport):
ssh = SSHCallWrapper(self.ssh_argv + list(command))
return SSHCommand(ssh, argv, logger_name, log_stdout=log_stdout,
collect_output=collect_output,
get_logger=self.host.config.get_logger)
get_logger=self.host.config.get_logger,
encoding=encoding)
def file_exists(self, path):
self.log.info('STAT %s', path)
......@@ -355,19 +423,21 @@ class OpenSSHTransport(Transport):
cmd = self._run(['mkdir', path])
cmd.wait()
def put_file_contents(self, filename, contents):
def put_file_contents(self, filename, contents, encoding='utf-8'):
self.log.info('PUT %s', filename)
if encoding and not isinstance(contents, bytes):
contents = contents.encode(encoding)
cmd = self._run(['tee', filename], log_stdout=False)
cmd.stdin.write(contents)
cmd.wait()
assert cmd.stdout_text == contents
assert cmd.stdout_bytes == contents
def get_file_contents(self, filename, encoding=None):
self.log.info('GET %s', filename)
cmd = self._run(['cat', filename], log_stdout=False)
cmd.wait(raiseonerr=False)
if cmd.returncode == 0:
result = cmd.stdout_text
result = cmd.stdout_bytes
if encoding:
result = result.decode(encoding)
return result
......@@ -419,9 +489,6 @@ class SSHCallWrapper(object):
assert mode == 'rb'
return self.command.stderr
def shutdown_write(self):
self.command.stdin.close()
def recv_exit_status(self):
return self.command.wait()
......@@ -435,7 +502,8 @@ class SSHCommand(Command):
collect_output=True, encoding='utf-8', get_logger=None):
super(SSHCommand, self).__init__(argv, logger_name,
log_stdout=log_stdout,
get_logger=get_logger)
get_logger=get_logger,
encoding=encoding)
self._stdout_lines = []
self._stderr_lines = []
self.running_threads = set()
......@@ -445,14 +513,17 @@ class SSHCommand(Command):
self.log.debug('RUN %s', argv)
self._ssh.invoke_shell()
self._use_bytes = (encoding is None)
def wrap_file(file, encoding):
if encoding is None or sys.version_info < (3, 0):
if self._use_bytes:
return file
else:
return io.TextIOWrapper(file, encoding=encoding)
stdin = self.stdin = wrap_file(self._ssh.makefile('wb'), 'utf-8')
stdout = wrap_file(self._ssh.makefile('rb'), encoding)
stderr = wrap_file(self._ssh.makefile_stderr('rb'), encoding)
self.stdin = self._ssh.makefile('wb')
stdout = self._ssh.makefile('rb')
stderr = self._ssh.makefile_stderr('rb')
if collect_output:
self._start_pipe_thread(self._stdout_lines, stdout, 'out',
......@@ -460,13 +531,14 @@ class SSHCommand(Command):
self._start_pipe_thread(self._stderr_lines, stderr, 'err', True)
def _end_process(self):
self._ssh.shutdown_write()
self.stdin.close()
while self.running_threads:
self.running_threads.pop().join()
self.stdout_text = ''.join(self._stdout_lines)
self.stderr_text = ''.join(self._stderr_lines)
self.stdout_bytes = b''.join(self._stdout_lines)
self.stderr_bytes = b''.join(self._stderr_lines)
self.returncode = self._ssh.recv_exit_status()
self._ssh.close()
......@@ -482,7 +554,8 @@ class SSHCommand(Command):
def read_stream():
for line in stream:
if do_log:
log.debug(line.rstrip('\n'))
log.debug(line.rstrip(b'\n').decode('utf-8',
errors='replace'))
result_list.append(line)
thread = threading.Thread(target=read_stream)
......@@ -491,7 +564,10 @@ class SSHCommand(Command):
return thread
if not have_paramiko or os.environ.get('PYTESTMULTIHOST_SSH_TRANSPORT') == 'openssh':
if (
not have_paramiko or
os.environ.get('PYTESTMULTIHOST_SSH_TRANSPORT') == 'openssh'
):
SSHTransport = OpenSSHTransport
else:
SSHTransport = ParamikoTransport
......@@ -15,9 +15,9 @@ def check_config_dict_empty(dct, name):
(name, ', '.join(dct)))
def shell_quote(string):
"""Quotes a string for the Bash shell"""
return "'" + string.replace("'", "'\\''") + "'"
def shell_quote(bytestring):
"""Quotes a bytestring for the Bash shell"""
return b"'" + bytestring.replace(b"'", b"'\\''") + b"'"
class TempDir(object):
......
......@@ -12,7 +12,7 @@
%global srcname pytest-multihost
%global modulename pytest_multihost
%global srcversion 1.1
%global srcversion 3.0
%global versionedname %{srcname}-%{srcversion}
Name: python-%{srcname}
......@@ -133,6 +133,17 @@ popd
%changelog
* Fri Mar 02 2018 Petr Viktorin <encukou@gmail.com> - 3.0-1
- Do not add extra newlines to stdin contents
- Remove forgotten debug print
* Wed Apr 12 2017 Petr Viktorin <encukou@gmail.com> - 2.0-1
- Add support to run commands in background
- Fix several issues around quoting, background processes, and encoding
* Wed Apr 12 2017 Petr Viktorin <encukou@gmail.com> - 1.1.1-1
- Upstream packaging changes
* Thu Apr 22 2016 Petr Viktorin <encukou@gmail.com> - 1.1-1
- Much improved support for Windows hosts (thanks to Niranjan MR)
......
......@@ -11,10 +11,10 @@ with io.open('README.rst', 'rt', encoding='utf-8') as f:
setup_args = dict(
name = "pytest-multihost",
version = "1.1",
version = "3.0",
description = "Utility for writing multi-host tests for pytest",
long_description = readme_contents,
url = "https://fedorahosted.org/python-pytest-multihost/",
url = "https://pagure.io/python-pytest-multihost",
license = "GPL",
author = "Petr Viktorin",
author_email = "pviktori@redhat.com",
......
......@@ -6,6 +6,7 @@ import getpass
import pytest
from subprocess import CalledProcessError
import contextlib
import sys
import os
import pytest_multihost
......@@ -121,6 +122,9 @@ def multihost_badpassword(request, transport_class):
def _first_command(host):
"""If managed command fails, prints a message to help debugging"""
try:
# Run dummy command first; this should catch spurious SSH messages.
host.run_command(['echo', 'hello', 'world'])
# Now, run the actual command
yield
except (AuthenticationException, CalledProcessError):
print (
......@@ -159,6 +163,38 @@ class TestLocalhost(object):
with pytest.raises(IOError):
host.get_file_contents(filename)
def test_get_put_file_contents_bytes(self, multihost, tmpdir):
host = multihost.host
filename = str(tmpdir.join('test-bytes.txt'))
testbytes = u'test \0 \N{WHITE SMILING FACE}'.encode('utf-8')
with _first_command(host):
host.put_file_contents(filename, testbytes, encoding=None)
result = host.get_file_contents(filename, encoding=None)
assert result == testbytes
@pytest.mark.parametrize('encoding', ('utf-8', 'utf-16'))
def test_put_file_contents_utf(self, multihost, tmpdir, encoding):
host = multihost.host
filename = str(tmpdir.join('test-{}.txt'.format(encoding)))
teststring = u'test \N{WHITE SMILING FACE}'
with _first_command(host):
host.put_file_contents(filename, teststring, encoding=encoding)
result = host.get_file_contents(filename, encoding=None)
assert result == teststring.encode(encoding)
with open(filename, 'rb') as f:
assert f.read() == teststring.encode(encoding)
@pytest.mark.parametrize('encoding', ('utf-8', 'utf-16'))
def test_get_file_contents_encoding(self, multihost, tmpdir, encoding):
host = multihost.host
filename = str(tmpdir.join('test-{}.txt'.format(encoding)))
teststring = u'test \N{WHITE SMILING FACE}'
with open(filename, 'wb') as f:
f.write(teststring.encode(encoding))
result = host.get_file_contents(filename, encoding=encoding)
assert result == teststring
assert type(result) == type(u'')
def test_rename_file(self, multihost, tmpdir):
host = multihost.host
filename = str(tmpdir.join('test.txt'))
......@@ -196,6 +232,108 @@ class TestLocalhost(object):
host.transport.rmdir(filename)
assert not os.path.exists(filename)
def test_escaping(self, multihost, tmpdir):
host = multihost.host
test_file_path = str(tmpdir.join('testfile.txt'))
stdin_text = '"test", test, "test", $test, '
stdin_text += ''.join(chr(x) for x in range(32, 127))
stdin_text += r', \x66\0111\x00, '
stdin_text += ''.join('\\' + chr(x) for x in range(32, 127))
tee = host.run_command(
["tee", test_file_path],
stdin_text=stdin_text,
raiseonerr=False,
)
print(tee.stderr_text)
assert tee.stdout_text == stdin_text
with open(test_file_path, "r") as f:
assert f.read() == tee.stdout_text
def test_escaping_binary(self, multihost, tmpdir):
host = multihost.host
test_file_path = str(tmpdir.join('testfile.txt'))
stdin_bytes = b'"test", test, "test", $test, '
stdin_bytes += bytes(range(0, 256))
stdin_bytes += br', \x66\0111\x00'
tee = host.run_command(
["tee", test_file_path],
stdin_text=stdin_bytes,
raiseonerr=False,
)
assert tee.stdout_bytes == stdin_bytes
with open(test_file_path, "rb") as f:
assert f.read() == tee.stdout_bytes
def test_piping_input(self, multihost, tmpdir):
host = multihost.host
b64 = host.run_command(['base64'], stdin_text='test')
assert b64.stdout_text == 'dGVzdA==' + '\n'
def test_piping_input_with_newline(self, multihost, tmpdir):
host = multihost.host
b64 = host.run_command(['base64'], stdin_text='test\n')
assert b64.stdout_text == 'dGVzdAo=' + '\n'
def test_background_explicit_wait(self, multihost, tmpdir):
host = multihost.host
pipe_filename = str(tmpdir.join('test.pipe'))
with _first_command(host):
host.run_command(['mkfifo', pipe_filename])
cat = host.run_command(['cat', pipe_filename], bg=True)
host.run_command('cat > ' + pipe_filename, stdin_text='expected value')
cat.wait()
assert cat.stdout_text == 'expected value'
assert cat.returncode == 0
def test_background_context(self, multihost, tmpdir):
host = multihost.host
pipe_filename = str(tmpdir.join('test.pipe'))
with _first_command(host):
host.run_command(['mkfifo', pipe_filename])
with host.run_command(['cat', pipe_filename], bg=True) as cat:
host.run_command('cat > ' + pipe_filename,
stdin_text='expected value')
assert cat.stdout_text == 'expected value'
assert cat.returncode == 0
def test_background_raiseonerr_false(self, multihost, tmpdir):
host = multihost.host
with _first_command(host):
false = host.run_command(['false'], raiseonerr=False, bg=True)
assert false.returncode != 0
def test_background_raiseonerr_with(self, multihost, tmpdir):
host = multihost.host
with _first_command(host):
with pytest.raises(CalledProcessError):
with host.run_command(['false'], raiseonerr=True, bg=True):
pass
def test_background_raiseonerr_wait(self, multihost, tmpdir):
host = multihost.host
with _first_command(host):
false = host.run_command(['false'], raiseonerr=True, bg=True)
with pytest.raises(CalledProcessError):
false.wait()
@pytest.mark.needs_ssh
class TestLocalhostBadConnection(object):
def test_reset(self, multihost):
host = multihost.host
with _first_command(host):
......
# Tox (http://tox.testrun.org/) is a tool for running tests
# in multiple virtualenvs. This configuration file will run the
# test suite on all supported python versions. To use it, "pip install tox"
# and then run "tox" from this directory.
[tox]
envlist = py2,py36
minver = 1.8
[testenv]
deps =
pytest
paramiko
commands = python -m pytest -vv test_pytestmultihost/