# subunithelper.py
# Python module for parsing and generating the Subunit protocol
# (Samba-specific)
# Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

# Names exported by ``from <this module> import *``.  Only parse_results is
# listed; the classes and helpers defined below have to be imported by name.
__all__ = ['parse_results']

import re
21
import sys
22
import subunit
23
import subunit.iso8601
24
import testtools
25 26 27

# Commands that parse_results() treats as a result line.  The first group
# reports the outcome of a single test, the second group the outcome of a
# whole testsuite (Samba extension).
VALID_RESULTS = [
    'success',
    'successful',
    'failure',
    'fail',
    'skip',
    'knownfail',
    'error',
    'xfail',
    'skip-testsuite',
    'testsuite-failure',
    'testsuite-xfail',
    'testsuite-success',
    'testsuite-error',
]

28 29 30 31 32 33
class TestsuiteEnabledTestResult(testtools.testresult.TestResult):
    """TestResult base class for result objects that handle testsuite events."""

    def start_testsuite(self, name):
        """Handle the start of testsuite ``name``.

        This base implementation always raises NotImplementedError (with the
        bound method as its argument); subclasses provide the behaviour.
        """
        raise NotImplementedError(self.start_testsuite)


34 35
def parse_results(msg_ops, statistics, fh):
    """Parse a subunit stream (with Samba extensions) and replay it on msg_ops.

    :param msg_ops: Object receiving the parsed events.  The methods called
        on it are output_msg, control_msg, startTest, time, addSuccess,
        addExpectedFailure, addFailure, addSkip, addError, start_testsuite,
        skip_testsuite, end_testsuite and progress.
    :param statistics: Mapping whose 'TESTS_ERROR', 'TESTS_EXPECTED_OK',
        'TESTS_EXPECTED_FAIL', 'TESTS_UNEXPECTED_FAIL' and 'TESTS_SKIP'
        counters are incremented in place.
    :param fh: Object with a readline() method that returns "" at end of
        input.
    :return: 1 if a bracketed reason was cut off by end of input, or if
        statistics records any error or unexpected failure; 0 otherwise.
    """
    # Tests that have been announced but not yet finished, keyed by name.
    open_tests = {}

    while fh:
        l = fh.readline()
        if l == "":
            break
        parts = l.split(None, 1)
        # Anything that is not "<command> <argument>" starting in the first
        # column is plain test output.
        if not len(parts) == 2 or not l.startswith(parts[0]):
            msg_ops.output_msg(l)
            continue
        command = parts[0].rstrip(":")
        arg = parts[1]
        if command in ("test", "testing"):
            msg_ops.control_msg(l)
            name = arg.rstrip()
            test = subunit.RemotedTestCase(name)
            if name in open_tests:
                msg_ops.addError(open_tests.pop(name), subunit.RemoteError(u"Test already running"))
            msg_ops.startTest(test)
            open_tests[name] = test
        elif command == "time":
            msg_ops.control_msg(l)
            try:
                dt = subunit.iso8601.parse_date(arg.rstrip("\n"))
            except TypeError:
                print("Unable to parse time line: %s" % arg.rstrip("\n"))
            else:
                msg_ops.time(dt)
        elif command in VALID_RESULTS:
            msg_ops.control_msg(l)
            result = command
            # The trailing newline is optional so that a final line that was
            # cut off before its "\n" still parses instead of crashing on a
            # failed match.
            grp = re.match(r"(.*?)( \[)?([ \t]*)( multipart)?\n?$", arg)
            (testname, hasreason) = (grp.group(1), grp.group(2))
            if hasreason:
                # reason may be specified in next lines, up to a lone "]"
                reason_lines = []
                terminated = False
                while fh:
                    l = fh.readline()
                    if l == "":
                        break
                    msg_ops.control_msg(l)
                    if l == "]\n":
                        terminated = True
                        break
                    reason_lines.append(l)
                reason = "".join(reason_lines)

                remote_error = subunit.RemoteError(reason.decode("utf-8"))

                if not terminated:
                    statistics['TESTS_ERROR']+=1
                    msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"reason (%s) interrupted" % result))
                    return 1
            else:
                reason = None
                remote_error = subunit.RemoteError(u"No reason specified")
            if result in ("success", "successful"):
                test = _pop_started_test(msg_ops, statistics, open_tests, testname)
                if test is not None:
                    statistics['TESTS_EXPECTED_OK']+=1
                    msg_ops.addSuccess(test)
            elif result in ("xfail", "knownfail"):
                test = _pop_started_test(msg_ops, statistics, open_tests, testname)
                if test is not None:
                    statistics['TESTS_EXPECTED_FAIL']+=1
                    msg_ops.addExpectedFailure(test, remote_error)
            elif result in ("failure", "fail"):
                test = _pop_started_test(msg_ops, statistics, open_tests, testname)
                if test is not None:
                    statistics['TESTS_UNEXPECTED_FAIL']+=1
                    msg_ops.addFailure(test, remote_error)
            elif result == "skip":
                statistics['TESTS_SKIP']+=1
                # Allow tests to be skipped without prior announcement of test
                try:
                    test = open_tests.pop(testname)
                except KeyError:
                    test = subunit.RemotedTestCase(testname)
                msg_ops.addSkip(test, reason)
            elif result == "error":
                statistics['TESTS_ERROR']+=1
                # Errors may also be reported for tests that never started.
                try:
                    test = open_tests.pop(testname)
                except KeyError:
                    test = subunit.RemotedTestCase(testname)
                msg_ops.addError(test, remote_error)
            elif result == "skip-testsuite":
                # NOTE(review): a reason parsed above is not forwarded here,
                # unlike the testsuite-* results below -- confirm intended.
                msg_ops.skip_testsuite(testname)
            elif result == "testsuite-success":
                msg_ops.end_testsuite(testname, "success", reason)
            elif result == "testsuite-failure":
                msg_ops.end_testsuite(testname, "failure", reason)
            elif result == "testsuite-xfail":
                msg_ops.end_testsuite(testname, "xfail", reason)
            elif result == "testsuite-error":
                msg_ops.end_testsuite(testname, "error", reason)
            else:
                raise AssertionError("Recognized but unhandled result %r" %
                    result)
        elif command == "testsuite":
            msg_ops.start_testsuite(arg.strip())
        elif command == "progress":
            arg = arg.strip()
            if arg == "pop":
                msg_ops.progress(None, subunit.PROGRESS_POP)
            elif arg == "push":
                msg_ops.progress(None, subunit.PROGRESS_PUSH)
            elif arg[0] in '+-':
                msg_ops.progress(int(arg), subunit.PROGRESS_CUR)
            else:
                msg_ops.progress(int(arg), subunit.PROGRESS_SET)
        else:
            msg_ops.output_msg(l)

    # Whatever is still open at end of input never reported a result.  The
    # stored values already are test objects, so they are reported as-is
    # rather than being wrapped in another RemotedTestCase.
    while open_tests:
        test = open_tests.popitem()[1]
        msg_ops.addError(test, subunit.RemoteError(u"was started but never finished!"))
        statistics['TESTS_ERROR']+=1

    if statistics['TESTS_ERROR'] > 0:
        return 1
    if statistics['TESTS_UNEXPECTED_FAIL'] > 0:
        return 1
    return 0


def _pop_started_test(msg_ops, statistics, open_tests, testname):
    """Remove and return the open test ``testname``.

    If no such test was started, count an error in statistics, report
    "Test was never started" on msg_ops and return None.
    """
    try:
        return open_tests.pop(testname)
    except KeyError:
        statistics['TESTS_ERROR']+=1
        msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
        return None


176
class SubunitOps(subunit.TestProtocolClient,TestsuiteEnabledTestResult):
    """Subunit protocol writer with Samba's testsuite-level commands added.

    Every method here writes text to ``self._stream``, which this class does
    not set itself (presumably provided by subunit.TestProtocolClient --
    confirm against that class).
    """

    # The following are Samba extensions:
    def start_testsuite(self, name):
        """Write a ``testsuite:`` line announcing testsuite ``name``."""
        self._stream.write("testsuite: %s\n" % name)

    def skip_testsuite(self, name, reason=None):
        """Write a ``skip-testsuite:`` line, bracketing ``reason`` if given."""
        if not reason:
            self._stream.write("skip-testsuite: %s\n" % name)
            return
        self._stream.write("skip-testsuite: %s [\n%s\n]\n" % (name, reason))

    def end_testsuite(self, name, result, reason=None):
        """Write a ``testsuite-<result>:`` line, bracketing ``reason`` if given."""
        if not reason:
            self._stream.write("testsuite-%s: %s\n" % (result, name))
            return
        self._stream.write("testsuite-%s: %s [\n%s\n]\n" % (result, name, reason))

    def output_msg(self, msg):
        """Copy ``msg`` to the stream unchanged."""
        self._stream.write(msg)

197 198

def read_test_regexes(name):
    """Load a regex list from the file ``name``.

    Blank lines and lines whose first non-blank character is ``#`` are
    ignored.  On any other line, text after the first ``#`` is the reason
    attached to the regex before it.

    :param name: Path of the file to read.
    :return: Dictionary mapping each regex to its reason, or to None when the
        line carries no reason.
    """
    patterns = {}
    handle = open(name, 'r')
    try:
        for raw in handle:
            entry = raw.strip()
            if not entry or entry.startswith("#"):
                continue
            pieces = entry.split("#", 1)
            if len(pieces) == 2:
                patterns[pieces[0].strip()] = pieces[1].strip()
            else:
                patterns[entry] = None
    finally:
        handle.close()
    return patterns
214 215 216


def find_in_list(regexes, fullname):
    """Look up ``fullname`` in a regex -> reason dictionary.

    :param regexes: Dictionary mapping regular expressions to a reason string
        or None.
    :param fullname: Name to test; each regex is applied with re.match, so it
        is anchored at the start of the name only.
    :return: The reason of a matching regex ("" when that reason is None), or
        None when no regex matches.
    """
    # items() rather than the Python-2-only iteritems(), so the lookup works
    # with dictionaries on either major version.
    for regex, reason in regexes.items():
        if re.match(regex, fullname):
            if reason is None:
                return ""
            return reason
    return None


225 226 227 228 229 230 231
class ImmediateFail(Exception):
    """Signals that processing must stop right after a failed test."""

    def __init__(self):
        message = "test failed and fail_immediately set"
        Exception.__init__(self, message)


232
class FilterOps(testtools.testresult.TestResult):
    """Result object that filters events before handing them to ``out``.

    It can prefix test ids, turn failures matching ``expected_failures`` into
    expected failures, hold back test output until a test fails, and abort on
    the first error or unexpected failure.
    """

    def __init__(self, out, prefix=None, expected_failures=None,
                 strip_ok_output=False, fail_immediately=False):
        """Set up the filter.

        :param out: Result object every (filtered) event is forwarded to.
        :param prefix: Optional string put in front of every test id.
        :param expected_failures: Optional regex -> reason dictionary passed
            to find_in_list() for every failing test.
        :param strip_ok_output: If true, output is buffered from startTest on
            and forwarded only by addFailure.
        :param fail_immediately: If true, raise ImmediateFail after an error
            or unexpected failure has been forwarded.
        """
        self._ops = out
        self.prefix = prefix
        self.strip_ok_output = strip_ok_output
        self.fail_immediately = fail_immediately
        if expected_failures is None:
            expected_failures = {}
        self.expected_failures = expected_failures
        self.seen_output = False
        # None means "write output straight to stdout"; a string is the
        # buffer for the test currently running.
        self.output = None
        # Counters for the current testsuite (reset by start_testsuite).
        self.error_added = 0
        self.fail_added = 0
        self.xfail_added = 0
        # Counters for the whole run.
        self.total_error = 0
        self.total_fail = 0
        self.total_xfail = 0

    def control_msg(self, msg):
        """Discard ``msg``."""
        pass # We regenerate control messages, so ignore this

    def time(self, time):
        """Forward a timestamp unchanged."""
        self._ops.time(time)

    def progress(self, delta, whence):
        """Forward a progress event unchanged."""
        self._ops.progress(delta, whence)

    def output_msg(self, msg):
        """Append ``msg`` to the buffer if one is active, else write it to stdout."""
        if self.output is not None:
            self.output+=msg
        else:
            sys.stdout.write(msg)

    def startTest(self, test):
        """Forward the (prefixed) test start; begin buffering if configured."""
        self.seen_output = True
        prefixed = self._add_prefix(test)
        if self.strip_ok_output:
            self.output = ""

        self._ops.startTest(prefixed)

    def _add_prefix(self, test):
        """Return ``test``, or a RemotedTestCase with the prefixed id if a prefix is set."""
        if self.prefix is None:
            return test
        return subunit.RemotedTestCase(self.prefix + test.id())

    def addError(self, test, details=None):
        """Count and forward an error, then drop any buffered output."""
        prefixed = self._add_prefix(test)
        self.error_added+=1
        self.total_error+=1
        self._ops.addError(prefixed, details)
        self.output = None
        if self.fail_immediately:
            raise ImmediateFail()

    def addSkip(self, test, details=None):
        """Forward a skip and drop any buffered output."""
        self.seen_output = True
        prefixed = self._add_prefix(test)
        self._ops.addSkip(prefixed, details)
        self.output = None

    def addExpectedFailure(self, test, details=None):
        """Forward an expected failure and drop any buffered output."""
        prefixed = self._add_prefix(test)
        self._ops.addExpectedFailure(prefixed, details)
        self.output = None

    def addFailure(self, test, details=None):
        """Forward a failure, downgraded to an expected failure if it is listed.

        An unlisted failure is followed by the buffered output (if any) and,
        with fail_immediately set, by ImmediateFail.
        """
        test = self._add_prefix(test)
        xfail_reason = find_in_list(self.expected_failures, test.id())
        if xfail_reason is None:
            self.fail_added+=1
            self.total_fail+=1
            self._ops.addFailure(test, details)
            if self.output:
                self._ops.output_msg(self.output)
            if self.fail_immediately:
                raise ImmediateFail()
        else:
            self.xfail_added+=1
            self.total_xfail+=1
            # The xfail reason is appended to whatever detail text came in.
            if details is None:
                details = subunit.RemoteError(xfail_reason.decode("utf-8"))
            else:
                details = subunit.RemoteError(unicode(details[1]) + xfail_reason.decode("utf-8"))
            self._ops.addExpectedFailure(test, details)
        self.output = None

    def addSuccess(self, test, details=None):
        """Forward a success and drop any buffered output."""
        prefixed = self._add_prefix(test)
        self._ops.addSuccess(prefixed, details)
        self.output = None

    def skip_testsuite(self, name, reason=None):
        """Forward a testsuite skip unchanged."""
        self._ops.skip_testsuite(name, reason)

    def start_testsuite(self, name):
        """Forward a testsuite start and reset the per-testsuite counters."""
        self._ops.start_testsuite(name)
        self.error_added = 0
        self.fail_added = 0
        self.xfail_added = 0

    def end_testsuite(self, name, result, reason=None):
        """Forward a testsuite end, correcting ``result`` from the counters.

        A reported failure becomes "xfail" when only expected failures were
        seen; any counted failure or error forces "failure" / "error" and
        appends the count to the reason.
        """
        only_xfails = (self.xfail_added > 0 and
                       not (self.fail_added > 0 or self.error_added > 0))

        if only_xfails and result in ("fail", "failure"):
            result = "xfail"

        if self.fail_added > 0 and result != "failure":
            result = "failure"
            if reason is None:
                reason = "Subunit/Filter Reason"
            reason += "\n failures[%d]" % self.fail_added

        if self.error_added > 0 and result != "error":
            result = "error"
            if reason is None:
                reason = "Subunit/Filter Reason"
            reason += "\n errors[%d]" % self.error_added

        self._ops.end_testsuite(name, result, reason)
361 362 363 364


class PlainFormatter(TestsuiteEnabledTestResult):
    """Result object that prints a plain-text progress report to stdout.

    Output of the running testsuite is buffered in ``test_output`` unless
    ``verbose`` is set; write_summary() writes the failed and skipped suites
    to a file and prints the final verdict.
    """

    def __init__(self, verbose, immediate, statistics,
            totaltests=None):
        """Set up the formatter.

        :param verbose: If true, test output goes straight to stdout.
        :param immediate: If true, unexpected results are printed as they
            happen instead of one status character per test.
        :param statistics: Mapping with the 'TESTS_*' counters that
            write_summary() reads.
        :param totaltests: Optional total number of testsuites, shown in the
            progress prefix.
        """
        super(PlainFormatter, self).__init__()
        self.verbose = verbose
        self.immediate = immediate
        self.statistics = statistics
        self.start_time = None
        self.test_output = {}
        self.suitesfailed = []
        self.suites_ok = 0
        self.skips = {}
        self.index = 0
        self.name = None
        self._progress_level = 0
        self.totalsuites = totaltests
        self.last_time = None

    @staticmethod
    def _format_time(delta):
        """Format a datetime.timedelta as e.g. ``1h2m3s``."""
        # Include whole days; timedelta.seconds alone wraps every 24 hours.
        total_seconds = delta.days * 86400 + delta.seconds
        minutes, seconds = divmod(total_seconds, 60)
        hours, minutes = divmod(minutes, 60)
        ret = ""
        if hours:
            ret += "%dh" % hours
        if minutes:
            ret += "%dm" % minutes
        ret += "%ds" % seconds
        return ret

    def progress(self, offset, whence):
        """Track push/pop nesting; a top-level "set" updates the suite total.

        :raises NotImplementedError: for subunit.PROGRESS_CUR.
        """
        if whence == subunit.PROGRESS_POP:
            self._progress_level -= 1
        elif whence == subunit.PROGRESS_PUSH:
            self._progress_level += 1
        elif whence == subunit.PROGRESS_SET:
            if self._progress_level == 0:
                self.totalsuites = offset
        elif whence == subunit.PROGRESS_CUR:
            raise NotImplementedError

    def time(self, dt):
        """Remember the first and the most recent timestamp seen."""
        if self.start_time is None:
            self.start_time = dt
        self.last_time = dt

    def start_testsuite(self, name):
        """Print the ``[index/total in time, n errors] name`` progress prefix."""
        self.index += 1
        self.name = name

        if not self.verbose:
            self.test_output[name] = ""

        out = "[%d" % self.index
        if self.totalsuites is not None:
            out += "/%d" % self.totalsuites
        if self.start_time is not None:
            out += " in " + self._format_time(self.last_time - self.start_time)
        if self.suitesfailed:
            out += ", %d errors" % (len(self.suitesfailed),)
        out += "] %s" % name
        if self.immediate:
            sys.stdout.write(out + "\n")
        else:
            sys.stdout.write(out + ": ")

    def output_msg(self, output):
        """Print ``output`` (verbose / no current suite) or buffer it for the suite."""
        if self.verbose:
            sys.stdout.write(output)
        elif self.name is not None:
            self.test_output[self.name] += output
        else:
            sys.stdout.write(output)

    def control_msg(self, output):
        """Ignore protocol control lines."""
        pass

    def end_testsuite(self, name, result, reason=None):
        """Record the outcome of testsuite ``name`` and print its status."""
        out = ""
        unexpected = False

        if not name in self.test_output:
            print("no output for name[%s]" % name)

        if result in ("success", "xfail"):
            self.suites_ok+=1
        else:
            self.output_msg("ERROR: Testsuite[%s]\n" % name)
            if reason is not None:
                self.output_msg("REASON: %s\n" % (reason,))
            self.suitesfailed.append(name)
            if self.immediate and not self.verbose and name in self.test_output:
                out += self.test_output[name]
            unexpected = True

        if not self.immediate:
            if not unexpected:
                out += " ok\n"
            else:
                out += " " + result.upper() + "\n"

        sys.stdout.write(out)

    def startTest(self, test):
        """Nothing is printed when a test starts."""
        pass

    def addSuccess(self, test, details=None):
        """Report a passed test; ``details`` is accepted but not used."""
        self.end_test(test.id(), "success", False)

    def addError(self, test, details=None):
        """Report a test error as an unexpected result."""
        self.end_test(test.id(), "error", True, details)

    def addFailure(self, test, details=None):
        """Report a test failure as an unexpected result."""
        self.end_test(test.id(), "failure", True, details)

    def addSkip(self, test, details=None):
        """Report a skipped test."""
        self.end_test(test.id(), "skip", False, details)

    def addExpectedFailure(self, test, details=None):
        """Report an expected failure.

        This is the method name the parser and FilterOps call; under the old
        name addExpectedFail it was never reached by them.
        """
        self.end_test(test.id(), "xfail", False, details)

    def addExpectedFail(self, test, details=None):
        """Backward-compatible alias of addExpectedFailure."""
        self.addExpectedFailure(test, details)

    def end_test(self, testname, result, unexpected, reason=None):
        """Print or buffer the outcome of a single test.

        Expected results clear the suite's buffered output and print one
        status character (unless ``immediate``).  Unexpected results are
        appended to the suite's buffer together with ``reason[1]``.
        """
        if not unexpected:
            self.test_output[self.name] = ""
            if not self.immediate:
                sys.stdout.write({
                    'failure': 'f',
                    'xfail': 'X',
                    'skip': 's',
                    'success': '.'}.get(result, "?(%s)" % result))
            return

        if not self.name in self.test_output:
            self.test_output[self.name] = ""

        self.test_output[self.name] += "UNEXPECTED(%s): %s\n" % (result, testname)
        if reason is not None:
            self.test_output[self.name] += "REASON: %s\n" % (unicode(reason[1]).encode("utf-8").strip(),)

        if self.immediate and not self.verbose:
            print(self.test_output[self.name])
            self.test_output[self.name] = ""

        if not self.immediate:
            sys.stdout.write({
               'error': 'E',
               'failure': 'F',
               'success': 'S'}.get(result, "?"))

    def write_summary(self, path):
        """Write failed and skipped suites to ``path`` and print the verdict."""
        f = open(path, 'w+')
        # try/finally so the file is closed even if a write fails.
        try:
            if self.suitesfailed:
                f.write("= Failed tests =\n")

                for suite in self.suitesfailed:
                    f.write("== %s ==\n" % suite)
                    if suite in self.test_output:
                        f.write(self.test_output[suite]+"\n\n")

                f.write("\n")

            if not self.immediate and not self.verbose:
                for suite in self.suitesfailed:
                    print("=" * 78)
                    print("FAIL: %s" % suite)
                    if suite in self.test_output:
                        print(self.test_output[suite])
                    print("")

            f.write("= Skipped tests =\n")
            for reason, names in self.skips.items():
                f.write(reason + "\n")
                for name in names:
                    f.write("\t%s\n" % name)
                f.write("\n")
        finally:
            f.close()

        if (not self.suitesfailed and
            not self.statistics['TESTS_UNEXPECTED_FAIL'] and
            not self.statistics['TESTS_ERROR']):
            ok = (self.statistics['TESTS_EXPECTED_OK'] +
                  self.statistics['TESTS_EXPECTED_FAIL'])
            print("\nALL OK (%d tests in %d testsuites)" % (ok, self.suites_ok))
        else:
            print("\nFAILED (%d failures and %d errors in %d testsuites)" % (
                self.statistics['TESTS_UNEXPECTED_FAIL'],
                self.statistics['TESTS_ERROR'],
                len(self.suitesfailed)))

    def skip_testsuite(self, name, reason="UNKNOWN"):
        """Record testsuite ``name`` as skipped and lower the expected total."""
        # A caller may pass reason=None explicitly; file it under the default
        # so write_summary() can still concatenate the reason as a string.
        if reason is None:
            reason = "UNKNOWN"
        self.skips.setdefault(reason, []).append(name)
        if self.totalsuites:
            self.totalsuites-=1