Created
September 12, 2014 20:18
-
-
Save wickman/5f3fb184f25f6931445c to your computer and use it in GitHub Desktop.
custom pants pytest runner
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/src/python/twitter/pants/BUILD b/src/python/twitter/pants/BUILD | |
index c4cfaac..656392d 100644 | |
--- a/src/python/twitter/pants/BUILD | |
+++ b/src/python/twitter/pants/BUILD | |
@@ -27,13 +27,13 @@ python_library( | |
name='pants-deps', | |
dependencies=[ | |
pants('3rdparty/python:ansicolors'), | |
+ pants('3rdparty/python:coverage'), | |
pants('3rdparty/python:elementtree'), | |
pants('3rdparty/python:mako'), | |
pants('3rdparty/python:markdown'), | |
pants('3rdparty/python:psutil'), | |
pants('3rdparty/python:pygments'), | |
pants('3rdparty/python:pytest'), | |
- pants('3rdparty/python:pytest-cov'), | |
pants('3rdparty/python:requests'), | |
python_requirement('pylint==0.25.1', version_filter=pylint_build_filter), | |
] | |
@@ -44,6 +44,7 @@ python_library( | |
dependencies=[ | |
pants(':pants-deps'), | |
pants('src/python/twitter/common/collections'), | |
+ pants('src/python/twitter/common/concurrent'), | |
pants('src/python/twitter/common/config'), | |
pants('src/python/twitter/common/confluence'), | |
pants('src/python/twitter/common/contextutil'), | |
@@ -53,6 +54,7 @@ python_library( | |
pants('src/python/twitter/common/log'), | |
pants('src/python/twitter/common/process'), | |
pants('src/python/twitter/common/python'), | |
+ pants('src/python/twitter/common/recordio'), | |
pants('src/python/twitter/common/quantity'), | |
pants('src/python/twitter/common/util'), | |
pants('src/python/twitter/thrift/text'), | |
@@ -77,7 +79,7 @@ python_library( | |
# TODO(wickman) This version should be automatically pulled in from twitter.pants.version | |
provides=setup_py( | |
name='twitter.pants', | |
- version='0.0.12', | |
+ version='0.0.14', | |
description="Twitter's pants build tool.", | |
url='https://github.com/twitter/commons', | |
license='Apache License, Version 2.0', | |
diff --git a/src/python/twitter/pants/base/address.py b/src/python/twitter/pants/base/address.py | |
index 8d4a192..25178a4 100644 | |
--- a/src/python/twitter/pants/base/address.py | |
+++ b/src/python/twitter/pants/base/address.py | |
@@ -93,5 +93,15 @@ class Address(object): | |
def __ne__(self, other): | |
return not self.__eq__(other) | |
+ def compact_repr(self, source_root=None): | |
+ def _repr(): | |
+ if os.path.basename(self.buildfile.parent_path) == self.target_name: | |
+ return os.path.dirname(self.buildfile.relpath) | |
+ else: | |
+ return '%s:%s' % (os.path.dirname(self.buildfile.relpath), self.target_name) | |
+ if source_root: | |
+ return os.path.relpath(_repr(), source_root) | |
+ return _repr() | |
+ | |
def __repr__(self): | |
return "%s:%s" % (self.buildfile, self.target_name) | |
diff --git a/src/python/twitter/pants/commands/build.py b/src/python/twitter/pants/commands/build.py | |
index b32363c..b678c82 100644 | |
--- a/src/python/twitter/pants/commands/build.py | |
+++ b/src/python/twitter/pants/commands/build.py | |
@@ -95,6 +95,7 @@ class Build(Command): | |
if not target: | |
self.error("Target %s does not exist" % address) | |
+ | |
self.targets.update(tgt for tgt in target.resolve() if is_concrete(tgt)) | |
def debug(self, message): | |
diff --git a/src/python/twitter/pants/python/pytest_runner.py b/src/python/twitter/pants/python/pytest_runner.py | |
new file mode 100644 | |
index 0000000..62a56a4 | |
--- /dev/null | |
+++ b/src/python/twitter/pants/python/pytest_runner.py | |
@@ -0,0 +1,169 @@ | |
+""" | |
+Pants Python pytest driver. For more information about the pytest plugin interface, see: | |
+http://pytest.org/latest/plugins.html#well-specified-hooks | |
+ | |
+This plugin provides a recordio interface to test results as well as a code coverage driver. | |
+""" | |
+ | |
+import json | |
+import os | |
+import struct | |
+ | |
+from pytest import Function | |
+ | |
+ | |
+_RECORD_STREAM = None | |
+_COV = None | |
+ | |
+ | |
+class PythonTestRecord(object): | |
+ """Base class of the record objects that read_record returns.""" | |
+ def __init__(self, json_dict): | |
+ # The base class keeps nothing from json_dict; each subclass extracts its own fields. | |
+ pass | |
+ | |
+ | |
+class PythonTestHeader(PythonTestRecord): | |
+ """Record built by read_record from a message whose 'type' is 'header'.""" | |
+ def __init__(self, json_dict): | |
+ # Values are fetched with json_dict.get, so a missing 'filename' or 'tests' key yields None. | |
+ self.filename, self.functions = map(json_dict.get, ('filename', 'tests')) | |
+ super(PythonTestHeader, self).__init__(json_dict) | |
+ | |
+ | |
+class PythonTestResult(PythonTestRecord): | |
+ """Record built by read_record from a message whose 'type' is 'test'.""" | |
+ def __init__(self, json_dict): | |
+ # Values are fetched with json_dict.get, so any key absent from the message yields None. | |
+ self.function, self.filename, self.lineno, self.outcome = map( | |
+ json_dict.get, ('function', 'filename', 'lineno', 'outcome')) | |
+ super(PythonTestResult, self).__init__(json_dict) | |
+ | |
+ | |
+class PythonTestFooter(PythonTestRecord): | |
+ """Record built by read_record from a message whose 'type' is 'footer'; it adds no fields.""" | |
+ pass | |
+ | |
+ | |
+def write_record(fp, json_dict): | |
+ """Write one Pants pytest driver record (json_dict) to the stream (fp). | |
+ | |
+ The record is framed as a 4-byte big-endian unsigned length followed by the JSON text | |
+ encoded as UTF-8. The stream is flushed after every record. | |
+ """ | |
+ json_blob = json.dumps(json_dict).encode('utf-8') | |
+ # '>L' packs the blob length as a big-endian unsigned 32-bit integer. | |
+ header = struct.pack('>L', len(json_blob)) | |
+ # Length prefix and payload go out in a single write call. | |
+ fp.write(header + json_blob) | |
+ fp.flush() | |
+ | |
+ | |
+def read_record(fp): | |
+  """Read one Pants pytest driver record from the stream fp. | |
+ | |
+  The pytest_runner and test_builder communicate via a simple recordio-based mechanism, | |
+  framed in the following fashion: | |
+ | |
+    [4 byte int "len1"][string blob of length "len1"] | |
+    [4 byte int "len2"][string blob of length "len2"] | |
+    ... | |
+ | |
+  Each length is a big-endian unsigned integer and each blob is UTF-8 encoded JSON. | |
+ | |
+  The return value will be an object of type PythonTestRecord or None if nothing was processed. | |
+  None is also returned, with the stream position restored, when fewer than the 4 bytes of a | |
+  length prefix are available. | |
+ | |
+  There are three distinct subtypes: | |
+    - PythonTestHeader* | |
+    - PythonTestResult* | |
+    - PythonTestFooter | |
+ | |
+  PythonTestResult has the following attributes: | |
+    .function | |
+    .filename | |
+    .lineno | |
+    .outcome (one of 'passed', 'failed', 'skipped') | |
+ | |
+  Message types with '*' may be delivered multiple times. Only one footer will be delivered | |
+  and signifies that a test run has completed. | |
+ | |
+  Raises ValueError if the blob is shorter than its length prefix announces, or if the | |
+  record type is missing or unknown. | |
+  """ | |
+  start = fp.tell() | |
+  header = fp.read(4) | |
+  if len(header) < 4: | |
+    # Either nothing or only part of a length prefix is available. Seek back to where this | |
+    # call started so a later call sees the whole prefix instead of unpacking a short one | |
+    # (which raises struct.error). The seek presumably also clears the end-of-file state. | |
+    fp.seek(start) | |
+    return | |
+  blob_len = struct.unpack('>L', header)[0] | |
+  read_blob = fp.read(blob_len) | |
+  if len(read_blob) != blob_len: | |
+    raise ValueError('Failed to read stream!') | |
+  # write_record encodes the JSON text as UTF-8, so decode it the same way before parsing. | |
+  record = json.loads(read_blob.decode('utf-8')) | |
+  # Use .get so a record without a 'type' key ends in the ValueError below, not a KeyError. | |
+  record_type = record.get('type') | |
+  if record_type == 'header': | |
+    return PythonTestHeader(record) | |
+  elif record_type == 'footer': | |
+    return PythonTestFooter(record) | |
+  elif record_type == 'test': | |
+    return PythonTestResult(record) | |
+  raise ValueError('Unknown record type encountered: %s' % record_type) | |
+ | |
+ | |
+def pytest_addoption(parser): | |
+ """pytest hook: register the two command-line options this plugin reads. | |
+ | |
+ Both options store a PATH, default to None, and are added to the "terminal reporting" group. | |
+ """ | |
+ group = parser.getgroup("terminal reporting") | |
+ # Read back in pytest_configure as config.option.pants_test_recordio. | |
+ group.addoption('--pants_test_recordio', | |
+ action="store", | |
+ dest="pants_test_recordio", | |
+ metavar="PATH", | |
+ default=None, | |
+ help="Create a pants test recordio at this path.") | |
+ # Read back in pytest_configure as config.option.pants_test_coverage_root. | |
+ group.addoption('--pants_test_coverage_root', | |
+ action="store", | |
+ dest="pants_test_coverage_root", | |
+ metavar="PATH", | |
+ default=None, | |
+ help="If specified, enable coverage at this root directory.") | |
+ | |
+ | |
+def pytest_configure(config): | |
+ """pytest hook: open the record stream and start coverage when the options request them. | |
+ | |
+ Sets the module globals _RECORD_STREAM and _COV, which the other hooks in this file read. | |
+ """ | |
+ global _RECORD_STREAM | |
+ global _COV | |
+ if config.option.pants_test_recordio: | |
+ # 'ab+' opens the file in binary append mode, so existing records are kept. | |
+ _RECORD_STREAM = open(config.option.pants_test_recordio, 'ab+') | |
+ if config.option.pants_test_coverage_root: | |
+ root = config.option.pants_test_coverage_root | |
+ try: | |
+ from coverage import coverage | |
+ except ImportError: | |
+ # Coverage is optional: report the problem and continue without it. | |
+ print('Failed to import coverage. Coverage disabled.') | |
+ return | |
+ # The coverage configuration is read from the file named 'config' under the coverage root. | |
+ _COV = coverage( | |
+ config_file=os.path.join(root, 'config'), | |
+ auto_data=True, | |
+ ) | |
+ _COV.start() | |
+ | |
+ | |
+def translate_function_names(report): | |
+ """Yield a name for every pytest Function found in report.result. | |
+ | |
+ Entries of report.result that are not Function instances are skipped. | |
+ """ | |
+ for result in report.result: | |
+ if not isinstance(result, Function): | |
+ continue | |
+ nodeid = report.nodeid.split('::') | |
+ if len(nodeid) == 1: | |
+ # The node id has no '::' separator, so the bare function name is used. | |
+ yield result.name | |
+ else: | |
+ # Prefix with the second '::' component of the node id (presumably the class name). | |
+ yield '%s.%s' % (nodeid[1], result.name) | |
+ | |
+ | |
+def pytest_collectreport(report): | |
+  """pytest hook: write a 'header' record describing one collection report. | |
+ | |
+  The record holds the part of report.nodeid before the first '::' as the file name, and the | |
+  names produced by translate_function_names as the tests. Nothing is written when | |
+  pytest_configure did not open a record stream. | |
+  """ | |
+  # Without --pants_test_recordio there is no stream; writing to None would crash collection. | |
+  if _RECORD_STREAM is None: | |
+    return | |
+  write_record(_RECORD_STREAM, dict( | |
+    type='header', | |
+    filename=report.nodeid.split('::')[0], | |
+    tests=list(translate_function_names(report)) | |
+  )) | |
+ | |
+ | |
+def pytest_runtest_logreport(report): | |
+ """pytest hook: write a 'test' record for a report whose phase (report.when) is 'call'. | |
+ | |
+ Nothing is written when no record stream is open. | |
+ """ | |
+ # NOTE(review): reports from phases other than 'call' are dropped here, so an outcome that | |
+ # pytest reports during another phase never reaches the stream -- confirm this is intended. | |
+ if _RECORD_STREAM is None or report.when != 'call': | |
+ return | |
+ # report.location unpacks to three values; the first one is not used. | |
+ _, lineno, function = report.location | |
+ filename = report.fspath | |
+ outcome = report.outcome | |
+ write_record(_RECORD_STREAM, dict( | |
+ type='test', | |
+ function=function, | |
+ filename=filename, | |
+ lineno=lineno, | |
+ outcome=outcome, | |
+ )) | |
+ | |
+ | |
+def pytest_unconfigure(config): | |
+  """pytest hook: stop coverage, then write the 'footer' record and close the record stream. | |
+ | |
+  Coverage is stopped whether or not a record stream is open. The footer is written only | |
+  when pytest_configure opened a stream. | |
+  """ | |
+  # Stop coverage first: it may have been started without a record stream being requested. | |
+  if _COV: | |
+    _COV.stop() | |
+  if _RECORD_STREAM is None: | |
+    return | |
+  try: | |
+    write_record(_RECORD_STREAM, dict( | |
+      type='footer' | |
+    )) | |
+  finally: | |
+    # Close the stream even when writing the footer fails. | |
+    _RECORD_STREAM.close() | |
diff --git a/src/python/twitter/pants/python/python_chroot.py b/src/python/twitter/pants/python/python_chroot.py | |
index d566ec0..4588a11 100644 | |
--- a/src/python/twitter/pants/python/python_chroot.py | |
+++ b/src/python/twitter/pants/python/python_chroot.py | |
@@ -61,16 +61,23 @@ class PythonChroot(object): | |
def __init__(self, target): | |
Exception.__init__(self, "Not a valid Python dependency! Found: %s" % target) | |
- def __init__(self, target, root_dir, extra_targets=None, builder=None, interpreter=None, | |
- conn_timeout=None): | |
+ def __init__(self, target, | |
+ root_dir, | |
+ extra_targets=None, | |
+ builder=None, | |
+ interpreter=None, | |
+ platforms=None, | |
+ conn_timeout=None): | |
self._config = Config.load() | |
self._target = target | |
self._root = root_dir | |
self._interpreter = interpreter or PythonInterpreter.get() | |
- self._cache = BuildCache(os.path.join(self._config.get('python-setup', 'artifact_cache'), | |
- '%s' % self._interpreter.identity)) | |
+ self._cache = BuildCache( | |
+ os.path.join(self._config.get('python-setup', 'artifact_cache'), | |
+ '%s' % self._interpreter.identity)) | |
self._extra_targets = list(extra_targets) if extra_targets is not None else [] | |
- self._resolver = MultiResolver(self._config, target, conn_timeout=conn_timeout) | |
+ self._resolver = MultiResolver( | |
+ self._config, target, platforms=platforms, conn_timeout=conn_timeout) | |
self._builder = builder or PEXBuilder(tempfile.mkdtemp(), interpreter=self._interpreter) | |
def __del__(self): | |
@@ -165,15 +172,15 @@ class PythonChroot(object): | |
for antlr in targets['antlrs']: | |
generated_reqs.add(self._generate_antlr_requirement(antlr)) | |
+ reqs_to_build = OrderedSet() | |
targets['reqs'] |= generated_reqs | |
for req in targets['reqs']: | |
if not req.should_build(self._interpreter.python, Platform.current()): | |
self.debug('Skipping %s based upon version filter' % req) | |
continue | |
+ reqs_to_build.add(req) | |
self._dump_requirement(req._requirement, False, req._repository) | |
- reqs_to_build = (req for req in targets['reqs'] | |
- if req.should_build(self._interpreter.python, Platform.current())) | |
for dist in self._resolver.resolve(reqs_to_build, interpreter=self._interpreter): | |
self._dump_distribution(dist) | |
diff --git a/src/python/twitter/pants/python/resolver.py b/src/python/twitter/pants/python/resolver.py | |
index 0a9b604..dd56a31 100644 | |
--- a/src/python/twitter/pants/python/resolver.py | |
+++ b/src/python/twitter/pants/python/resolver.py | |
@@ -40,8 +40,9 @@ class MultiResolver(ResolverBase): | |
return Crawler(cache=config.get('python-setup', 'download_cache'), | |
conn_timeout=conn_timeout) | |
- def __init__(self, config, target, conn_timeout=None): | |
- platforms = config.getlist('python-setup', 'platforms', ['current']) | |
+ def __init__(self, config, target, platforms=None, conn_timeout=None): | |
+ if platforms is None: | |
+ platforms = config.getlist('python-setup', 'platforms', ['current']) | |
if isinstance(target, PythonBinary) and target.platforms: | |
platforms = target.platforms | |
self._install_cache = config.get('python-setup', 'install_cache') | |
diff --git a/src/python/twitter/pants/python/sdist_builder.py b/src/python/twitter/pants/python/sdist_builder.py | |
index f35f693..4472557 100644 | |
--- a/src/python/twitter/pants/python/sdist_builder.py | |
+++ b/src/python/twitter/pants/python/sdist_builder.py | |
@@ -21,7 +21,6 @@ import os | |
import subprocess | |
import sys | |
-from twitter.common.contextutil import pushd | |
from twitter.common.python.installer import Packager | |
diff --git a/src/python/twitter/pants/python/test_builder.py b/src/python/twitter/pants/python/test_builder.py | |
index 6f7a019..aba8840 100644 | |
--- a/src/python/twitter/pants/python/test_builder.py | |
+++ b/src/python/twitter/pants/python/test_builder.py | |
@@ -16,126 +16,247 @@ | |
from __future__ import print_function | |
-__author__ = 'Brian Wickman' | |
- | |
try: | |
import configparser | |
except ImportError: | |
import ConfigParser as configparser | |
+from contextlib import contextmanager | |
import errno | |
+import json | |
import os | |
-import time | |
+import pkgutil | |
+import shutil | |
import signal | |
+import subprocess | |
import sys | |
+from textwrap import dedent | |
+import threading | |
+import time | |
from twitter.common.collections import OrderedSet | |
-from twitter.common.contextutil import temporary_file | |
-from twitter.common.dirutil import safe_mkdir | |
+from twitter.common.contextutil import temporary_dir | |
+from twitter.common.dirutil import safe_mkdir, safe_open | |
from twitter.common.lang import Compatibility | |
-from twitter.common.quantity import Amount, Time | |
from twitter.common.python.interpreter import PythonInterpreter | |
from twitter.common.python.pex import PEX | |
from twitter.common.python.pex_builder import PEXBuilder | |
- | |
+from twitter.common.quantity import Amount, Time | |
from twitter.pants.base import Config, ParseContext | |
-from twitter.pants.python.python_chroot import PythonChroot | |
from twitter.pants.targets import ( | |
+ PythonLibrary, | |
PythonRequirement, | |
- PythonTarget, | |
PythonTestSuite, | |
- PythonTests) | |
+ PythonTests, | |
+) | |
+try: | |
+ from Queue import Queue, Empty | |
+except ImportError: | |
+ from queue import Queue, Empty | |
-class PythonTestResult(object): | |
- @staticmethod | |
- def timeout(): | |
- return PythonTestResult('TIMEOUT') | |
+from .python_chroot import PythonChroot | |
+from .pytest_runner import ( | |
+ read_record as read_pytest_record, | |
+ PythonTestFooter, | |
+ PythonTestResult, | |
+) | |
- @staticmethod | |
- def exception(): | |
- return PythonTestResult('EXCEPTION') | |
+from pkg_resources import DistributionNotFound | |
- @staticmethod | |
- def rc(value): | |
- return PythonTestResult('SUCCESS' if value == 0 else 'FAILURE', | |
- rc=value) | |
- def __init__(self, msg, rc=None): | |
- self._rc = rc | |
- self._msg = msg | |
+class Display(object): | |
+ try: | |
+ import colors | |
+ except ImportError: | |
+ colors = None | |
- def __str__(self): | |
- return self._msg | |
+ DEFAULT_WIDTH = 100 | |
+ | |
+ @classmethod | |
+ def render(cls, string, color): | |
+ if cls.colors is None: | |
+ return string | |
+ color_fn = getattr(cls.colors, color, None) | |
+ if not callable(color_fn): | |
+ return string | |
+ return color_fn(string) | |
+ | |
+ @classmethod | |
+ def isatty(cls, filelike): | |
+ if not hasattr(filelike, 'fileno'): | |
+ return False | |
+ return os.isatty(filelike.fileno()) and filelike.fileno() in ( | |
+ sys.stdout.fileno(), sys.stderr.fileno()) | |
+ | |
+ @classmethod | |
+ def length(cls, string): | |
+ """calculate printable length of the string""" | |
+ if cls.colors is None: | |
+ return len(str(string)) | |
+ return len(cls.colors.strip_color(str(string))) | |
+ | |
+ def __init__(self, filelike, width=DEFAULT_WIDTH): | |
+ self._filelike = filelike | |
+ self._isatty = self.isatty(filelike) | |
+ self._colorful = self._isatty | |
+ self._width = width | |
+ self._width_duration = 0.5 | |
+ self._width_expiration = time.time() | |
@property | |
- def success(self): | |
- return self._rc == 0 | |
+ def width(self): | |
+ """Return a guess for the terminal width.""" | |
+ if not self._isatty: | |
+ return self._width | |
+ now = time.time() | |
+ if now > self._width_expiration: | |
+ self._width_expiration = now + self._width_duration | |
+ try: | |
+ self._width = int(os.popen('stty size', 'r').read().split()[1]) | |
+ except (TypeError, ValueError): | |
+ pass | |
+ return self._width | |
+ | |
+ def center(self, string, filler='='): | |
+ filler_width = self.width - self.length(string) | |
+ if not self._colorful and self.colors: | |
+ string = self.colors.strip_color(string) | |
+ print('%s%s%s' % (filler * (filler_width // 2), string, | |
+ (filler * (filler_width // 2 + filler_width % 2))), file=self._filelike) | |
+ | |
+ def write(self, string): | |
+ if not self._colorful and self.colors: | |
+ string = self.colors.strip_color(string) | |
+ print(string, file=self._filelike) | |
+ | |
+ def left_right(self, left_string, right_string): | |
+ width = self.width | |
+ if not self._colorful and self.colors: | |
+ left_string = self.colors.strip_color(left_string) | |
+ right_string = self.colors.strip_color(right_string) | |
+ left_length = self.length(left_string) | |
+ right_length = self.length(right_string) | |
+ leftover = max(width - left_length - right_length, 1) | |
+ dots_length = 15 - right_length | |
+ dots_string = leftover * ' ' + dots_length * '.' + ' ' | |
+ print('%s%s%s' % (left_string, dots_string[-leftover:], right_string), | |
+ file=self._filelike) | |
+ | |
+ | |
+class PythonTestOutcome(object): | |
+ SUCCESS = 0 | |
+ FAILURE = 1 | |
+ SKIPPED = 2 | |
+ TIMEOUT = 3 | |
+ CANCELLED = 4 | |
+ EXCEPTION = 5 | |
+ | |
+ NAME_MAP = { | |
+ SUCCESS: 'SUCCESS', | |
+ FAILURE: 'FAILURE', | |
+ SKIPPED: 'SKIPPED', | |
+ TIMEOUT: 'TIMEOUT', | |
+ CANCELLED: 'CANCELLED', | |
+ EXCEPTION: 'EXCEPTION', | |
+ } | |
+ | |
+ COLOR_MAP = { | |
+ SUCCESS: 'green', | |
+ FAILURE: 'red', | |
+ SKIPPED: 'yellow', | |
+ TIMEOUT: 'cyan', | |
+ CANCELLED: 'yellow', | |
+ EXCEPTION: 'red', | |
+ } | |
+ @classmethod | |
+ def timeout(cls): | |
+ return cls(cls.TIMEOUT) | |
-DEFAULT_COVERAGE_CONFIG = """ | |
-[run] | |
-branch = True | |
-timid = True | |
+ @classmethod | |
+ def exception(cls): | |
+ return cls(cls.EXCEPTION) | |
-[report] | |
-exclude_lines = | |
- def __repr__ | |
- raise NotImplementedError | |
+ @classmethod | |
+ def cancelled(cls): | |
+ return cls(cls.CANCELLED) | |
-ignore_errors = True | |
-""" | |
+ @classmethod | |
+ def success(cls): | |
+ return cls(cls.SUCCESS) | |
-def generate_coverage_config(target): | |
- cp = configparser.ConfigParser() | |
- cp.readfp(Compatibility.StringIO(DEFAULT_COVERAGE_CONFIG)) | |
- cp.add_section('html') | |
- target_dir = os.path.join(Config.load().getdefault('pants_distdir'), 'coverage', | |
- os.path.dirname(target.address.buildfile.relpath), target.name) | |
- safe_mkdir(target_dir) | |
- cp.set('html', 'directory', target_dir) | |
- return cp | |
+ @classmethod | |
+ def failure(cls): | |
+ return cls(cls.FAILURE) | |
+ @classmethod | |
+ def skipped(cls): | |
+ return cls(cls.SKIPPED) | |
-class PythonTestBuilder(object): | |
- class InvalidDependencyException(Exception): pass | |
- class ChrootBuildingException(Exception): pass | |
+ def __init__(self, type_): | |
+ if type_ not in self.NAME_MAP: | |
+ raise ValueError('Unknown type %s' % type_) | |
+ self._type = type_ | |
- TESTING_TARGETS = None | |
+ def __str__(self): | |
+ return Display.render(self.NAME_MAP[self._type], self.COLOR_MAP[self._type]) | |
- # TODO(wickman) Expose these as configuratable parameters | |
- TEST_TIMEOUT = Amount(2, Time.MINUTES) | |
- TEST_POLL_PERIOD = Amount(100, Time.MILLISECONDS) | |
+ @property | |
+ def succeeded(self): | |
+ return self._type in (self.SUCCESS, self.SKIPPED) | |
- def __init__(self, targets, args, root_dir, interpreter=None, conn_timeout=None): | |
- self.targets = targets | |
- self.args = args | |
- self.root_dir = root_dir | |
- self.interpreter = interpreter or PythonInterpreter.get() | |
- self.successes = {} | |
- self._conn_timeout = conn_timeout | |
+ @property | |
+ def failed(self): | |
+ return self._type in (self.FAILURE, self.EXCEPTION, self.TIMEOUT) | |
+ | |
+ | |
+DEFAULT_COVERAGE_CONFIG = dedent(""" | |
+ [run] | |
+ branch = True | |
+ timid = True | |
+ | |
+ [report] | |
+ exclude_lines = | |
+ def __repr__ | |
+ raise NotImplementedError | |
+ | |
+ ignore_errors = True | |
+""") | |
+ | |
+ | |
+@contextmanager | |
+def coverage_config(path): | |
+ with safe_open(path, 'wb') as fp: | |
+ cp = configparser.ConfigParser() | |
+ cp.readfp(Compatibility.StringIO(DEFAULT_COVERAGE_CONFIG)) | |
+ yield cp | |
+ cp.write(fp) | |
- def run(self): | |
- self.successes = {} | |
- rv = self._run_tests(self.targets) | |
- for target in sorted(self.successes): | |
- print('%-80s.....%10s' % (target, self.successes[target])) | |
- return 0 if rv.success else 1 | |
+ | |
+class PythonTestChroot(object): | |
+ """ | |
+ A wrapper around PythonChroot to encapsulate test-specific dependencies (e.g. pytest, | |
+ coverage) and drivers (e.g. pytest_runner.) | |
+ """ | |
+ class Error(Exception): pass | |
+ class ChrootBuildingException(Error): pass | |
+ | |
+ TESTING_TARGETS = {} | |
@classmethod | |
- def generate_test_targets(cls): | |
- if cls.TESTING_TARGETS is None: | |
+ def generate_test_targets(cls, interpreter): | |
+ if interpreter.version_string not in cls.TESTING_TARGETS: | |
with ParseContext.temp(): | |
- cls.TESTING_TARGETS = [ | |
+ cls.TESTING_TARGETS[interpreter.version_string] = [ | |
PythonRequirement('pytest'), | |
- PythonRequirement('pytest-cov'), | |
- PythonRequirement('coverage==3.6b1'), | |
+ PythonRequirement('coverage'), | |
PythonRequirement('unittest2', version_filter=lambda py, pl: py.startswith('2')), | |
PythonRequirement('unittest2py3k', version_filter=lambda py, pl: py.startswith('3')) | |
] | |
- return cls.TESTING_TARGETS | |
+ return cls.TESTING_TARGETS[interpreter.version_string] | |
- @staticmethod | |
- def generate_junit_args(target): | |
+ @classmethod | |
+ def generate_junit_args(cls, target): | |
args = [] | |
xml_base = os.getenv('JUNIT_XML_BASE') | |
if xml_base: | |
@@ -146,94 +267,345 @@ class PythonTestBuilder(object): | |
os.makedirs(os.path.dirname(xml_path)) | |
except OSError as e: | |
if e.errno != errno.EEXIST: | |
- raise PythonTestBuilder.ChrootBuildingException( | |
+ raise cls.ChrootBuildingException( | |
"Unable to establish JUnit target: %s! %s" % (target, e)) | |
args.append('--junitxml=%s' % xml_path) | |
return args | |
- @staticmethod | |
- def cov_setup(target, chroot): | |
- cp = generate_coverage_config(target) | |
- with temporary_file(cleanup=False) as fp: | |
- cp.write(fp) | |
- filename = fp.name | |
- if target.coverage: | |
- source = target.coverage | |
+ COVERAGE_ROOT = '.coverage' | |
+ | |
+ @classmethod | |
+ def coverage_root(cls, builder): | |
+ return os.path.join(builder.path(), cls.COVERAGE_ROOT) | |
+ | |
+ @classmethod | |
+ def write_coverage_setup(cls, target, builder): | |
+ coverage_root = cls.coverage_root(builder) | |
+ with coverage_config(os.path.join(coverage_root, 'config')) as cp: | |
+ if target.coverage: | |
+ modules = target.coverage | |
+ else: | |
+ # This technically makes the assumption that tests/python/<target> will be testing | |
+ # src/python/<target>. To change to honest measurements, do target.walk() here instead, | |
+ # however this results in very useless and noisy coverage reports. | |
+ modules = set(os.path.dirname(source).replace(os.sep, '.') for source in target.sources) | |
+ | |
+ def as_list(iterable): | |
+ return '\n' + '\n'.join(iterable) | |
+ | |
+ cp.set('run', 'source', as_list(modules)) | |
+ cp.set('run', 'data_file', os.path.join(coverage_root, 'data')) | |
+ cp.set('run', 'parallel', True) | |
+ return coverage_root | |
+ | |
+ @classmethod | |
+ def write_coverage_sitecustomize(cls, builder): | |
+ coverage_root = cls.coverage_root(builder) | |
+ with safe_open(os.path.join(coverage_root, 'sitecustomize.py'), 'wb') as fp: | |
+ fp.write(dedent(''' | |
+ import os | |
+ try: | |
+ from coverage import coverage | |
+ cov = coverage(config_file=os.path.join(%r, 'config'), auto_data=True) | |
+ cov.start() | |
+ except ImportError: | |
+ pass | |
+ ''' % coverage_root)) | |
+ return coverage_root | |
+ | |
+ @classmethod | |
+ def write_pytest_runner(cls, builder): | |
+ builder.chroot().write(pkgutil.get_data(__name__, 'pytest_runner.py'), 'pytest_runner.py') | |
+ | |
+ def __init__(self, | |
+ target, | |
+ root_dir, | |
+ interpreter=None, | |
+ coverage=False, | |
+ conn_timeout=None, | |
+ redirect=False, | |
+ timeout=None): | |
+ """ | |
+ :param target: The PythonTests target to chroot | |
+ :param root_dir: The build root of the repository. | |
+ :keyword interpreter: If None, use the current interpreter, otherwise the supplied | |
+ PythonInterpreter. | |
+ :keyword coverage: If True, enable coverage reporting for this test. | |
+ :keyword conn_timeout: Use this connection timeout for all downstream resolvers. | |
+ :keyword redirect: If True, redirect stdout to a file. Otherwise map to sys.stdout | |
+ :keyword timeout: The Amount of Time this test has to complete before being cancelled. | |
+ """ | |
+ if not isinstance(target, PythonTests): | |
+ raise TypeError('PythonTestChroot takes a PythonTests object.') | |
+ self.target = target | |
+ self.root_dir = root_dir | |
+ self._builder = self._chroot = None | |
+ self.coverage = coverage | |
+ self.interpreter = interpreter or PythonInterpreter.get() | |
+ self._conn_timeout = conn_timeout | |
+ self._built = False | |
+ self._recordio = None | |
+ self.results = set() | |
+ self.acknowledged_results = set() | |
+ self.state = None | |
+ self._processed_footer = False | |
+ self._redirect = redirect | |
+ self._expiration = None | |
+ self._timeout = timeout or Amount(1000, Time.HOURS) | |
+ # Pants is not threadsafe -- calling self.target.sources actually | |
+ # changes the cwd if it's never been called before. I spent almost 5 | |
+ # hours tracking down nondeterministic bugs caused by this. Moving from | |
+ # .popen to .__init__ fixes it, since the builds are serialized, but | |
+ # this seems brittle. | |
+ self._sources = [os.path.abspath(os.path.join(self.target.target_base, source)) | |
+ for source in self.target.sources] | |
+ | |
+ def build(self): | |
+ builder = PEXBuilder() | |
+ builder.info.entry_point = 'pytest' | |
+ builder.info.ignore_errors = self.target._soft_dependencies | |
+ self._chroot = PythonChroot( | |
+ self.target, | |
+ self.root_dir, | |
+ interpreter=self.interpreter, | |
+ platforms=('current',), # only build tests for the current platform | |
+ extra_targets=self.generate_test_targets(PythonInterpreter.get()), | |
+ builder=builder, | |
+ conn_timeout=self._conn_timeout) | |
+ self._builder = self._chroot.dump() | |
+ self.write_pytest_runner(self._builder) | |
+ self._builder.freeze() | |
+ self._recordio_path = os.path.join(self._builder.path(), 'tests.recordio') | |
+ self._recordio = open(self._recordio_path, 'wb+') | |
+ self._built = True | |
+ self._stdout = None | |
+ | |
+ def pytest_runner_args(self): | |
+ return ['-p', 'pytest_runner', '--pants_test_recordio', self._recordio_path] | |
+ | |
+ def popen(self, args): | |
+ if not self._built: | |
+ self.build() | |
+ test_args = self.generate_junit_args(self.target) | |
+ test_args.extend(self.pytest_runner_args()) | |
+ if self._redirect: | |
+ # If we are redirecting for the purpose of the parallel test runner, make output as quiet | |
+ # as possible but capture stdout/stderr of the processes so that they can be printed. | |
+ test_args.append('-qs') | |
+ test_args.extend(args) | |
+ env = os.environ.copy() | |
+ if self.coverage: | |
+ test_args.append( | |
+ '--pants_test_coverage=%s' % self.write_coverage_setup(self.target, self._builder)) | |
+ coverage_root = self.write_coverage_sitecustomize(self._builder) | |
+ pythonpath = [coverage_root] + list(filter(None, env.get('PYTHONPATH', '').split(':'))) | |
+ env['PYTHONPATH'] = ':'.join(pythonpath) | |
+ if self._redirect: | |
+ self._stdout = open(os.path.join(self._builder.path(), 'stdout'), 'w') | |
+ pex = PEX(self._builder.path(), interpreter=self.interpreter) | |
+ self._po = pex.run( | |
+ args=test_args + self._sources, | |
+ blocking=False, | |
+ setsid=True, | |
+ stdout=self._stdout, | |
+ env=env, | |
+ stderr=subprocess.STDOUT) | |
+ self._expiration = time.time() + self._timeout.as_(Time.SECONDS) | |
+ return self._po | |
+ | |
+ @property | |
+ def output(self): | |
+ if not self._redirect: | |
+ return '' | |
+ with open(os.path.join(self._builder.path(), 'stdout')) as fp: | |
+ return fp.read() | |
+ | |
+ @property | |
+ def finished(self): | |
+ return self.results == self.acknowledged_results and self._processed_footer | |
+ | |
+ def _process_record(self, record): | |
+ if isinstance(record, PythonTestResult): | |
+ self.results.add(record) | |
+ return record | |
+ elif isinstance(record, PythonTestFooter): | |
+ self._processed_footer = True | |
+ if self._stdout: | |
+ self._stdout.close() | |
+ self._po.wait() | |
+ | |
+ def read(self): | |
+ if not self._processed_footer and time.time() > self._expiration: | |
+ self.state = 'timeout' | |
+ self.kill() | |
+ return | |
+ record = read_pytest_record(self._recordio) | |
+ if record is not None: | |
+ return self._process_record(record) | |
+ | |
+ def acknowledge(self, result): | |
+ self.acknowledged_results.add(result) | |
+ | |
+ def mark_skipped(self): | |
+ self.kill(state='skipped') | |
+ | |
+ @property | |
+ def outcome(self): | |
+ if not self.finished and self.state is None: | |
+ return None | |
+ if self.state == 'timeout': | |
+ return PythonTestOutcome.timeout() | |
+ elif self.state == 'skipped': | |
+ return PythonTestOutcome.skipped() | |
+ elif self.state == 'cancelled': | |
+ return PythonTestOutcome.cancelled() | |
else: | |
- # This technically makes the assumption that tests/python/<target> will be testing | |
- # src/python/<target>. To change to honest measurements, do target.walk() here instead, | |
- # however this results in very useless and noisy coverage reports. | |
- source = set(os.path.dirname(source).replace(os.sep, '.') for source in target.sources) | |
- args = ['-p', 'pytest_cov', | |
- '--cov-config', filename, | |
- '--cov-report', 'html', | |
- '--cov-report', 'term'] | |
- for module in source: | |
- args.extend(['--cov', module]) | |
- return filename, args | |
- | |
- @staticmethod | |
- def wait_on(popen, timeout=TEST_TIMEOUT): | |
- total_wait = Amount(0, Time.SECONDS) | |
- while total_wait < timeout: | |
- rc = popen.poll() | |
- if rc is not None: | |
- return PythonTestResult.rc(rc) | |
- total_wait += PythonTestBuilder.TEST_POLL_PERIOD | |
- time.sleep(PythonTestBuilder.TEST_POLL_PERIOD.as_(Time.SECONDS)) | |
- popen.kill() | |
- return PythonTestResult.timeout() | |
- | |
- def _run_python_test(self, target): | |
- po = None | |
- rv = PythonTestResult.exception() | |
- coverage_rc = None | |
- coverage_enabled = 'PANTS_PY_COVERAGE' in os.environ | |
+ success = all(result.outcome in ('skipped', 'passed') for result in self.results) | |
+ return PythonTestOutcome.success() if success else PythonTestOutcome.failure() | |
+ | |
+ def iter_coverage_data(self): | |
+ """Yield source_root, data file tuples from this chroot.""" | |
+ if self.coverage: | |
+ coverage_root = os.path.join(self._builder.path(), self.COVERAGE_ROOT) | |
+ for path in os.listdir(coverage_root): | |
+ if path.startswith('data'): | |
+ yield (self._builder.path(), os.path.join(coverage_root, path)) | |
+ | |
+ def _kill(self): | |
+ if self._po: | |
+ self._po.poll() | |
+ if self._po and self._po.returncode != 0: | |
+ try: | |
+ os.killpg(self._po.pid, signal.SIGTERM) | |
+ except OSError as e: | |
+ print('Failed to kill process group %s: %s' % (self._po.pid, e)) | |
- try: | |
- builder = PEXBuilder(interpreter=self.interpreter) | |
- builder.info.entry_point = target.entry_point | |
- builder.info.ignore_errors = target._soft_dependencies | |
- chroot = PythonChroot( | |
- target, | |
- self.root_dir, | |
- extra_targets=self.generate_test_targets(), | |
- builder=builder, | |
- interpreter=self.interpreter, | |
- conn_timeout=self._conn_timeout) | |
- builder = chroot.dump() | |
- builder.freeze() | |
- test_args = PythonTestBuilder.generate_junit_args(target) | |
- test_args.extend(self.args) | |
- if coverage_enabled: | |
- coverage_rc, args = self.cov_setup(target, builder.chroot()) | |
- test_args.extend(args) | |
- sources = [os.path.join(target.target_base, source) for source in target.sources] | |
- po = PEX(builder.path(), interpreter=self.interpreter).run( | |
- args=test_args + sources, blocking=False, setsid=True) | |
- # TODO(wickman) If coverage is enabled, write an intermediate .html that points to | |
- # each of the coverage reports generated and webbrowser.open to that page. | |
- rv = PythonTestBuilder.wait_on(po, timeout=target.timeout) | |
- except Exception as e: | |
- import traceback | |
- print('Failed to run test!', file=sys.stderr) | |
- traceback.print_exc() | |
- rv = PythonTestResult.exception() | |
- finally: | |
- if coverage_rc: | |
- os.unlink(coverage_rc) | |
- if po and po.returncode != 0: | |
- try: | |
- os.killpg(po.pid, signal.SIGTERM) | |
- except OSError as e: | |
- if e.errno == errno.EPERM: | |
- print("Unable to kill process group: %d" % po.pid) | |
- elif e.errno != errno.ESRCH: | |
- rv = PythonTestResult.exception() | |
- self.successes[target._create_id()] = rv | |
- return rv | |
- | |
- def _run_python_test_suite(self, target, fail_hard=True): | |
+ def kill(self, state='cancelled'): | |
+ """Stop this test run unless it has already finished, recording `state` as the reason. | |
+ | |
+ The `outcome` logic above recognises the states 'timeout', 'skipped' and 'cancelled'. | |
+ | |
+ NOTE(review): indentation is not preserved in this listing; both statements below are | |
+ presumably guarded by the `if` -- confirm against the real file. | |
+ """ | |
+ if not self.finished: | |
+ self.state = state | |
+ self._kill() | |
+ | |
+ | |
+class PythonTestBuilder(object): | |
+ """Builds a chroot for each Python test target, runs them and reports per-target results. | |
+ | |
+ Coverage reporting is switched on by the PANTS_PY_COVERAGE environment variable and the | |
+ number of concurrent runners is taken from PANTS_PY_PARALLELISM. | |
+ """ | |
+ # Base class for errors raised by this builder. | |
+ class Error(Exception): pass | |
+ # Raised by _iter_tests for a target that is neither a PythonTests nor a PythonTestSuite. | |
+ class InvalidDependencyException(Error): pass | |
+ | |
+ # TODO(wickman) Expose these as configurable parameters | |
+ # Two minutes. | |
+ TEST_TIMEOUT = Amount(2, Time.MINUTES) | |
+ # 100 milliseconds. | |
+ TEST_POLL_PERIOD = Amount(100, Time.MILLISECONDS) | |
+ | |
+ def __init__(self, targets, args, root_dir, interpreter=None, conn_timeout=None): | |
+ """Set up runner state for the given test targets. | |
+ | |
+ targets: the targets to run; run() expands them through _iter_tests(). | |
+ args: extra arguments handed to each chroot's popen(). | |
+ root_dir: passed through to every PythonTestChroot. | |
+ interpreter: interpreter to use; defaults to PythonInterpreter.get(). | |
+ conn_timeout: passed through to every PythonTestChroot. | |
+ """ | |
+ self.targets = targets | |
+ self.args = args | |
+ self.root_dir = root_dir | |
+ self.successes = {} | |
+ self._display = Display(sys.stdout) | |
+ self.interpreter = interpreter or PythonInterpreter.get() | |
+ # Number of concurrent runners: PANTS_PY_PARALLELISM (default 1), never less than 1. | |
+ self.parallelism = max(1, int(os.environ.get('PANTS_PY_PARALLELISM', 1))) | |
+ # Set to ask runners to stop; checked by _run_one. | |
+ self.cancelled = threading.Event() | |
+ # Count of _run_one calls that have not finished yet; decremented under _producer_lock. | |
+ self._producers = 0 | |
+ self._producer_lock = threading.Lock() | |
+ # Set by _parallel_test_driver (or by _run_tests when there is nothing to run). | |
+ self.done = threading.Event() | |
+ self._conn_timeout = conn_timeout | |
+ # Carries (chroot, result) pairs from drain_queue to _run_tests. | |
+ self._queue = Queue() | |
+ # Coverage is enabled by the mere presence of PANTS_PY_COVERAGE in the environment. | |
+ self.coverage = 'PANTS_PY_COVERAGE' in os.environ | |
+ self.coverage_dir = os.path.join(Config.load().getdefault('pants_distdir'), 'coverage') | |
+ if self.coverage: | |
+ # NOTE(review): clean=True presumably empties any previous coverage output -- confirm. | |
+ safe_mkdir(self.coverage_dir, clean=True) | |
+ # Real paths of the chroots that produced coverage data (filled by _collect_coverage). | |
+ self._equivalent_prefixes = OrderedSet([]) | |
+ # Paths of the coverage data files copied into coverage_dir (filled by _collect_coverage). | |
+ self._data_files = [] | |
+ | |
+ def build_chroot(self, target): | |
+ chroot = PythonTestChroot( | |
+ target, | |
+ self.root_dir, | |
+ interpreter=self.interpreter, | |
+ coverage='PANTS_PY_COVERAGE' in os.environ, | |
+ redirect=self.parallelism > 1, | |
+ conn_timeout=self._conn_timeout) | |
+ chroot.build() | |
+ return chroot | |
+ | |
+ def drain_queue(self, chroot): | |
+ while True: | |
+ result = chroot.read() | |
+ if result: | |
+ self._queue.put((chroot, result)) | |
+ chroot.acknowledge(result) | |
+ else: | |
+ break | |
+ | |
+ def _collect_source_roots(self, target): | |
+ for dependency in target.dependencies: | |
+ for library in dependency.resolve(): | |
+ if isinstance(library, PythonLibrary): | |
+ yield library.target_base | |
+ | |
+ def _collect_coverage(self, chroot): | |
+ for prefix, data_file in chroot.iter_coverage_data(): | |
+ self._equivalent_prefixes.add(os.path.realpath(prefix)) | |
+ target_file = os.path.join(self.coverage_dir, os.path.basename(data_file)) | |
+ shutil.copy(data_file, target_file) | |
+ self._data_files.append(target_file) | |
+ | |
+ @classmethod | |
+ def _generate_coverage_config(cls, coverage_root, equivalent_prefixes): | |
+ with coverage_config(os.path.join(coverage_root, '.coveragerc')) as cp: | |
+ def as_list(iterable): | |
+ return '\n' + '\n'.join(iterable) | |
+ cp.set('run', 'data_file', os.path.join(coverage_root, 'data')) | |
+ cp.add_section('paths') | |
+ cp.set('paths', 'source', as_list(equivalent_prefixes)) | |
+ return os.path.join(coverage_root, '.coveragerc') | |
+ | |
+ @classmethod | |
+ @contextmanager | |
+ def _gen_cov(cls, coveragerc): | |
+ from coverage import coverage | |
+ cov = coverage(config_file=coveragerc) | |
+ cov.combine() | |
+ yield cov | |
+ | |
+ def run_report(self, source_root): | |
+ """Print a combined coverage report for source_root and write an HTML report. | |
+ | |
+ The HTML report is written to <coverage_dir>/<source_root>. | |
+ | |
+ NOTE(review): indentation is not preserved in this listing; the report calls are | |
+ presumably all inside the temporary_dir() block -- confirm against the real file. | |
+ """ | |
+ self._display.center(' coverage report source root: %s ' % source_root) | |
+ # Work on copies of the collected data files inside a scratch directory. | |
+ with temporary_dir() as td: | |
+ for filename in os.listdir(self.coverage_dir): | |
+ if filename.startswith('data'): | |
+ shutil.copy(os.path.join(self.coverage_dir, filename), os.path.join(td, filename)) | |
+ # source_root is listed first in [paths], followed by every chroot prefix that produced | |
+ # data; coverage.py documents the first entry as the canonical location (see its docs). | |
+ coveragerc = self._generate_coverage_config(td, | |
+ [source_root] + list(self._equivalent_prefixes)) | |
+ with self._gen_cov(coveragerc) as cov: | |
+ cov.report(show_missing=False) | |
+ self._display.center('') | |
+ # The location is announced before the HTML report is actually generated. | |
+ self._display.write('HTML report: file://%s' % os.path.realpath( | |
+ os.path.join(self.coverage_dir, source_root, 'index.html'))) | |
+ cov.html_report(directory=os.path.join(self.coverage_dir, source_root)) | |
+ self._display.center('') | |
+ print('\n') | |
+ | |
+ def _run_one(self, chroot): | |
+ if self.cancelled.is_set(): | |
+ with self._producer_lock: | |
+ self._producers -= 1 | |
+ chroot.mark_skipped() | |
+ return | |
+ | |
+ chroot.popen(self.args) | |
+ | |
+ while not chroot.finished and not self.cancelled.is_set(): | |
+ self.drain_queue(chroot) | |
+ self.cancelled.wait(timeout=0.2) | |
+ self.drain_queue(chroot) | |
+ | |
+ if self.cancelled.is_set(): | |
+ chroot.kill() | |
+ | |
+ self._collect_coverage(chroot) | |
+ | |
+ with self._producer_lock: | |
+ self._producers -= 1 | |
+ | |
+ def _iter_test_suite(self, target): | |
tests = OrderedSet([]) | |
def _gather_deps(trg): | |
if isinstance(trg, PythonTests): | |
@@ -243,33 +615,112 @@ class PythonTestBuilder(object): | |
for dep in dependency.resolve(): | |
_gather_deps(dep) | |
_gather_deps(target) | |
+ return iter(tests) | |
+ | |
+ def _iter_tests(self, targets): | |
+ for target in targets: | |
+ if isinstance(target, PythonTests): | |
+ yield target | |
+ elif isinstance(target, PythonTestSuite): | |
+ for subtarget in self._iter_test_suite(target): | |
+ yield subtarget | |
+ else: | |
+ raise self.InvalidDependencyException( | |
+ "Invalid dependency in python test target: %s" % target) | |
+ | |
+ def _parallel_test_driver(self, tests): | |
+ from concurrent import futures | |
+ from concurrent.futures.thread import ThreadPoolExecutor | |
+ with ThreadPoolExecutor(self.parallelism) as executor: | |
+ fs = [executor.submit(self._run_one, chroot) for test, chroot in tests.items()] | |
+ futures.wait(fs, return_when=futures.ALL_COMPLETED) | |
+ for future in fs: | |
+ if future.exception(): | |
+ print('Future %s excepted: %s' % (future, future.exception())) | |
+ self.done.set() | |
+ | |
+ def run(self): | |
+ tests = {} | |
+ failed_chroots = set() | |
+ self._display.center(' building chroots ') | |
+ for test in self._iter_tests(self.targets): | |
+ try: | |
+ tests[test] = self.build_chroot(test) | |
+ result = Display.render('finished', 'green') | |
+ except DistributionNotFound: | |
+ failed_chroots.add(test) | |
+ result = Display.render('failed', 'red') | |
+ self._display.left_right(test.address.compact_repr(), result) | |
- failed = False | |
- for test in tests: | |
- rv = self._run_python_test(test) | |
- if not rv.success: | |
- failed = True | |
- if fail_hard: | |
- return rv | |
- return PythonTestResult.rc(1 if failed else 0) | |
+ try: | |
+ self._run_tests(tests) | |
+ except KeyboardInterrupt: | |
+ self.cancelled.set() | |
+ | |
+ self.done.wait() | |
+ | |
+ self._display.center(' result by target ') | |
+ for target, chroot in sorted(tests.items()): | |
+ self._display.left_right(target.address.compact_repr(), chroot.outcome) | |
+ if chroot.outcome.failed: | |
+ output = chroot.output | |
+ if output: | |
+ self._display.center(' error output ') | |
+ self._display.write('\n'.join(chroot.output.splitlines())) | |
+ self._display.center('') | |
+ for target in failed_chroots: | |
+ self._display.left_right(target.address.compact_repr(), PythonTestOutcome.exception()) | |
+ | |
+ if self.coverage: | |
+ source_roots = set() | |
+ for target in tests: | |
+ source_roots.update(self._collect_source_roots(target)) | |
+ for source_root in source_roots: | |
+ self.run_report(source_root) | |
+ | |
+ success = all(chroot.outcome is not None and chroot.outcome.success for chroot in tests.values()) | |
+ success = False if failed_chroots else success | |
+ return 0 if success else 1 | |
+ | |
+ def _run_tests(self, tests): | |
+ if not tests: | |
+ self.done.set() | |
+ return | |
+ | |
+ self._producers = len(tests) | |
- def _run_tests(self, targets): | |
fail_hard = 'PANTS_PYTHON_TEST_FAILSOFT' not in os.environ | |
+ | |
if 'PANTS_PY_COVERAGE' in os.environ: | |
# Coverage often throws errors despite tests succeeding, so make PANTS_PY_COVERAGE | |
# force FAILSOFT. | |
fail_hard = False | |
- failed = False | |
- for target in targets: | |
- if isinstance(target, PythonTests): | |
- rv = self._run_python_test(target) | |
- elif isinstance(target, PythonTestSuite): | |
- rv = self._run_python_test_suite(target, fail_hard) | |
- else: | |
- raise PythonTestBuilder.InvalidDependencyException( | |
- "Invalid dependency in python test target: %s" % target) | |
- if not rv.success: | |
- failed = True | |
- if fail_hard: | |
- return rv | |
- return PythonTestResult.rc(1 if failed else 0) | |
+ | |
+ test_runner_thread = threading.Thread(target=lambda: self._parallel_test_driver(tests)) | |
+ test_runner_thread.start() | |
+ | |
+ if self.parallelism > 1: | |
+ self._display.center(' result by test ') | |
+ | |
+ COLOR_MAP = { | |
+ 'passed': 'green', | |
+ 'failed': 'red', | |
+ 'skipped': 'yellow', | |
+ } | |
+ | |
+ def result_str(result): | |
+ return Display.render(result.outcome, COLOR_MAP.get(result.outcome, 'yellow')) | |
+ | |
+ while self._producers > 0: | |
+ while True: | |
+ try: | |
+ chroot, result = self._queue.get(timeout=0.2) | |
+ self._queue.task_done() | |
+ except Empty: | |
+ break | |
+ # If running in parallel mode, serialize test results ourselves. | |
+ if self.parallelism > 1: | |
+ self._display.left_right( | |
+ '%s:%s: %s' % (result.filename, result.lineno, result.function), result_str(result)) | |
+ if result.outcome != 'passed' and fail_hard: | |
+ self.cancelled.set() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment