Created
May 21, 2014 21:40
-
-
Save klrmn/06d1cf81b8f3bcb8a77c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import unittest | |
from testresources import OptimisingTestSuite | |
from subunit.run import SubunitTestProgram, SubunitTestRunner, get_default_formatter | |
import io, sys, os, logging | |
import fixtures | |
from testtools import content | |
from testtools.testcase import gather_details | |
class TestLoader(unittest.TestLoader): | |
"""Test loader that extends unittest.TestLoader to: | |
* support names that can be a combination of modules and directories | |
""" | |
def loadTestsFromNames(self, names, module=None): | |
"""Return a suite of all tests cases found using the given sequence | |
of string specifiers. See 'loadTestsFromName()'. | |
""" | |
suites = [] | |
for name in names: | |
if os.path.isdir(name): | |
top_level = os.path.split(name)[0] | |
suites.extend(self.discover(name, top_level_dir=top_level)) | |
else: | |
suites.extend(self.loadTestsFromName(name, module)) | |
return self.suiteClass(suites) | |
def switch(self, previous_test, test, result): | |
"""Keeps track of the current resource set. Attaches errors removing | |
resources to the previous test, and errors adding resources to the | |
current test. | |
:param previous_test: Most recent completed test, or None | |
:param test: Current test, or None. | |
:param result: TestResult object to report activity on. | |
""" | |
if test: | |
resources = getattr(test, 'resources', []) | |
new_resource_set = set() | |
for name, resource in resources: | |
new_resource_set.update(resource.neededResources()) | |
else: | |
new_resource_set = set() | |
# determine which uses cannot be reused or don't exist now | |
new_resources = new_resource_set - self.current_resources | |
old_resources = self.current_resources - new_resource_set | |
FORMAT = '%(asctime)s [%(levelname)s] %(name)s %(lineno)d: %(message)s' | |
fixture = fixtures.FakeLogger(format=FORMAT) | |
fixture.setUp() | |
abort = False | |
# attribute any failures in resource teardown to the previous test | |
# print("Removing %s" % old_resources) | |
for resource in old_resources: | |
try: | |
resource.finishedWith(resource._currentResource, result) | |
self.current_resources.discard(resource) | |
except Exception as e: | |
fixture.addDetail('traceback', content.TracebackContent(sys.exc_info(), previous_test)) | |
from acceptance.resources import DirtyWebDriverResource, WebDriverRM | |
if isinstance(e, DirtyWebDriverResource): | |
WebDriverRM.dirtied(e.resource) # try to get a new instance | |
else: | |
abort = True | |
break | |
# attribute any failures in resource setup to the current test | |
# print("Adding %s" % new_resources) | |
if not abort: | |
for resource in new_resources: | |
try: | |
resource.getResource(result) | |
self.current_resources.add(resource) | |
except Exception as e: | |
fixture.addDetail('traceback', content.TracebackContent(sys.exc_info(), previous_test)) | |
from acceptance.resources import DirtyWebDriverResource, WebDriverRM | |
if isinstance(e, DirtyWebDriverResource): | |
WebDriverRM.dirtied(e.resource) # try to get a new instance | |
else: | |
abort = True | |
break | |
# print(self.current_resources) | |
if abort: | |
if test: | |
gather_details(fixture.getDetails(), test.getDetails()) | |
result.addError(test, details=test.getDetails()) | |
else: | |
gather_details(fixture.getDetails(), previous_test.getDetails()) | |
result.addError(previous_test, details=previous_test.getDetails()) | |
fixture.cleanUp() | |
return not abort | |
def run(self, result): | |
self.sortTests() | |
# the next several lines' job is to tell me how | |
# tests are being divided for parallelization | |
node = os.getenv('DEFAULT_NODE', 'node1') | |
import time | |
time.sleep(10 * int(node[-1])) | |
for test in self._tests: | |
if not hasattr(test, 'id'): | |
print('%s suite' % node) | |
else: | |
print('%s %s' % (node, test.id())) | |
return result | |
previous_test = None | |
# print(self._tests) | |
for test in self._tests: | |
if not hasattr(test, 'id'): | |
continue | |
# print('%s' % test) | |
if result.shouldStop: | |
break | |
if self.switch(previous_test, test, result): | |
test(result) | |
previous_test = test | |
self.switch(previous_test, None, result) | |
return result | |
OptimisingTestSuite.switch = switch | |
OptimisingTestSuite.run = run | |
OptimisingTestSuite.current_resources = set() | |
def main(): | |
# Disable the default buffering, for Python 2.x where pdb doesn't do it | |
# on non-ttys. | |
stream = get_default_formatter() | |
runner = SubunitTestRunner | |
# Patch stdout to be unbuffered, so that pdb works well on 2.6/2.7. | |
binstdout = io.open(sys.stdout.fileno(), 'wb', 0) | |
if sys.version_info[0] > 2: | |
sys.stdout = io.TextIOWrapper(binstdout, encoding=sys.stdout.encoding) | |
else: | |
sys.stdout = binstdout | |
loader = TestLoader() | |
loader.suiteClass = OptimisingTestSuite | |
SubunitTestProgram(module=None, argv=sys.argv, testRunner=runner, | |
stdout=sys.stdout, testLoader=loader) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment