Skip to content

Instantly share code, notes, and snippets.

@bonzini
Last active November 18, 2020 09:09
Show Gist options
  • Save bonzini/2b1a7bb332491ade558985ed0ff1405c to your computer and use it in GitHub Desktop.
Save bonzini/2b1a7bb332491ade558985ed0ff1405c to your computer and use it in GitHub Desktop.
Mockup for fancy "meson test" output
#! /usr/bin/env python3
import asyncio
from collections import namedtuple, OrderedDict
import sys
import os
from typing import NamedTuple
import typing as T
import time
Number = T.TypeVar('Number', int, float)
f = None
class OutputStrings(NamedTuple):
ok: str
fail: str
skip: str
error: str
scissors: str
dash: str
def mblue(s):
return '\x1b[34;1m%s\x1b[0m' % s
def myellow(s):
return '\x1b[33;1m%s\x1b[0m' % s
def mgreen(s):
return '\x1b[32;1m%s\x1b[0m' % s
def mred(s):
return '\x1b[31;1m%s\x1b[0m' % s
def mbold(s):
return '\x1b[1m%s\x1b[0m' % s
class TestCaseFormatter(object):
number: int
name: str
cmdline: str
starting_time: Number
timeout: Number
num_subtests: T.Optional[int] = None
have_subtests: bool = False
ran: int = 0
failed: int = 0
last_ran: int = 0
last_failed: int = 0
subtest_repr: T.List[str] = []
subtest_width: int
def __init__(self, number, name, cmdline, timeout, subtest_width):
self.number = number
self.name = name
self.cmdline = cmdline
self.timeout = timeout
self.subtest_width = subtest_width - 2 # leave space for brackets
self.starting_time = time.time()
def set_subtests(self, num=None):
self.have_subtests = True
self.num_subtests = num
if num is not None:
self.subtest_width = min(self.num_subtests, self.subtest_width)
if self.num_subtests == self.subtest_width or self.subtest_width >= 10:
self.subtest_repr = list(' ' * self.subtest_width)
def subtest_num_to_char(self, num):
"""Return the character number in subtest_repr corresponding to the
num-th subtest, and the first subtest for that character."""
if self.num_subtests == self.subtest_width:
return num - 1, num
else:
numerator = (num - 1) * self.subtest_width
denominator = self.num_subtests
char = numerator // denominator
char_offset = (numerator % denominator) // self.subtest_width
return char, num - char_offset
def update_subtest_repr(self, num, status):
if not self.subtest_repr:
return
char, char_start = self.subtest_num_to_char(num)
if status == 'ok' and self.last_failed < char_start:
# Only add a checkmark if no test has failed for this character
self.subtest_repr[char] = mgreen('\u2714')
elif status == 'fail':
self.subtest_repr[char] = mred('\u2718')
elif status == 'skip' and self.last_ran < char_start:
# Only add a dot if all tests have been skipped for this character
self.subtest_repr[char] = mblue('.')
def update_subtest(self, num, status, name):
if status != 'skip':
self.ran += 1
self.last_ran = num
if status == 'fail':
self.failed += 1
self.last_failed = num
if status == 'skip':
f.emit_subtest_result(self, num, mblue('SKIP'), name)
elif status == 'ok':
f.emit_subtest_result(self, num, mgreen('OK'), name)
else:
f.emit_subtest_result(self, num, mred('FAIL'), name)
self.update_subtest_repr(num, status)
f.request_test_update(self)
def text_subtest_repr(self):
"""Textual representation of testcases, used for late plan or
after the run has finished."""
if not self.ran or not self.have_subtests:
return ''
elif self.failed == 0:
return '%d tests passed' % self.ran
else:
return '%d/%d tests passed' % (self.ran - self.failed, self.ran)
def running_subtest_repr(self):
if self.have_subtests:
if self.subtest_repr:
return '[' + ''.join(self.subtest_repr) + ']'
elif self.subtest_width > 20:
return self.text_subtest_repr()
return ''
class TestOutputFormatter(object):
# Do not use single-width characters that become double-width when
# changed to emoji presentation, because terminals make a mess out
# of them.
EmojiOutputs = OutputStrings(' \u2705\ufe0f ', ' \u274c\ufe0f ',
' \u2753\ufe0f ', ' \u2757\ufe0f ', ' \u2700 ', '\u2015')
TextOutputs = OutputStrings(': ', ': ', ': ', ': ', ' 8< ', '-')
def __init__(self, term_width, total_tests, verbose=False, emoji=True):
self.max_digits = len(str(total_tests))
self.num_jobs = 4
self.verbose = verbose
self.term_width = term_width
self.total_tests = total_tests
self.outputs = self.EmojiOutputs if emoji else self.TextOutputs
self.output_end = self.outputs.dash * (self.term_width - 1)
if self.term_width > 20:
self.output_start = \
self.output_end[0:self.term_width // 2 - 2] + self.outputs.scissors + \
self.output_end[self.term_width // 2 + 2:]
else:
self.output_start = "Output:"
# "[53-57/99]" -> max_digits * 3 + 4
# "53/99 xx " -> max_digits * 2 + 1 + extra
self.left_width = max(self.max_digits * 3 + 4, self.max_digits * 2 + 1 + len(self.outputs.ok))
# two separators to the side of the name
# 14 characters for the clock or status, plus one space
# two characters at the right hand side
term_width -= self.left_width + 1 + 1 + 14 + 1 + 2
self.name_width = max(int(term_width * 2 / 3), 50)
self.subtest_width = term_width - self.name_width
self.num_tests = 0
self.progress_test = None
self.running_tests = OrderedDict()
self.stop = False
self.should_erase_line = ''
# Low-level tabular printing
def print(self, line, progress=False):
if progress:
print(self.should_erase_line, line, sep='', end='\r')
self.should_erase_line = '\x1b[K'
else:
print(self.should_erase_line, line, sep='')
self.should_erase_line = ''
def print2(self, left, middle, progress=False):
line = '%-*s %s' % (self.left_width, left, middle)
self.print(line, progress)
def print3(self, left, middle, right, progress=False):
if not progress and len(middle) > self.name_width:
self.print2(left, middle)
left = middle = ''
line = '%-*s %-*s %s' % (self.left_width, left, self.name_width, middle, right)
self.print(line, progress)
def print5(self, left, middle, status, clock, detail):
right = status + ' ' + clock
detail = '(' + detail + ')'
if len(clock) + len(detail) + 1 <= self.subtest_width:
self.print3(left, middle, right + ' ' + detail)
elif len(middle) > self.name_width:
self.print2(left, middle)
self.print3('', detail, right)
else:
self.print3(left, middle, right)
self.print2('', detail)
# High-level printing
def left(self, test):
return '%*d/%*d' % (
self.max_digits, test.number,
self.max_digits, self.total_tests)
def emit_progress(self, test):
left = '[%d-%d/%d]' % (
self.num_tests - len(self.running_tests) + 1,
self.num_tests,
self.total_tests)
seconds = time.time() - test.starting_time
clock = '%d/%d s' % (int(seconds), int(test.timeout))
right = '%-14s %s' % (clock, test.running_subtest_repr())
self.print3(left, test.name, right, progress=True)
def emit_start(self, test):
if self.verbose:
left = self.left(test)
self.print2(left, 'Starting ' + test.name)
self.print2('', mbold(test.cmdline))
def emit_reproducer(self, line):
if not self.verbose or self.num_jobs > 1:
self.print2('', '%s %s' % (mbold('Reproducer:'), line))
def emit_output_start(self):
self.print(mbold(self.output_start))
def emit_output_end(self):
self.print(mbold(self.output_end))
def emit_subtest_result(self, test, num, status, subtest_name):
if not self.verbose and 'FAIL' not in status:
return
left = self.left(test)
main = '%s - %d %s' % (test.name, num, subtest_name)
self.print3(left, main, status)
def emit_result(self, test, result, status, detail):
seconds = '%.2f s' % (time.time() - test.starting_time)
left = self.left(test) + getattr(self.outputs, result)
self.print5(left, test.name, status, seconds, detail)
if result == 'fail':
self.emit_reproducer(test.cmdline)
del self.running_tests[test.name]
if self.progress_test is test:
self.progress_test = None
# Asynchronous progress updates
def finish(self):
self.stop = True
self.request_update()
def request_test_update(self, for_test=None):
if self.progress_test is for_test:
self.request_update()
def request_update(self):
if self.update:
self.update.set()
async def report_progress(self):
loop = asyncio.get_event_loop()
self.update = asyncio.Event()
next_update = 0
self.request_update()
while not self.stop:
await self.update.wait()
self.update.clear()
if loop.time() >= next_update:
self.progress_test = None
next_update = loop.time() + 1
loop.call_at(next_update, self.update.set)
if not self.progress_test:
if not self.running_tests:
continue
key, self.progress_test = self.running_tests.popitem(last=False)
self.running_tests[key] = self.progress_test
self.emit_progress(self.progress_test)
def start_test(self, name, cmdline, timeout):
self.num_tests += 1
test = TestCaseFormatter(
number=self.num_tests, name=name, cmdline=cmdline,
timeout=timeout, subtest_width=self.subtest_width)
if self.verbose:
self.emit_start(test)
self.running_tests[name] = test
self.running_tests.move_to_end(name, last=False)
self.request_update()
return test
async def t1():
t1 = f.start_test('suite1: test1', './test1 --tap -k', 4)
t1.set_subtests(4)
t1.update_subtest(1, 'ok', 'first')
t1.update_subtest(2, 'skip', 'second')
t1.update_subtest(3, 'ok', 'third')
await asyncio.sleep(2.9)
t1.update_subtest(4, 'ok', 'fourth')
f.emit_result(t1, 'ok', mgreen('OK '), t1.text_subtest_repr())
return t1
async def t2():
t2 = f.start_test('suite1: test2', './test2 --tap -k', 30)
t2.set_subtests(40)
for i in range(1, 41):
if i < 35:
if i == 1 or i == 11 or i == 35:
await asyncio.sleep(1)
else:
await asyncio.sleep(0.1)
if i == 20:
t2.update_subtest(i, 'fail', 'here are failing subtests, some with a long name')
elif i == 30:
t2.update_subtest(i, 'fail', 'which are printed even in non-verbose mode')
else:
t2.update_subtest(i, 'ok', f'test {i}')
f.emit_result(t2, 'fail', mred('FAIL '), t2.text_subtest_repr())
async def t3():
t3 = f.start_test('suite1: non-TAP test1', './test3.sh', 10)
await asyncio.sleep(3)
f.emit_result(t3, 'fail', mred('FAIL '), 'exit code 1')
f.emit_output_start()
print('error blah blah')
f.emit_output_end()
async def t4():
t4 = f.start_test('suite1: non-TAP test2', './test4.sh', 10)
await asyncio.sleep(3)
f.emit_result(t4, 'ok', mgreen('EXPECTEDFAIL '), 'exit code 2')
async def tests():
f1 = asyncio.create_task(t1())
await asyncio.sleep(1)
f2 = asyncio.create_task(t2())
await asyncio.sleep(5)
f3 = asyncio.create_task(t3())
await asyncio.sleep(1)
f4 = asyncio.create_task(t4())
await f1
await f2
await f3
await f4
async def run():
formatter = asyncio.create_task(f.report_progress())
await tests()
f.finish()
await formatter
verbose = '-v' in sys.argv
try:
columns, rows = os.get_terminal_size(1)
f = TestOutputFormatter(columns, 4, verbose=verbose, emoji=True)
except OSError:
f = TestOutputFormatter(80, 4, verbose=verbose, emoji=False)
asyncio.run(run())
print()
print()
print('Ok: 1 ')
print('Expected Fail: 1 ')
print('Fail: 2 ')
print('Unexpected Pass: 0 ')
print('Skipped: 0 ')
print('Timeout: 0 ')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment