Created
January 17, 2012 13:48
-
-
Save bonzini/1626679 to your computer and use it in GitHub Desktop.
Integrating PyUnit and gtester
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python | |
# | |
# GTester-compatible main program for pyunit | |
# | |
# Copyright (C) 2011 Red Hat, Inc. | |
# Author: Paolo Bonzini <[email protected]> | |
# | |
# Permission to use, copy, modify, and/or distribute this software for any | |
# purpose with or without fee is hereby granted, provided that the above | |
# copyright notice and this permission notice appear in all copies. | |
# | |
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES | |
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF | |
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR | |
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES | |
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN | |
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF | |
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. | |
# This file defines replacements for unittest.main and unittest.TextTestRunner. | |
# They are command-line compatible with glib test suites, including gtester | |
# support, and even in standalone mode the output is similar to gtest-based | |
# C unit tests. The only unsupported features are -m and --GTestSkipCount. | |
# | |
# Use this instead of unittest.main: | |
# | |
# if __name__ == "__main__": | |
# gtest_main.main() # run all tests | |
import sys | |
import os | |
import types | |
import fnmatch | |
import random | |
import re | |
import struct | |
import time | |
import traceback | |
import unittest | |
from unittest.signals import registerResult, installHandler | |
# Ugly implementation dependency: unittest's TestResult uses this
# module-level marker to skip internal frames in the traceback.
__unittest = True
def test_id(test, module='__main__'): | |
"""Retrieve a GTest-like path from a TestCase.""" | |
id = test.id().replace('.', '/') | |
if module is not None: | |
id = id[len(module.__name__):] | |
return id | |
class SelectTestsSuite(unittest.TestSuite):
    """A wrapper for other TestSuites that processes -p and -s options.

    Nested suites are flattened when added, and iteration only yields the
    tests whose GTest-like path matches `path` and does not match
    `skipPath` (both are fnmatch-style patterns).

    Perhaps it could also use decorators to implement -m?"""

    # Compiled selection regex, or None when every test is accepted.
    re = None
    # Module (object or name) whose prefix is stripped from test paths.
    module = None

    def __init__(self, tests=(), module='__main__', path='*', skipPath=None):
        # Name this class (not unittest.TestSuite) so that no class in the
        # MRO is skipped by the cooperative call.
        super(SelectTestsSuite, self).__init__(tests=tests)
        self.module = module
        regex = ''
        if path != '*':
            regex = fnmatch.translate(path)
        if skipPath is not None:
            # A negative lookahead rejects anything matching skipPath before
            # the (optional) positive pattern is tried.
            regex = '(?!' + fnmatch.translate(skipPath) + ')' + regex
        if regex != '':
            self.re = re.compile(regex)

    def __iter__(self):
        """Iterate over the selected tests only."""
        tests = super(SelectTestsSuite, self).__iter__()
        if self.re is None:
            return tests
        return (t for t in tests if self.accept(t))

    def addTest(self, test):
        """Add a test; a nested suite is flattened into this one."""
        if isinstance(test, unittest.TestSuite):
            self.addTests(test)
        else:
            unittest.TestSuite.addTest(self, test)

    def accept(self, test):
        """Return True if `test` is selected by the -p/-s patterns."""
        if self.re is None:
            return True
        return self.re.match(test_id(test, self.module)) is not None

    def _removeTestAtIndex(self, index):
        """Keep references to tests after they have run.

        Newer unittest versions drop each test by its position in the
        iteration; because iteration here is filtered, that position does
        not correspond to the underlying list, so the cleanup is disabled.
        """
        pass
class GTestResult(unittest.TestResult):
    """A test result class that can print formatted text results to a stream
    and can talk to gtester using the glib test protocol.  Roughly based on
    TextTestResult, used internally by instances of GAbstractTestRunner.

    `log` receives the binary protocol records, `stream` receives the
    human-readable text.  `verbosity` 0 means quiet, 2 or more means that
    the collected errors are printed again at the end of the run."""

    separator1 = '=' * 70 + "\n"
    separator2 = '-' * 70 + "\n"

    # time.clock() was removed from recent Python 3 releases; use
    # perf_counter() when it exists and fall back to clock() otherwise.
    _clock = staticmethod(getattr(time, 'perf_counter', None) or time.clock)

    def defaultSeed(self):
        """Return a default random number seed.  GLib expects this to be
        the string 'R02S' followed by four zero-padded 32-bit integers.
        We need to return a properly formatted seed so that the value
        can be passed to gtester even when Python and C tests are mixed
        on the command line."""
        try:
            words = struct.unpack(">IIII", os.urandom(16))
        except NotImplementedError:
            # No OS randomness source: derive the words from the clock and
            # the process ids instead.
            now = time.time()
            seconds = int(now)
            micros = int((now - seconds) * 1000000)
            # Compare os.name (do not assign to it!) before using getppid.
            parent = os.getppid() if os.name == 'posix' else 0
            words = (seconds, micros, os.getpid(), parent)
        # Mask so that every word is rendered as exactly 8 hex digits.
        return "R02S" + "".join("{0:08x}".format(w & 0xffffffff)
                                for w in words)

    def __init__(self, log, stream, progName=None, module='__main__',
                 verbosity=1, seed=None):
        super(GTestResult, self).__init__()
        self.stream = stream
        self.log = log
        self.showAll = verbosity > 1
        self.quiet = verbosity == 0
        self.module = module
        self.seed = seed or self.defaultSeed()
        self.progName = progName or os.path.basename(sys.argv[0])

    # These methods implement the glib test protocol.

    G_TEST_LOG_NONE = 0
    G_TEST_LOG_ERROR = 1          # s:msg
    G_TEST_LOG_START_BINARY = 2   # s:binaryname s:seed
    G_TEST_LOG_LIST_CASE = 3      # s:testpath
    G_TEST_LOG_SKIP_CASE = 4      # s:testpath, TODO
    G_TEST_LOG_START_CASE = 5     # s:testpath
    G_TEST_LOG_STOP_CASE = 6      # d:status d:nforks d:elapsed
    G_TEST_LOG_MIN_RESULT = 7     # s:blurb d:result
    G_TEST_LOG_MAX_RESULT = 8     # s:blurb d:result
    G_TEST_LOG_MESSAGE = 9        # s:blurb

    def pack_log(self, typ, strings=(), nums=()):
        """Write one protocol record of type `typ` to the log.

        The record is a big-endian header (type, string count, number
        count, 0), each string preceded by its byte length, each number as
        a double, and the whole thing prefixed by its total length."""
        chunks = [struct.pack(">iiii", typ, len(strings), len(nums), 0)]
        for s in strings:
            # The log is binary: text must be encoded before it is mixed
            # with packed data, and lengths are byte lengths.
            if not isinstance(s, bytes):
                s = s.encode('utf-8')
            chunks.append(struct.pack(">i", len(s)))
            chunks.append(s)
        for n in nums:
            chunks.append(struct.pack(">d", float(n)))
        body = b"".join(chunks)
        # The length prefix counts itself, hence the extra 4 bytes.
        self.log.write(struct.pack(">i", len(body) + 4) + body)

    def logStartBinary(self):
        self.pack_log(self.G_TEST_LOG_START_BINARY,
                      (self.progName, self.seed))

    def logStartCase(self, test):
        self.pack_log(self.G_TEST_LOG_START_CASE,
                      (test_id(test, self.module),))

    def logListCase(self, test):
        self.pack_log(self.G_TEST_LOG_LIST_CASE,
                      (test_id(test, self.module),))

    def logError(self, msg):
        self.pack_log(self.G_TEST_LOG_ERROR, (msg,))

    def logMessage(self, msg):
        self.pack_log(self.G_TEST_LOG_MESSAGE, (msg,))

    def logStopCase(self, status):
        """Log the end of the current test case with its status and the
        time elapsed since startTest."""
        elapsed = self._clock() - self.startCaseTime
        self.pack_log(self.G_TEST_LOG_STOP_CASE, nums=(status, 0, elapsed))

    def logException(self, test, kind, err, skip=1):
        """Report the exception passed to addError or addFailure.

        `err` is a (type, value, traceback) triple.  A one-line summary
        "kind:file:line:ExceptionName: message" is written to the text
        stream, and the summary plus the formatted traceback is logged as
        a protocol error.  `skip` is the number of outer traceback frames
        to skip when choosing the file and line to report."""
        exc_type, inst, trace = err
        frames = traceback.extract_tb(trace, skip + 1)
        if frames:
            # Use the deepest available frame when the traceback is
            # shorter than `skip` + 1 entries.
            file, line, _func, _text = frames[min(skip, len(frames) - 1)]
        else:
            file, line = '<unknown>', 0
        msg = "%s:%s:%s:%s: %s\n" % (kind, file, line,
                                     exc_type.__name__, inst)
        self.write("**\n" + msg)
        self.logError(msg + self.separator1
                      + self._exc_info_to_string(err, test))

    # These methods implement text output.

    def write(self, str):
        self.stream.write(str)

    def flush(self):
        self.stream.flush()

    # These methods implement the standard TestResult protocol.

    def startTestRun(self):
        self.logStartBinary()
        self.startTime = self._clock()

    def stopTestRun(self):
        self.stopTime = self._clock()

    def startTest(self, test):
        super(GTestResult, self).startTest(test)
        # Reseed before every test so that results are reproducible even
        # if other tests change.
        random.seed(self.seed)
        self.startCaseTime = self._clock()
        self.logStartCase(test)
        if not self.quiet:
            self.write(test_id(test, self.module))
            self.write(": ")
            self.flush()

    def addSuccess(self, test):
        super(GTestResult, self).addSuccess(test)
        self.logStopCase(0)
        if not self.quiet:
            self.write("OK\n")

    def addError(self, test, err):
        self.logException(test, "ERROR", err)
        self.logStopCase(1)
        super(GTestResult, self).addError(test, err)

    def addFailure(self, test, err):
        self.logException(test, "FAIL", err)
        self.logStopCase(1)
        super(GTestResult, self).addFailure(test, err)

    def addSkip(self, test, reason):
        self.logStopCase(0)
        super(GTestResult, self).addSkip(test, reason)
        if not self.quiet:
            self.write("SKIP {0!r}\n".format(reason))

    def addExpectedFailure(self, test, err):
        # logException needs the test as its first argument.
        self.logException(test, "XFAIL", err)
        self.logStopCase(0)
        super(GTestResult, self).addExpectedFailure(test, err)
        if not self.quiet:
            self.write("XFAIL\n")

    def addUnexpectedSuccess(self, test):
        self.logError("unexpected success")
        self.logStopCase(1)
        super(GTestResult, self).addUnexpectedSuccess(test)
        if not self.quiet:
            self.write("XPASS\n")

    # Additional methods used by GTestLister and GTestRunner.

    def listTest(self, test):
        """Record `test` as listed (not run): log its path, print it
        unless quiet, and count it as a success."""
        super(GTestResult, self).startTest(test)
        self.logListCase(test)
        if not self.quiet:
            self.write(test_id(test, self.module))
            self.write("\n")
            self.flush()
        super(GTestResult, self).addSuccess(test)

    def printErrors(self):
        self.printErrorList('ERROR', self.errors)
        self.printErrorList('FAIL', self.failures)
        self.write(self.separator2)

    def printErrorList(self, kind, errors):
        for test, err in errors:
            self.write(self.separator1)
            self.write("%s: %s\n" % (kind, test_id(test, self.module)))
            self.write(self.separator2)
            self.write("%s\n" % err)

    def printSummary(self):
        """Print the number of tests run and the overall verdict."""
        run = self.testsRun
        timeTaken = self.stopTime - self.startTime
        if not self.quiet:
            self.write("Ran %d test%s in %.3fs\n\n" %
                       (run, "" if run == 1 else "s", timeTaken))
        counters = (
            ("failures", self.failures),
            ("errors", self.errors),
            ("skipped", self.skipped),
            ("expected failures", self.expectedFailures),
            ("unexpected successes", self.unexpectedSuccesses),
        )
        # Only non-empty categories are mentioned in the verdict line.
        infos = ["%s=%d" % (label, len(items))
                 for label, items in counters if items]
        if not self.wasSuccessful():
            self.write("FAILED (%s)\n" % (", ".join(infos),))
        elif infos:
            self.write("OK (%s)\n" % (", ".join(infos),))
        elif not self.quiet:
            self.write("OK\n")

    def printResults(self):
        """Print the collected errors (in quiet or verbose mode) followed
        by the summary."""
        if self.quiet or self.showAll:
            if self.showAll:
                self.write("\n")
            self.printErrors()
        self.printSummary()
class GAbstractTestRunner(object):
    """A test runner class that interfaces to a GTestResult.  Compared
    to e.g. TextTestRunner, it can pass down some data that is required
    by the glib test protocol (program name, random number seed).

    When `log` is given, protocol records go to it and the text output is
    discarded; otherwise the protocol records are discarded and text goes
    to `stream`.  Subclasses customize what happens to the tests by
    overriding doRun()."""

    class _DummyStream(object):
        """A sink that silently discards everything written to it."""

        def write(self, s):
            pass

        def flush(self):
            # GTestResult flushes its text stream after writing a test
            # name, so the sink must accept flush() as well as write().
            pass

    def __init__(self, log=None, stream=sys.stdout, verbosity=1,
                 progName=None, module='__main__', failfast=True, seed=None,
                 buffer=False):
        self.module = module
        self.verbosity = verbosity
        self.failfast = failfast
        self.buffer = buffer
        self.progName = progName
        self.seed = seed
        if log:
            # gtester mode: only the binary log is of interest.
            self.log = log
            self.stream = self._DummyStream()
        else:
            # standalone mode: only the text output is of interest.
            self.log = self._DummyStream()
            self.stream = stream

    def _makeResult(self):
        """Create the GTestResult that collects the outcome of the run."""
        return GTestResult(stream=self.stream, log=self.log,
                           module=self.module, seed=self.seed,
                           progName=self.progName, verbosity=self.verbosity)

    def run(self, test):
        "Run the given test case or test suite."
        result = self._makeResult()
        registerResult(result)
        result.failfast = self.failfast
        result.buffer = self.buffer
        startTestRun = getattr(result, 'startTestRun', None)
        if startTestRun is not None:
            startTestRun()
        try:
            self.doRun(test, result)
        finally:
            # Always close the run, even if doRun raised.
            stopTestRun = getattr(result, 'stopTestRun', None)
            if stopTestRun is not None:
                stopTestRun()
        return result

    def doRun(self, test, result):
        "Run the given test case or test suite."
        test(result)
class GTestLister(GAbstractTestRunner):
    """A test runner class that only prints the names of the test cases
    in the suite.

    In standalone mode the names of the selected tests are printed.  In
    gtester mode the same information is logged using the glib test
    protocol."""

    def doRun(self, test, result):
        """List, rather than run, the given test case or test suite: each
        test obtained by iterating over it is handed to the GTestResult's
        listTest method."""
        for case in test:
            result.listTest(case)
class GTestRunner(GAbstractTestRunner):
    """A test runner class that emits results in textual and GLib form.

    In standalone mode it prints the names of tests as they are run,
    errors as they occur, and a summary of the results at the end of the
    test run, depending on the verbosity level.  In gtester mode the
    entire run is logged using the glib test protocol."""

    def run(self, test):
        "Run the given test case or test suite, then print the results."
        outcome = super(GTestRunner, self).run(test)
        outcome.printResults()
        return outcome
# Help text printed by GTestProgram.usageExit(); %(progName)s is replaced
# with the program name there.
USAGE = """\
Usage: %(progName)s [options]

Help Options:
  -?, --help                     Show help options

Test Options:
  -l                             List test cases available in a test executable
  --seed=RANDOMSEED              Provide a random seed to reproduce test
                                 runs using random numbers
  -v, --verbose                  Run tests verbosely
  -q, --quiet                    Run tests quietly
  -p TESTPATH                    execute all tests matching TESTPATH
  -s TESTPATH                    skip all tests matching TESTPATH
  -m {perf|slow|thorough|quick}  (unsupported) Execute tests according to mode
  -k, --keep-going               Keep running after the first failure
  --runner=CLASS                 Use an alternative test runner
                                 (e.g. unittest.TextTestRunner)
  --GTestLogFD=N                 file descriptor for communication with GTester
  --GTestSkipCount=N             (unsupported) gtester-specific argument

-p and -s accept slash-separated paths.  They work even in combination with
--runner.  -l and --GTestLogFD are only supported without --runner.
"""
class GTestProgram(object):
    """A command-line program that runs a set of tests, used to make test
    modules conveniently executable and to interface them with gtester.

    Instantiating the class parses the command line, loads the tests of
    `module`, runs them and (unless `exit` is false) exits the process."""

    # Defaults for the settings that command-line options can change.
    failfast = True
    progName = None
    seed = None
    exit = True
    logFD = None
    verbosity = 1
    path = '*'
    skipPath = None

    # ``basestring`` only exists on Python 2; fall back to ``str``.
    try:
        _string_types = (basestring,)
    except NameError:
        _string_types = (str,)

    def getModule(self, module):
        """Return the module object for a dotted module name; anything
        that is not a string is returned unchanged."""
        if not isinstance(module, self._string_types):
            return module
        # __import__ returns the top-level package, so walk down to the
        # requested submodule.
        m = __import__(module)
        for part in module.split('.')[1:]:
            m = getattr(m, part)
        return m

    def getClass(self, name):
        """Return the object named by a dotted ``module.attribute`` string;
        anything that is not a string is returned unchanged."""
        if not isinstance(name, self._string_types):
            return name
        parts = name.split('.')
        m = self.getModule(".".join(parts[:-1]))
        return getattr(m, parts[-1])

    def __init__(self, module='__main__', argv=None,
                 testRunner=None, testLoader=unittest.defaultTestLoader,
                 exit=True, seed=None):
        argv = argv or sys.argv
        self.module = self.getModule(module)
        self.exit = exit
        self.seed = seed
        self.testRunner = testRunner
        self.testLoader = testLoader
        self.progName = os.path.basename(argv[0])
        self.parseArgs(argv)
        self.suite = self.createTests()
        self.runTests()

    def usageExit(self, msg=None):
        """Print `msg` (if any) and the usage text, then exit with status 2
        unless exiting has been disabled."""
        if msg:
            print(msg)
        print(USAGE % {'progName': self.progName})
        if self.exit:
            sys.exit(2)

    def parseArgs(self, argv):
        """Parse the gtest-style command line in `argv` (argv[0] is the
        program name) and store the settings on this object."""
        import getopt
        long_opts = ['help', 'seed=', 'keep-going', 'verbose', 'quiet',
                     'GTestLogFD=', 'runner=', 'GTestSkipCount=']
        listOnly = False
        try:
            options, args = getopt.getopt(argv[1:], '?hHlvqs:p:m:k',
                                          long_opts)
        except getopt.error as msg:
            self.usageExit(msg)
            # Only reached when exiting is disabled: process no options.
            options = []
        for opt, value in options:
            # quirk in the gtest option parser: short options may be
            # written as "-p=PATH"
            if opt[1] != '-' and value != '' and value[0] == '=':
                value = value[1:]
            # Compare with == throughout: "opt in ('--seed')" is a
            # substring test on a plain string, which made "-s" match
            # "--seed" as well.
            if opt in ('-h', '-H', '-?', '--help'):
                self.usageExit()
            elif opt in ('-q', '--quiet'):
                self.verbosity = 0
            elif opt in ('-v', '--verbose'):
                self.verbosity = 2
            elif opt in ('-k', '--keep-going'):
                self.failfast = False
            elif opt == '-l':
                listOnly = True
            elif opt == '-p':
                self.path = value
            elif opt == '-s':
                self.skipPath = value
            elif opt == '--seed':
                self.seed = value
            elif opt == '--runner':
                self.testRunner = self.getClass(value)
            elif opt == '--GTestLogFD':
                try:
                    self.logFD = int(value)
                except ValueError:
                    self.usageExit("invalid file descriptor for "
                                   "--GTestLogFD: %r" % (value,))
            # -m and --GTestSkipCount are accepted but unsupported.
        if self.testRunner is None:
            self.testRunner = GTestLister if listOnly else GTestRunner
        else:
            # Set the seed now for user-specified runners.  GTestRunners
            # resets it for each testcase, so that results are reproducible
            # even if other tests change.
            random.seed(self.seed)

    def createTests(self):
        """Load all tests of the module and wrap them in a suite that
        applies the -p and -s selections."""
        allTests = self.testLoader.loadTestsFromModule(self.module)
        return SelectTestsSuite(tests=allTests, path=self.path,
                                skipPath=self.skipPath, module=self.module)

    def runTests(self):
        """Instantiate the test runner if needed, run the suite, and exit
        with a failure status if the run was not successful."""
        installHandler()
        runner = self.testRunner
        log = None
        # Old-style classes only exist on Python 2.
        classTypes = (type, getattr(types, 'ClassType', type))
        if isinstance(runner, type) and \
           issubclass(runner, GAbstractTestRunner):
            # pass GTest-specific options to GAbstractTestRunner subclasses
            if self.logFD:
                # The glib test protocol is binary.
                log = os.fdopen(self.logFD, 'wb')
            testRunner = runner(verbosity=self.verbosity,
                                failfast=self.failfast,
                                module=self.module,
                                progName=self.progName,
                                seed=self.seed,
                                log=log)
        elif isinstance(runner, classTypes):
            try:
                testRunner = runner(verbosity=self.verbosity,
                                    failfast=self.failfast)
            except TypeError:
                # the runner class does not take these keyword arguments
                testRunner = runner()
        else:
            # it is assumed to be a TestRunner instance
            testRunner = runner
        try:
            self.result = testRunner.run(self.suite)
        finally:
            # Make sure everything logged reaches gtester.
            if log is not None:
                log.close()
        if self.exit:
            sys.exit(not self.result.wasSuccessful())
# Entry point to use instead of unittest.main: calling main() constructs a
# GTestProgram, which parses the command line and runs the tests.
main = GTestProgram
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment