Last active
November 18, 2020 09:09
-
-
Save bonzini/2b1a7bb332491ade558985ed0ff1405c to your computer and use it in GitHub Desktop.
Mockup for fancy "meson test" output
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python3
"""Mockup for fancy "meson test" output."""
import asyncio
import os
import sys
import time
import typing as T
from collections import OrderedDict, namedtuple
from typing import NamedTuple

# Type variable constrained to int or float, used in annotations for
# timestamps and timeouts.
Number = T.TypeVar('Number', int, float)

# Module-wide output formatter.  It starts out unset and is assigned by the
# script's top-level code before the event loop runs; the test-case code
# reports through this global.
f = None
class OutputStrings(T.NamedTuple):
    """Set of decoration strings selected as a group by the formatter.

    The four result fields are looked up by result name and appended to the
    test counter when a result line is printed; ``scissors`` and ``dash``
    are used to build the separator lines around captured test output.
    """

    # Marker appended to the counter of a test whose result is 'ok'.
    ok: str
    # Marker appended to the counter of a test whose result is 'fail'.
    fail: str
    # Marker appended to the counter of a test whose result is 'skip'.
    skip: str
    # Marker appended to the counter of a test whose result is 'error'.
    error: str
    # Text spliced into the middle of the "output starts here" separator.
    scissors: str
    # Character repeated to draw the separator lines.
    dash: str
def mblue(s):
    """Return ``str(s)`` wrapped in the ANSI escapes for bold blue text.

    The argument is wrapped in a one-element tuple before ``%`` formatting so
    that a tuple argument is rendered as text instead of being unpacked as
    the format arguments (which raises TypeError).
    """
    return '\x1b[34;1m%s\x1b[0m' % (s,)
def myellow(s):
    """Return ``str(s)`` wrapped in the ANSI escapes for bold yellow text.

    The argument is wrapped in a one-element tuple before ``%`` formatting so
    that a tuple argument is rendered as text instead of being unpacked as
    the format arguments (which raises TypeError).
    """
    return '\x1b[33;1m%s\x1b[0m' % (s,)
def mgreen(s):
    """Return ``str(s)`` wrapped in the ANSI escapes for bold green text.

    The argument is wrapped in a one-element tuple before ``%`` formatting so
    that a tuple argument is rendered as text instead of being unpacked as
    the format arguments (which raises TypeError).
    """
    return '\x1b[32;1m%s\x1b[0m' % (s,)
def mred(s):
    """Return ``str(s)`` wrapped in the ANSI escapes for bold red text.

    The argument is wrapped in a one-element tuple before ``%`` formatting so
    that a tuple argument is rendered as text instead of being unpacked as
    the format arguments (which raises TypeError).
    """
    return '\x1b[31;1m%s\x1b[0m' % (s,)
def mbold(s):
    """Return ``str(s)`` wrapped in the ANSI escapes for bold text.

    The argument is wrapped in a one-element tuple before ``%`` formatting so
    that a tuple argument is rendered as text instead of being unpacked as
    the format arguments (which raises TypeError).
    """
    return '\x1b[1m%s\x1b[0m' % (s,)
class TestCaseFormatter(object):
    """Per-test state: timing, subtest counters and the subtest progress bar.

    Subtest results are reported through the module-level formatter ``f``,
    which must have been assigned before ``update_subtest`` is called.
    """

    number: int
    name: str
    cmdline: str
    starting_time: float
    timeout: T.Union[int, float]
    num_subtests: T.Optional[int] = None
    have_subtests: bool = False
    ran: int = 0
    failed: int = 0
    last_ran: int = 0
    last_failed: int = 0
    # One entry per character of the progress bar; stays empty when no bar
    # is drawn.  Assigned per instance in __init__ (never shared).
    subtest_repr: T.List[str]
    subtest_width: int

    def __init__(self, number, name, cmdline, timeout, subtest_width):
        """Record the test's identity and start its clock.

        number -- position of the test in the run
        name -- name shown in the output
        cmdline -- command line, printed on start (verbose) and as reproducer
        timeout -- timeout in seconds, shown next to the elapsed time
        subtest_width -- columns available for the subtest bar, brackets
            included
        """
        self.number = number
        self.name = name
        self.cmdline = cmdline
        self.timeout = timeout
        self.subtest_width = subtest_width - 2  # leave space for brackets
        self.subtest_repr = []
        self.starting_time = time.time()

    def set_subtests(self, num=None):
        """Declare that the test has subtests; `num` is the planned count.

        When `num` is None the count is unknown and no bar is allocated.
        """
        self.have_subtests = True
        self.num_subtests = num
        if num is not None:
            self.subtest_width = min(self.num_subtests, self.subtest_width)
            # Draw a bar when every subtest gets its own character, or when
            # the bar is at least 10 characters wide.
            if self.num_subtests == self.subtest_width or self.subtest_width >= 10:
                self.subtest_repr = list(' ' * self.subtest_width)

    def subtest_num_to_char(self, num):
        """Return the character number in subtest_repr corresponding to the
        num-th subtest, and the first subtest for that character."""
        if self.num_subtests == self.subtest_width:
            # One character per subtest.
            return num - 1, num
        else:
            # Several subtests share a character: scale the 0-based subtest
            # index onto the bar, then step back to the first subtest that
            # maps to the same character.
            numerator = (num - 1) * self.subtest_width
            denominator = self.num_subtests
            char = numerator // denominator
            char_offset = (numerator % denominator) // self.subtest_width
            return char, num - char_offset

    def update_subtest_repr(self, num, status):
        """Update the bar character that covers subtest `num`.

        Does nothing when no bar is drawn, or when `num` lies outside the
        planned range 1..num_subtests; such a number has no character in
        the bar, and indexing with it would either raise IndexError or
        overwrite an unrelated character through a negative index.
        """
        if not self.subtest_repr:
            return
        if self.num_subtests is None or not 1 <= num <= self.num_subtests:
            return
        char, char_start = self.subtest_num_to_char(num)
        if status == 'ok' and self.last_failed < char_start:
            # Only add a checkmark if no test has failed for this character
            self.subtest_repr[char] = mgreen('\u2714')
        elif status == 'fail':
            self.subtest_repr[char] = mred('\u2718')
        elif status == 'skip' and self.last_ran < char_start:
            # Only add a dot if all tests have been skipped for this character
            self.subtest_repr[char] = mblue('.')

    def update_subtest(self, num, status, name):
        """Record the result of subtest `num` and report it.

        `status` is 'ok', 'skip' or 'fail'; any other value is printed as
        FAIL but is not added to the `failed` counter.
        """
        if status != 'skip':
            self.ran += 1
            self.last_ran = num
            if status == 'fail':
                self.failed += 1
                self.last_failed = num
        if status == 'skip':
            f.emit_subtest_result(self, num, mblue('SKIP'), name)
        elif status == 'ok':
            f.emit_subtest_result(self, num, mgreen('OK'), name)
        else:
            f.emit_subtest_result(self, num, mred('FAIL'), name)
        self.update_subtest_repr(num, status)
        f.request_test_update(self)

    def text_subtest_repr(self):
        """Textual representation of testcases, used for late plan or
        after the run has finished."""
        if not self.ran or not self.have_subtests:
            return ''
        elif self.failed == 0:
            return '%d tests passed' % self.ran
        else:
            return '%d/%d tests passed' % (self.ran - self.failed, self.ran)

    def running_subtest_repr(self):
        """Return the subtest summary shown on the live progress line.

        This is the bracketed bar when one is drawn, the textual summary
        when there is no bar but more than 20 columns are available, and
        an empty string otherwise.
        """
        if self.have_subtests:
            if self.subtest_repr:
                return '[' + ''.join(self.subtest_repr) + ']'
            elif self.subtest_width > 20:
                return self.text_subtest_repr()
        return ''
class TestOutputFormatter(object):
    """Print test results in columns and keep a live progress line updated.

    Finished results are printed as ordinary lines.  The progress line is
    printed with a trailing carriage return so that the next output
    overwrites it; it is refreshed by the `report_progress` coroutine.
    """

    # Do not use single-width characters that become double-width when
    # changed to emoji presentation, because terminals make a mess out
    # of them.
    EmojiOutputs = OutputStrings(' \u2705\ufe0f ', ' \u274c\ufe0f ',
                                 ' \u2753\ufe0f ', ' \u2757\ufe0f ', ' \u2700 ', '\u2015')
    TextOutputs = OutputStrings(': ', ': ', ': ', ': ', ' 8< ', '-')

    def __init__(self, term_width, total_tests, verbose=False, emoji=True):
        """Compute the column layout for a terminal of `term_width` columns.

        term_width -- terminal width in columns
        total_tests -- number of tests in the run, shown in the counters
        verbose -- also print test starts and non-failing subtest results
        emoji -- use the emoji decorations instead of the plain-text ones
        """
        self.max_digits = len(str(total_tests))
        self.num_jobs = 4
        self.verbose = verbose
        self.term_width = term_width
        self.total_tests = total_tests
        self.outputs = self.EmojiOutputs if emoji else self.TextOutputs
        self.output_end = self.outputs.dash * (self.term_width - 1)
        if self.term_width > 20:
            # Same separator line with the scissors spliced into the middle.
            self.output_start = \
                self.output_end[0:self.term_width // 2 - 2] + self.outputs.scissors + \
                self.output_end[self.term_width // 2 + 2:]
        else:
            self.output_start = "Output:"

        # "[53-57/99]" -> max_digits * 3 + 4
        # "53/99 xx "  -> max_digits * 2 + 1 + extra
        self.left_width = max(self.max_digits * 3 + 4, self.max_digits * 2 + 1 + len(self.outputs.ok))

        # two separators to the side of the name
        # 14 characters for the clock or status, plus one space
        # two characters at the right hand side
        term_width -= self.left_width + 1 + 1 + 14 + 1 + 2
        # NOTE(review): max() makes 50 a lower bound for the name column, so
        # subtest_width becomes negative on narrow terminals -- confirm that
        # this is intended.
        self.name_width = max(int(term_width * 2 / 3), 50)
        self.subtest_width = term_width - self.name_width

        self.num_tests = 0
        self.progress_test = None
        self.running_tests = OrderedDict()
        self.stop = False
        self.should_erase_line = ''
        # Event used to wake report_progress(); it is created by that
        # coroutine.  Until then it is None, so that request_update() is a
        # no-op instead of raising AttributeError.
        self.update = None

    # Low-level tabular printing

    def print(self, line, progress=False):
        """Print `line`, either as a normal line or as the progress line.

        A progress line ends with a carriage return instead of a newline;
        whatever is printed next is then prefixed with an erase-to-end-of-line
        escape so that no remains of the progress line stay visible.
        """
        if progress:
            print(self.should_erase_line, line, sep='', end='\r')
            self.should_erase_line = '\x1b[K'
        else:
            print(self.should_erase_line, line, sep='')
            self.should_erase_line = ''

    def print2(self, left, middle, progress=False):
        """Print the left column followed by free-form text."""
        line = '%-*s %s' % (self.left_width, left, middle)
        self.print(line, progress)

    def print3(self, left, middle, right, progress=False):
        """Print the left, name and right columns.

        For a non-progress line whose name does not fit in the name column,
        the name goes on a line of its own and the right column follows on
        the next line.
        """
        if not progress and len(middle) > self.name_width:
            self.print2(left, middle)
            left = middle = ''
        line = '%-*s %-*s %s' % (self.left_width, left, self.name_width, middle, right)
        self.print(line, progress)

    def print5(self, left, middle, status, clock, detail):
        """Print a result line: counter, name, status, clock and detail.

        The parenthesized detail is placed on the same line when it fits in
        the subtest column together with the clock; otherwise it is moved to
        a second line.
        """
        right = status + ' ' + clock
        detail = '(' + detail + ')'
        if len(clock) + len(detail) + 1 <= self.subtest_width:
            self.print3(left, middle, right + ' ' + detail)
        elif len(middle) > self.name_width:
            self.print2(left, middle)
            self.print3('', detail, right)
        else:
            self.print3(left, middle, right)
            self.print2('', detail)

    # High-level printing

    def left(self, test):
        """Return the "number/total" counter for `test`, padded to max_digits."""
        return '%*d/%*d' % (
            self.max_digits, test.number,
            self.max_digits, self.total_tests)

    def emit_progress(self, test):
        """Print the live progress line for `test`.

        It shows the range of test numbers currently running, the test name,
        elapsed/timeout seconds and the test's subtest summary.
        """
        left = '[%d-%d/%d]' % (
            self.num_tests - len(self.running_tests) + 1,
            self.num_tests,
            self.total_tests)
        seconds = time.time() - test.starting_time
        clock = '%d/%d s' % (int(seconds), int(test.timeout))
        right = '%-14s %s' % (clock, test.running_subtest_repr())
        self.print3(left, test.name, right, progress=True)

    def emit_start(self, test):
        """In verbose mode, announce that `test` starts and show its command."""
        if self.verbose:
            left = self.left(test)
            self.print2(left, 'Starting ' + test.name)
            self.print2('', mbold(test.cmdline))

    def emit_reproducer(self, line):
        """Print the reproducer command, unless verbose with a single job."""
        if not self.verbose or self.num_jobs > 1:
            self.print2('', '%s %s' % (mbold('Reproducer:'), line))

    def emit_output_start(self):
        """Print the separator that precedes captured test output."""
        self.print(mbold(self.output_start))

    def emit_output_end(self):
        """Print the separator that follows captured test output."""
        self.print(mbold(self.output_end))

    def emit_subtest_result(self, test, num, status, subtest_name):
        """Print one subtest result.

        Outside verbose mode only results whose status text contains 'FAIL'
        are printed.
        """
        if not self.verbose and 'FAIL' not in status:
            return
        left = self.left(test)
        main = '%s - %d %s' % (test.name, num, subtest_name)
        self.print3(left, main, status)

    def emit_result(self, test, result, status, detail):
        """Print the final result of `test` and stop tracking it.

        result -- name of an OutputStrings field ('ok', 'fail', ...)
        status -- status text, already colored by the caller
        detail -- extra text shown in parentheses
        """
        seconds = '%.2f s' % (time.time() - test.starting_time)
        left = self.left(test) + getattr(self.outputs, result)
        self.print5(left, test.name, status, seconds, detail)
        if result == 'fail':
            self.emit_reproducer(test.cmdline)
        del self.running_tests[test.name]
        if self.progress_test is test:
            self.progress_test = None

    # Asynchronous progress updates

    def finish(self):
        """Ask report_progress() to stop."""
        self.stop = True
        self.request_update()

    def request_test_update(self, for_test=None):
        """Refresh the progress line, but only if it is showing `for_test`."""
        if self.progress_test is for_test:
            self.request_update()

    def request_update(self):
        """Wake up report_progress(); no-op until that coroutine has started."""
        if self.update:
            self.update.set()

    async def report_progress(self):
        """Keep the progress line up to date until finish() is called.

        The loop wakes up on request_update() and on a one-second timer.
        Each timer tick drops the current progress test, so that the oldest
        entry of running_tests is picked next and moved to the end; the
        progress line thus rotates among the running tests.
        """
        loop = asyncio.get_running_loop()
        self.update = asyncio.Event()
        next_update = 0
        timer = None
        self.request_update()
        try:
            while not self.stop:
                await self.update.wait()
                self.update.clear()
                if loop.time() >= next_update:
                    self.progress_test = None
                    next_update = loop.time() + 1
                    timer = loop.call_at(next_update, self.update.set)
                if not self.progress_test:
                    if not self.running_tests:
                        continue
                    key, self.progress_test = self.running_tests.popitem(last=False)
                    self.running_tests[key] = self.progress_test
                self.emit_progress(self.progress_test)
        finally:
            # Do not leave a pending timer callback behind.
            if timer is not None:
                timer.cancel()

    def start_test(self, name, cmdline, timeout):
        """Register a new running test and return its TestCaseFormatter.

        The new test is placed at the front of running_tests, which makes it
        the next one picked for the progress line.
        """
        self.num_tests += 1
        test = TestCaseFormatter(
            number=self.num_tests, name=name, cmdline=cmdline,
            timeout=timeout, subtest_width=self.subtest_width)
        if self.verbose:
            self.emit_start(test)
        self.running_tests[name] = test
        self.running_tests.move_to_end(name, last=False)
        self.request_update()
        return test
async def t1():
    """Simulate a TAP test with four planned subtests that all succeed.

    Three results arrive immediately, the fourth after a pause.  Returns the
    test-case object.
    """
    case = f.start_test('suite1: test1', './test1 --tap -k', 4)
    case.set_subtests(4)
    immediate = ((1, 'ok', 'first'), (2, 'skip', 'second'), (3, 'ok', 'third'))
    for num, status, label in immediate:
        case.update_subtest(num, status, label)
    await asyncio.sleep(2.9)
    case.update_subtest(4, 'ok', 'fourth')
    summary = case.text_subtest_repr()
    f.emit_result(case, 'ok', mgreen('OK '), summary)
    return case
async def t2():
    """Simulate a TAP test with 40 planned subtests, two of which fail.

    Subtests 1 and 11 are preceded by a one-second pause, the other subtests
    below 35 by a 0.1 second pause, and subtests 35 to 40 are reported
    without delay.
    """
    case = f.start_test('suite1: test2', './test2 --tap -k', 30)
    case.set_subtests(40)
    failures = {
        20: 'here are failing subtests, some with a long name',
        30: 'which are printed even in non-verbose mode',
    }
    for i in range(1, 41):
        if i < 35:
            # The original condition also tested "i == 35", which can never
            # be true under the "i < 35" guard; it has been dropped.
            await asyncio.sleep(1 if i in (1, 11) else 0.1)
        if i in failures:
            case.update_subtest(i, 'fail', failures[i])
        else:
            case.update_subtest(i, 'ok', f'test {i}')
    f.emit_result(case, 'fail', mred('FAIL '), case.text_subtest_repr())
async def t3():
    """Simulate a test without subtests that fails and has captured output."""
    case = f.start_test('suite1: non-TAP test1', './test3.sh', 10)
    await asyncio.sleep(3)
    failure_status = mred('FAIL ')
    f.emit_result(case, 'fail', failure_status, 'exit code 1')
    # The test's output is shown between the two separator lines.
    f.emit_output_start()
    print('error blah blah')
    f.emit_output_end()
async def t4():
    """Simulate a test without subtests that is reported as an expected failure."""
    case = f.start_test('suite1: non-TAP test2', './test4.sh', 10)
    await asyncio.sleep(3)
    expected_status = mgreen('EXPECTEDFAIL ')
    f.emit_result(case, 'ok', expected_status, 'exit code 2')
async def tests():
    """Start the four simulated tests at staggered times and wait for them.

    t1 starts immediately, t2 one second later, t3 five seconds after t2 and
    t4 one second after t3.  The tasks are then awaited in start order.
    """
    launched = [asyncio.create_task(t1())]
    for delay, starter in ((1, t2), (5, t3), (1, t4)):
        await asyncio.sleep(delay)
        launched.append(asyncio.create_task(starter()))
    for task in launched:
        await task
async def run():
    """Run the simulated tests while the progress reporter runs alongside."""
    progress_task = asyncio.create_task(f.report_progress())
    await tests()
    # Tell the reporter to stop, then wait until it has actually returned.
    f.finish()
    await progress_task
# Subtest details and test starts are printed only when -v is given.
verbose = '-v' in sys.argv

# Use the real terminal width and emoji markers when file descriptor 1 is a
# terminal; otherwise fall back to 80 columns and plain-text markers.
try:
    columns, rows = os.get_terminal_size(1)
    f = TestOutputFormatter(columns, 4, verbose=verbose, emoji=True)
except OSError:
    f = TestOutputFormatter(80, 4, verbose=verbose, emoji=False)

asyncio.run(run())

# Two empty lines followed by the (hard-coded) summary of the mock run.
summary = (
    'Ok: 1 ',
    'Expected Fail: 1 ',
    'Fail: 2 ',
    'Unexpected Pass: 0 ',
    'Skipped: 0 ',
    'Timeout: 0 ',
)
print('', '', *summary, sep='\n')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment