Commit 686acae0 authored by SVN-Git Migration

Imported Upstream version 0.0.17

parent 619dd56a
......@@ -2,3 +2,4 @@
test_command=${PYTHON:-python} -m subunit.run $LISTOPT $IDOPTION testrepository.tests.test_suite
test_id_option=--load-list $IDFILE
test_list_option=--list
;filter_tags=worker-0
......@@ -29,6 +29,7 @@ for distributions such as Debian that wish to list all the copyright holders
in their metadata:
* Robert Collins <robertc@robertcollins.net>, 2009
* Hewlett-Packard Development Company, L.P., 2013
* IBM Corp., 2013
Code that has been incorporated into Testrepository from other projects will
......
......@@ -5,6 +5,59 @@ testrepository release notes
NEXT (In development)
+++++++++++++++++++++
0.0.17
++++++
CHANGES
-------
* Restore the ability to import testrepository.repository.memory on Python 2.6.
(Robert Collins)
0.0.16
++++++
CHANGES
-------
* A new testr.conf option ``group_regex`` can be used for grouping
tests so that they get run in the same backend runner. (Matthew Treinish)
* Fix Python 3.* support for entrypoints; the initial code was Python3
incompatible. (Robert Collins, Clark Boylan, #1187192)
* Switch to using multiprocessing to determine CPU counts.
(Chris Jones, #1092276)
* The cli UI now has primitive differentiation between multiple stream types.
This is not yet exposed to the end user, but is sufficient to enable the
load command to take interactive input without it reading from the raw
subunit stream on stdin. (Robert Collins)
* The scheduler can now group tests together, permitting co-dependent tests to
always be scheduled onto the same backend. Note that this does not force
co-dependent tests to be executed, so partial test runs (e.g. --failing)
may still fail. (Matthew Treinish, Robert Collins)
* When test listing fails, testr will now report an error rather than
incorrectly trying to run zero tests. A test listing failure is detected by
the returncode of the test listing process. (Robert Collins, #1185231)
0.0.15
++++++
CHANGES
-------
* Expects subunit v2 if the local library has v2 support in the subunit
library. This should be seamless if the system under test shares the
Python libraries. If it doesn't, either arrange to use ``subunit-2to1``
or upgrade the subunit libraries for the system under test.
(Robert Collins)
* ``--full-results`` is now a no-op, use ``--subunit`` to get unfiltered
output. (Robert Collins)
0.0.14
++++++
......
Metadata-Version: 1.1
Name: testrepository
Version: 0.0.14
Version: 0.0.17
Summary: A repository of test results.
Home-page: https://launchpad.net/testrepository
Author: Robert Collins
......
......@@ -50,5 +50,5 @@ the environment. This can be very useful for diagnosing problems.
Releasing
---------
Update testrepository/__init__.py version numbers. Release to pypi. Pivot the
next milestone on LP to version, and make a new next milestone.
Update NEWS and testrepository/__init__.py version numbers. Release to pypi.
Pivot the next milestone on LP to version, and make a new next milestone.
......@@ -137,8 +137,10 @@ All the normal rules for invoking test program commands apply: extra parameters
will be passed through; if a test list is being supplied, test_option can be
used via $IDOPTION.
The output of the test command when this option is supplied should be a series
of test ids, in any order, ``\n`` separated on stdout.
The output of the test command when this option is supplied should be a subunit
test enumeration. For subunit v1 that is a series of test ids, in any order,
``\n`` separated on stdout. For v2, use the subunit protocol and emit one
event per test, each with status 'exists'.
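For illustration only, a v2 enumeration can be produced along these lines (a
sketch with hypothetical test ids, assuming a v2-capable subunit library is
installed)::

    import sys
    from subunit import StreamResultToBytes

    # Write to the binary stdout (on Python 3 that is sys.stdout.buffer).
    output = StreamResultToBytes(getattr(sys.stdout, 'buffer', sys.stdout))
    # One status event per test, each flagged 'exists', enumerates the tests.
    for test_id in ('proj.tests.TestFoo.test_a', 'proj.tests.TestFoo.test_b'):
        output.status(test_id=test_id, test_status='exists')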
To test whether this is working the `testr list-tests` command can be useful.
......@@ -165,10 +167,10 @@ Python module to store the duration of each test. On some platforms (to date
only OSX) there is no bulk-update API and performance may be impacted if you
have many (10's of thousands) of tests.
On Linux, testrepository will inspect /proc/cpuinfo to determine how many CPUs
are present in the machine, and run one worker per CPU. On other operating
systems, or if you need to control the number of workers that are used, the
--concurrency option will let you do so::
To determine how many CPUs are present in the machine, testrepository will
use the multiprocessing Python module (present since 2.6). On operating systems
where this is not implemented, or if you need to control the number of workers
that are used, the --concurrency option will let you do so::
$ testr run --parallel --concurrency=2
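Internally the detection amounts to something like the following sketch (not
the exact implementation; the single-worker fallback here is an illustrative
assumption)::

    import multiprocessing

    def default_concurrency():
        try:
            # Raises NotImplementedError on platforms where the CPU
            # count cannot be determined.
            return multiprocessing.cpu_count()
        except NotImplementedError:
            return 1  # fall back to a single worker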
......@@ -205,6 +207,25 @@ And then find tests with that tag::
$ testr last --subunit | subunit-filter -s --xfail --with-tag=worker-3 | subunit-ls > slave-3.list
Grouping Tests
~~~~~~~~~~~~~~
In certain scenarios you may want to group tests of a certain type together
so that they will be run by the same backend. The group_regex option in
.testr.conf permits this. When set, tests are grouped by the group(0) of any
regex match. Tests with no match are not grouped.
For example, extending the Python sample .testr.conf from the configuration
section with a group regex that will group Python test cases together by
class (the last . splits the class and test method)::
[DEFAULT]
test_command=python -m subunit.run discover . $LISTOPT $IDOPTION
test_id_option=--load-list $IDFILE
test_list_option=--list
group_regex=([^\.]+\.)+
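The grouping rule itself is small. Roughly (a sketch with made-up test ids,
not testrepository's actual scheduler code)::

    import re

    _group_regex = re.compile(r'([^\.]+\.)+')

    def group_key(test_id):
        # Tests sharing a key are scheduled onto the same backend;
        # tests with no match are not grouped.
        match = _group_regex.match(test_id)
        return match.group(0) if match else None

    group_key('proj.tests.TestFoo.test_a')  # -> 'proj.tests.TestFoo.'
    group_key('proj.tests.TestFoo.test_b')  # -> same key, same backend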
Remote or isolated test environments
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
......
......@@ -96,7 +96,7 @@ setup(name='testrepository',
install_requires=[
'fixtures',
'python-subunit >= 0.0.10',
'testtools >= 0.9.29',
'testtools >= 0.9.30',
],
extras_require = dict(
test=[
......
Metadata-Version: 1.1
Name: testrepository
Version: 0.0.14
Version: 0.0.17
Summary: A repository of test results.
Home-page: https://launchpad.net/testrepository
Author: Robert Collins
......
fixtures
python-subunit >= 0.0.10
testtools >= 0.9.29
testtools >= 0.9.30
[test]
bzr
......
......@@ -33,4 +33,4 @@ The tests package contains tests and test specific support code.
# established at this point, and setup.py will use a version of next-$(revno).
# If the releaselevel is 'final', then the tarball will be major.minor.micro.
# Otherwise it is major.minor.micro~$(revno).
__version__ = (0, 0, 14, 'final', 0)
__version__ = (0, 0, 17, 'final', 0)
......@@ -16,10 +16,10 @@
import optparse
from testtools import MultiTestResult, TestResult
import testtools
from testtools import ExtendedToStreamDecorator, MultiTestResult
from testrepository.commands import Command
from testrepository.results import TestResultFilter
from testrepository.testcommand import TestCommand
......@@ -53,17 +53,13 @@ class failing(Command):
self.ui.output_stream(stream)
return 0
def _make_result(self, repo, list_result):
def _make_result(self, repo):
testcommand = self.command_factory(self.ui, repo)
if self.ui.options.list:
return testcommand.make_result(list_result)
list_result = testtools.StreamSummary()
return list_result, list_result
else:
output_result = self.ui.make_result(repo.latest_id, testcommand)
# This probably wants to be removed or pushed into the CLIResult
# responsibilities, it attempts to preserve skips, but the ui
# make_result filters them - a mismatch.
errors_only = TestResultFilter(output_result, filter_skip=True)
return MultiTestResult(list_result, output_result)
return self.ui.make_result(repo.latest_id, testcommand)
def run(self):
repo = self.repository_factory.open(self.ui.here)
......@@ -72,23 +68,19 @@ class failing(Command):
return self._show_subunit(run)
case = run.get_test()
failed = False
list_result = TestResult()
result = self._make_result(repo, list_result)
result, summary = self._make_result(repo)
result.startTestRun()
try:
case.run(result)
finally:
result.stopTestRun()
# XXX: This bypasses the user defined transforms, and also sets a
# non-zero return even on --list, which is inappropriate. The UI result
# knows about success/failure in more detail.
failed = not list_result.wasSuccessful()
failed = not summary.wasSuccessful()
if failed:
result = 1
else:
result = 0
if self.ui.options.list:
failing_tests = [
test for test, _ in list_result.errors + list_result.failures]
test for test, _ in summary.errors + summary.failures]
self.ui.output_tests(failing_tests)
return result
......@@ -56,14 +56,14 @@ class last(Command):
except KeyError:
previous_run = None
failed = False
result = self.ui.make_result(
result, summary = self.ui.make_result(
latest_run.get_id, testcommand, previous_run=previous_run)
result.startTestRun()
try:
case.run(result)
finally:
result.stopTestRun()
failed = not result.wasSuccessful()
failed = not summary.wasSuccessful()
if failed:
return 1
else:
......
......@@ -15,23 +15,40 @@
"""Load data into a repository."""
from functools import partial
from operator import methodcaller
import optparse
import threading
from extras import try_import
v2_avail = try_import('subunit.ByteStreamToStreamResult')
import subunit.test_results
from testtools import ConcurrentTestSuite, MultiTestResult, Tagger
import testtools
from testrepository.arguments.path import ExistingPathArgument
from testrepository.commands import Command
from testrepository.repository import RepositoryNotFound
from testrepository.testcommand import TestCommand
class InputToStreamResult(object):
"""Generate Stream events from stdin.
Really a UI responsibility?
"""
def __init__(self, stream):
self.source = stream
self.stop = False
def _wrap_result(result, thread_number):
worker_id = 'worker-%s' % thread_number
tags_to_add = set([worker_id])
tags_to_remove = set()
return subunit.test_results.AutoTimingTestResultDecorator(
Tagger(result, tags_to_add, tags_to_remove))
def run(self, result):
while True:
if self.stop:
return
char = self.source.read(1)
if not char:
return
if char == b'a':
result.status(test_id='stdin', test_status='fail')
class load(Command):
......@@ -43,7 +60,7 @@ class load(Command):
Unless the stream is a partial stream, any existing failures are discarded.
"""
input_streams = ['subunit+']
input_streams = ['subunit+', 'interactive?']
args = [ExistingPathArgument('streams', min=0, max=None)]
options = [
......@@ -57,7 +74,7 @@ class load(Command):
default=False, help="Display results in subunit format."),
optparse.Option("--full-results", action="store_true",
default=False,
help="Show all test results. Currently only works with --subunit."),
help="No-op - deprecated and kept only for backwards compat."),
]
# Can be assigned to to inject a custom command factory.
command_factory = TestCommand
......@@ -72,23 +89,44 @@ class load(Command):
else:
raise
testcommand = self.command_factory(self.ui, repo)
run_id = None
# Not a full implementation of TestCase, but we only need to iterate
# back to it. Needs to be a callable - its a head fake for
# testsuite.add.
# XXX: Be nice if we could declare that the argument, which is a path,
# is to be an input stream.
# is to be an input stream - and thus push this conditional down into
# the UI object.
if self.ui.arguments.get('streams'):
opener = partial(open, mode='rb')
cases = lambda:map(opener, self.ui.arguments['streams'])
streams = map(opener, self.ui.arguments['streams'])
else:
cases = lambda:self.ui.iter_streams('subunit')
def make_tests(suite):
streams = list(suite)[0]
for stream in streams():
yield subunit.ProtocolTestCase(stream)
case = ConcurrentTestSuite(cases, make_tests, _wrap_result)
# One copy of the stream to repository storage
streams = self.ui.iter_streams('subunit')
def make_tests():
for pos, stream in enumerate(streams):
if v2_avail:
# Calls StreamResult API.
case = subunit.ByteStreamToStreamResult(
stream, non_subunit_name='stdout')
else:
# Calls TestResult API.
case = subunit.ProtocolTestCase(stream)
def wrap_result(result):
# Wrap in a router to mask out startTestRun/stopTestRun from the
# ExtendedToStreamDecorator.
result = testtools.StreamResultRouter(
result, do_start_stop_run=False)
# Wrap that in ExtendedToStreamDecorator to convert v1 calls to
# StreamResult.
return testtools.ExtendedToStreamDecorator(result)
# Now calls StreamResult API :).
case = testtools.DecorateTestCaseResult(case, wrap_result,
methodcaller('startTestRun'),
methodcaller('stopTestRun'))
case = testtools.DecorateTestCaseResult(case,
lambda result:testtools.StreamTagger(
[result], add=['worker-%d' % pos]))
yield (case, str(pos))
case = testtools.ConcurrentStreamTestSuite(make_tests)
# One unmodified copy of the stream to repository storage
inserter = repo.get_inserter(partial=self.ui.options.partial)
# One copy of the stream to the UI layer after performing global
# filters.
......@@ -96,21 +134,27 @@ class load(Command):
previous_run = repo.get_latest_run()
except KeyError:
previous_run = None
output_result = self.ui.make_result(
lambda: run_id, testcommand, previous_run=previous_run)
result = MultiTestResult(inserter, output_result)
output_result, summary_result = self.ui.make_result(
inserter.get_id, testcommand, previous_run=previous_run)
result = testtools.CopyStreamResult([inserter, output_result])
runner_thread = None
result.startTestRun()
try:
# Convert user input into a stdin event stream
interactive_streams = list(self.ui.iter_streams('interactive'))
if interactive_streams:
case = InputToStreamResult(interactive_streams[0])
runner_thread = threading.Thread(
target=case.run, args=(result,))
runner_thread.daemon = True
runner_thread.start()
case.run(result)
finally:
# Does not call result.stopTestRun because the lambda: run_id above
# needs the local variable to be updated before the
# filtered.stopTestRun() call is invoked. This could be fixed by
# having a capturing result rather than a lambda, but thats more
# code.
run_id = inserter.stopTestRun()
output_result.stopTestRun()
if not output_result.wasSuccessful():
result.stopTestRun()
if interactive_streams and runner_thread:
runner_thread.stop = True
runner_thread.join(10)
if not summary_result.wasSuccessful():
return 1
else:
return 0
......@@ -19,8 +19,11 @@ from math import ceil
import optparse
import re
from extras import try_import
import subunit
v2_avail = try_import('subunit.ByteStreamToStreamResult')
import testtools
from testtools import (
TestResult,
TestByTestResult,
)
from testtools.compat import _b
......@@ -45,6 +48,9 @@ class ReturnCodeToSubunit(object):
synthetic test is added to the output, making the error accessible to
subunit stream consumers. If the process closes its stdout and then does
not terminate, reading from the ReturnCodeToSubunit stream will hang.
This class will be deleted at some point, allowing parsing to read from the
actual fd and benefit from select for aggregating non-subunit output.
"""
def __init__(self, process):
......@@ -65,12 +71,21 @@ class ReturnCodeToSubunit(object):
returncode = self.proc.wait()
if returncode != 0:
if self.lastoutput != LINEFEED:
# Subunit is line orientated, it has to start on a fresh line.
# Subunit V1 is line orientated, it has to start on a fresh
# line. V2 needs to start on any fresh utf8 character border
# - which is not guaranteed in an arbitrary stream endpoint, so
# injecting a \n gives us such a guarantee.
self.source.write(_b('\n'))
self.source.write(_b('test: process-returncode\n'
'error: process-returncode [\n'
' returncode %d\n'
']\n' % returncode))
if v2_avail:
stream = subunit.StreamResultToBytes(self.source)
stream.status(test_id='process-returncode', test_status='fail',
file_name='traceback', mime_type='text/plain;charset=utf8',
file_bytes=('returncode %d' % returncode).encode('utf8'))
else:
self.source.write(_b('test: process-returncode\n'
'failure: process-returncode [\n'
' returncode %d\n'
']\n' % returncode))
self.source.seek(0)
self.done = True
......@@ -121,7 +136,7 @@ class run(Command):
default=False, help="Display results in subunit format."),
optparse.Option("--full-results", action="store_true",
default=False,
help="Show all test results. Currently only works with --subunit."),
help="No-op - deprecated and kept only for backwards compat."),
optparse.Option("--until-failure", action="store_true",
default=False,
help="Repeat the run again and again until failure occurs."),
......@@ -137,14 +152,16 @@ class run(Command):
def _find_failing(self, repo):
run = repo.get_failing()
case = run.get_test()
result = TestResult()
ids = []
def gather_errors(test_dict):
if test_dict['status'] == 'fail':
ids.append(test_dict['id'])
result = testtools.StreamToDict(gather_errors)
result.startTestRun()
try:
case.run(result)
finally:
result.stopTestRun()
ids = [failure[0].id() for failure in result.failures]
ids.extend([error[0].id() for error in result.errors])
return ids
def run(self):
......@@ -223,11 +240,10 @@ class run(Command):
# check that the test we're probing still failed - still
# awkward.
found_fail = []
def find_fail(test, status, start_time, stop_time, tags,
details):
if test.id() == spurious_failure:
def find_fail(test_dict):
if test_dict['id'] == spurious_failure:
found_fail.append(True)
checker = TestByTestResult(find_fail)
checker = testtools.StreamToDict(find_fail)
checker.startTestRun()
try:
repo.get_failing().get_test().run(checker)
......@@ -271,6 +287,7 @@ class run(Command):
Tests that ran in a different worker are not included in the result.
"""
if not getattr(self, '_worker_to_test', False):
# TODO: switch to route codes?
case = run.get_test()
# Use None if there is no worker-N tag
# If there are multiple, map them all.
......@@ -278,7 +295,9 @@ class run(Command):
worker_to_test = {}
# (testid -> [workerN, ...])
test_to_worker = {}
def map_test(test, status, start_time, stop_time, tags, details):
def map_test(test_dict):
tags = test_dict['tags']
id = test_dict['id']
workers = []
for tag in tags:
if tag.startswith('worker-'):
......@@ -286,9 +305,9 @@ class run(Command):
if not workers:
workers = [None]
for worker in workers:
worker_to_test.setdefault(worker, []).append(test.id())
test_to_worker.setdefault(test.id(), []).extend(workers)
mapper = TestByTestResult(map_test)
worker_to_test.setdefault(worker, []).append(id)
test_to_worker.setdefault(id, []).extend(workers)
mapper = testtools.StreamToDict(map_test)
mapper.startTestRun()
try:
case.run(mapper)
......
......@@ -27,7 +27,7 @@ Repositories are identified by their URL, and new ones are made by calling
the initialize function in the appropriate repository module.
"""
from testtools import TestResult
from testtools import StreamToDict, TestResult
class AbstractRepositoryFactory(object):
......@@ -142,13 +142,24 @@ class AbstractRepository(object):
were part of the specified test run.
"""
run = self.get_test_run(run_id)
result = TestIDCapturer()
run.get_test().run(result)
return result.ids
ids = []
def gather(test_dict):
ids.append(test_dict['id'])
result = StreamToDict(gather)
result.startTestRun()
try:
run.get_test().run(result)
finally:
result.stopTestRun()
return ids
class AbstractTestRun(object):
"""A test run that has been stored in a repository."""
"""A test run that has been stored in a repository.
Should implement the StreamResult protocol as well
as the testrepository specific methods documented here.
"""
def get_id(self):
"""Get the id of the test run.
......@@ -166,7 +177,9 @@ class AbstractTestRun(object):
"""Get a testtools.TestCase-like object that can be run.
:return: A TestCase like object which can be run to get the individual
tests reported to a testtools.TestResult.
tests reported to a testtools.StreamResult/TestResult.
(Clients of repository should provide an ExtendedToStreamDecorator
decorator to permit either API to be used).
"""
raise NotImplementedError(self.get_test)
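For example, a client holding only a StreamResult could drive a stored run
along these lines (a sketch; the ``replay`` helper is hypothetical, and a
testtools with StreamResult support is assumed)::

    import testtools

    def replay(test_run, stream_result):
        # ExtendedToStreamDecorator accepts both the classic TestResult
        # API and the StreamResult API, so the returned case may use either.
        result = testtools.ExtendedToStreamDecorator(stream_result)
        result.startTestRun()
        try:
            test_run.get_test().run(result)
        finally:
            result.stopTestRun()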
......@@ -178,20 +191,3 @@ class RepositoryNotFound(Exception):
self.url = url
msg = 'No repository found in %s. Create one by running "testr init".'
Exception.__init__(self, msg % url)
class TestIDCapturer(TestResult):
"""Capture the test ids from a test run.
After using the result with a test run, the ids of
the tests that were run are available in the ids
attribute.
"""
def __init__(self):
super(TestIDCapturer, self).__init__()
self.ids = []
def startTest(self, test):
super(TestIDCapturer, self).startTest(test)
self.ids.append(test.id())
......@@ -20,12 +20,14 @@ try:
except ImportError:
import dbm
import errno
from operator import methodcaller
import os.path
import sys
import tempfile
import subunit
from subunit import TestProtocolClient
import testtools
from testtools.compat import _b
from testrepository.repository import (
......@@ -189,10 +191,20 @@ class _DiskRun(AbstractTestRun):
return BytesIO(self._content)
def get_test(self):
return subunit.ProtocolTestCase(self.get_subunit_stream())
case = subunit.ProtocolTestCase(self.get_subunit_stream())
def wrap_result(result):
# Wrap in a router to mask out startTestRun/stopTestRun from the
# ExtendedToStreamDecorator.
result = testtools.StreamResultRouter(result, do_start_stop_run=False)
# Wrap that in ExtendedToStreamDecorator to convert v1 calls to
# StreamResult.
return testtools.ExtendedToStreamDecorator(result)
return testtools.DecorateTestCaseResult(
case, wrap_result, methodcaller('startTestRun'),
methodcaller('stopTestRun'))
class _SafeInserter(TestProtocolClient):
class _SafeInserter(object):
def __init__(self, repository, partial=False):
# XXX: Perhaps should factor into a decorator and use an unaltered
......@@ -206,13 +218,25 @@ class _SafeInserter(TestProtocolClient):
self._times = {}
self._test_start = None
self._time = None
TestProtocolClient.__init__(self, stream)
subunit_client = testtools.StreamToExtendedDecorator(
TestProtocolClient(stream))
self.hook = testtools.CopyStreamResult([
subunit_client,
testtools.StreamToDict(self._handle_test)])
self._stream = stream
def _handle_test(self, test_dict):
start, stop = test_dict['timestamps']
if None in (start, stop):
return
self._times[test_dict['id']] = str(timedelta_to_seconds(stop - start))
def startTestRun(self):
pass
self.hook.startTestRun()
self._run_id = None
def stopTestRun(self):
# TestProtocolClient.stopTestRun(self)
self.hook.stopTestRun()
self._stream.flush()
self._stream.close()
run_id = self._name()
......@@ -233,30 +257,18 @@ class _SafeInserter(TestProtocolClient):
db[key] = value
finally:
db.close()
return run_id
self._run_id = run_id
def status(self, *args, **kwargs):
self.hook.status(*args, **kwargs)
def _cancel(self):
"""Cancel an insertion."""
self._stream.close()
os.unlink(self.fname)
def startTest(self, test):
result = TestProtocolClient.startTest(self, test)
self._test_start = self._time
return result
def stopTest(self, test):
result = TestProtocolClient.stopTest(self, test)
if None in (self._test_start, self._time):
return result
duration_seconds = timedelta_to_seconds(self._time - self._test_start)
self._times[test.id()] = str(duration_seconds)
return result
def time(self, timestamp):
result = TestProtocolClient.time(self, timestamp)
self._time = timestamp
return result
def get_id(self):
return self._run_id
class _FailingInserter(_SafeInserter):
......@@ -272,33 +284,36 @@ class _Inserter(_SafeInserter):
return self._repository._allocate()
def stopTestRun(self):
run_id = _SafeInserter.stopTestRun(self)
super(_Inserter, self).stopTestRun()
# XXX: locking (other inserts may happen while we update the failing
# file).
# Combine failing + this run : strip passed tests, add failures.
# use memory repo to aggregate. a bit awkward on layering ;).
# Should just pull the failing items aside as they happen perhaps.
# Or use a router and avoid using a memory object at all.
from testrepository.repository import memory
repo = memory.Repository()
if self.partial:
# Seed with current failing
inserter = repo.get_inserter()