Created
April 30, 2012 22:09
-
-
Save FSX/2563088 to your computer and use it in GitHub Desktop.
Small unit testing module.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# From https://github.com/FSX/misaka/blob/master/tests/misaka_test.py | |
import re | |
import codecs | |
from os import path | |
from glob import glob | |
from subprocess import Popen, PIPE, STDOUT | |
from minitest import TestCase, ok, runner | |
import misaka | |
from misaka import Markdown, HtmlRenderer, \ | |
HTML_SMARTYPANTS | |
def clean_html(dirty_html): | |
input_html = dirty_html.encode('utf-8') | |
p = Popen(['tidy', '--show-body-only', '1', '--quiet', '1', '--show-warnings', '0', '-utf8'], | |
stdout=PIPE, stdin=PIPE, stderr=STDOUT) | |
stdout, stderr = p.communicate(input=input_html) | |
return stdout.decode('utf-8') | |
class SmartyPantsTest(TestCase): | |
name = 'SmartyPants' | |
def setup(self): | |
self.r = lambda html: misaka.html(html, render_flags=HTML_SMARTYPANTS) | |
def test_single_quotes_re(self): | |
html = self.r('<p>They\'re not for sale.</p>\n') | |
ok(html).diff('<p>They’re not for sale.</p>\n') | |
def test_single_quotes_ll(self): | |
html = self.r('<p>Well that\'ll be the day</p>\n') | |
ok(html).diff('<p>Well that’ll be the day</p>\n') | |
def test_double_quotes_to_curly_quotes(self): | |
html = self.r('<p>"Quoted text"</p>\n') | |
ok(html).diff('<p>“Quoted text”</p>\n') | |
def test_single_quotes_ve(self): | |
html = self.r('<p>I\'ve been meaning to tell you ..</p>\n') | |
ok(html).diff('<p>I’ve been meaning to tell you ..</p>\n') | |
def test_single_quotes_m(self): | |
html = self.r('<p>I\'m not kidding</p>\n') | |
ok(html).diff('<p>I’m not kidding</p>\n') | |
def test_single_quotes_d(self): | |
html = self.r('<p>what\'d you say?</p>\n') | |
ok(html).diff('<p>what’d you say?</p>\n') | |
# Adding tests on setup | |
class MarkdownConformanceTest_10(TestCase): | |
name = 'Markdown Conformance 1.0' | |
suite = 'MarkdownTest_1.0' | |
def setup(self): | |
self.r = Markdown(HtmlRenderer()).render | |
tests_dir = path.dirname(__file__) | |
for text_path in glob(path.join(tests_dir, self.suite, '*.text')): | |
html_path = '%s.html' % path.splitext(text_path)[0] | |
self._create_test(text_path, html_path) | |
def _create_test(self, text_path, html_path): | |
def test(): | |
with codecs.open(text_path, 'r', encoding='utf-8') as fd: | |
text = fd.read() | |
with codecs.open(html_path, 'r', encoding='utf-8') as fd: | |
expected_html = fd.read() | |
actual_html = self.r(text) | |
expected_result = clean_html(expected_html) | |
actual_result = clean_html(actual_html) | |
ok(actual_result).diff(expected_result) | |
test.__name__ = self._test_name(text_path) | |
self.add_test(test) | |
def _test_name(self, text_path): | |
name = path.splitext(path.basename(text_path))[0] | |
name = name.replace(' - ', '_') | |
name = name.replace(' ', '_') | |
name = re.sub('[(),]', '', name) | |
return 'test_%s' % name.lower() | |
if __name__ == '__main__': | |
runner([ | |
SmartyPantsTest, | |
MarkdownConformanceTest_10 | |
]) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
minitest | |
~~~~~~~~ | |
minitest is a really simple unit testing module. Inspired by | |
Oktest, http://www.kuwata-lab.com/oktest/ | |
The code in ``TornadoTestCase`` is taken from ``AsyncTestCase`` in | |
https://github.com/facebook/tornado/blob/master/tornado/testing.py | |
Changelog: | |
* 0.3: | |
- Handle unexpected exceptions. | |
* 0.2: | |
- Use exit code '1' when there are failed tests. | |
- Added ``pre_setup`` and ``post_teardown`` for ``TestCase`` subclasses. | |
- Added a test case class for Tornado, ``TornadoTestCase``. | |
* 0.1: Initial version. | |
""" | |
VERSION = (0, 2, 0) | |
import sys | |
import time | |
import traceback | |
import contextlib | |
from difflib import unified_diff | |
from collections import namedtuple | |
from tornado.ioloop import IOLoop | |
from tornado.stack_context import StackContext, NullContext | |
Result = namedtuple('Result', ('func', 'name', 'failure')) | |
def msg(message): | |
return '{0}\n{1}\n{0}'.format('-' * 80, message) | |
class TimeOutException(Exception): | |
pass | |
class ExtendedDict(dict): | |
def __getattr__(self, name): | |
if name in self: | |
return self[name] | |
else: | |
return None | |
def __setattr__(self, name, value): | |
self[name] = value | |
class AssertionObject(object): | |
def __init__(self, target): | |
self._target = target | |
def __lt__(self, other): | |
if not self._target < other: | |
raise AssertionError(msg('%r < %r' % (self._target, other))) | |
def __le__(self, other): | |
if not self._target <= other: | |
raise AssertionError(msg('%r <= %r' % (self._target, other))) | |
def __eq__(self, other): | |
if not self._target == other: | |
raise AssertionError(msg('%r == %r' % (self._target, other))) | |
def __ne__(self, other): | |
if not self._target != other: | |
raise AssertionError(msg('%r != % ' % (self._target, other))) | |
def __gt__(self, other): | |
if not self._target > other: | |
raise AssertionError(msg('%r > %r' % (self._target, other))) | |
def __ge__(self, other): | |
if not self._target >= other: | |
raise AssertionError(msg('%r >= %r' % (self._target, other))) | |
def diff(self, other): | |
if self._target != other: | |
difference = unified_diff( | |
other.splitlines(True), | |
self._target.splitlines(True)) | |
raise AssertionError(msg(''.join(difference))) | |
def contains(self, other): | |
if other not in self._target: | |
raise AssertionError(msg('%r in %r' % (other, self._target))) | |
def not_contains(self, other): | |
if other in self._target: | |
raise AssertionError(msg('%r not in %r' % (other, self._target))) | |
def ok(target): | |
return AssertionObject(target) | |
class TestCase(object): | |
def __init__(self, config): | |
self.config = config | |
self._tests = [] | |
for t in dir(self): | |
if t.startswith('test_'): | |
self.add_test(getattr(self, t)) | |
def add_test(self, func): | |
def catch_exception(): | |
try: | |
func() | |
failure = None | |
except AssertionError as e: | |
failure = str(e) | |
except Exception as e: | |
failure = msg(''.join( | |
traceback.format_exception(*sys.exc_info())).strip()) | |
return Result(func.__name__, func.__doc__, failure) | |
self._tests.append(catch_exception) | |
return catch_exception | |
def pre_setup(self): | |
pass | |
def setup(self): | |
pass | |
def teardown(self): | |
pass | |
def post_teardown(self): | |
pass | |
def run(self): | |
self.pre_setup() | |
self.setup() | |
for test in self._tests: | |
yield test() | |
self.teardown() | |
self.post_teardown() | |
class TornadoTestCase(TestCase): | |
def __init__(self, config): | |
super(TornadoTestCase, self).__init__(config) | |
self._stopped = False | |
self._running = False | |
self._failure = None | |
self._stop_args = None | |
def get_new_ioloop(self): | |
"""Creates a new IOLoop for this test. May be overridden in | |
subclasses for tests that require a specific IOLoop (usually | |
the singleton). | |
""" | |
return IOLoop() | |
def pre_setup(self): | |
self.io_loop = self.get_new_ioloop() | |
def post_teardown(self): | |
if (not IOLoop.initialized() or self.io_loop is not IOLoop.instance()): | |
# Try to clean up any file descriptors left open in the ioloop. | |
# This avoids leaks, especially when tests are run repeatedly | |
# in the same process with autoreload (because curl does not | |
# set FD_CLOEXEC on its file descriptors) | |
self.io_loop.close(all_fds=True) | |
@contextlib.contextmanager | |
def _stack_context(self): | |
try: | |
yield | |
except Exception: | |
self._failure = sys.exc_info() | |
self.stop() | |
def run(self): | |
with StackContext(self._stack_context): | |
self.pre_setup() | |
self.setup() | |
for test in self._tests: | |
yield test() | |
self.teardown() | |
self.post_teardown() | |
def stop(self, _arg=None, **kwargs): | |
"""Stops the ioloop, causing one pending (or future) call to wait() | |
to return. | |
Keyword arguments or a single positional argument passed to stop() are | |
saved and will be returned by wait(). | |
""" | |
assert _arg is None or not kwargs | |
self._stop_args = kwargs or _arg | |
if self._running: | |
self.io_loop.stop() | |
self._running = False | |
self._stopped = True | |
def wait(self, condition=None, timeout=5): | |
"""Runs the IOLoop until stop is called or timeout has passed. | |
In the event of a timeout, an exception will be thrown. | |
If condition is not None, the IOLoop will be restarted after stop() | |
until condition() returns true. | |
""" | |
if not self._stopped: | |
if timeout: | |
def timeout_func(): | |
try: | |
raise TimeOutException('Async operation timed out after %d seconds' % timeout) | |
except Exception: | |
self._failure = sys.exc_info() | |
self.stop() | |
self.io_loop.add_timeout(time.time() + timeout, timeout_func) | |
while True: | |
self._running = True | |
with NullContext(): | |
# Wipe out the StackContext that was established in | |
# self.run() so that all callbacks executed inside the | |
# IOLoop will re-run it. | |
self.io_loop.start() | |
if (self._failure is not None or condition is None or condition()): | |
break | |
assert self._stopped | |
self._stopped = False | |
if self._failure is not None: | |
# 2to3 isn't smart enough to convert three-argument raise | |
# statements correctly in some cases. | |
if isinstance(self._failure[1], self._failure[0]): | |
raise self._failure[1], None, self._failure[2] | |
else: | |
raise self._failure[0], self._failure[1], self._failure[2] | |
result = self._stop_args | |
self._stop_args = None | |
return result | |
def runner(testcases, config={}): | |
passed = failed = 0 | |
config = ExtendedDict(config) | |
for testcase in testcases: | |
tests = testcase(config) | |
if hasattr(tests, 'name'): | |
print('\n>> %s' % tests.name) | |
for result in tests.run(): | |
name = result.name or result.func | |
if result.failure is not None: | |
failed += 1 | |
print('%s ... FAILED\n\n%s\n' % (name, result.failure)) | |
else: | |
passed += 1 | |
print('%s ... PASSED' % name) | |
print('\n\n%s passed; %s failed.' % (passed, failed)) | |
if failed > 0: | |
sys.exit(1) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment