Last active
February 8, 2022 15:24
-
-
Save tomMoral/1e8cb60c1f9e6d06065b11758c2b6b21 to your computer and use it in GitHub Desktop.
Add a coverage logger thread to get access to coverage in real time
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from time import sleep | |
def func0():
    """Print an entry marker, block for one second, print an exit marker."""
    enter_msg, exit_msg = "run func0", "ran func0"
    print(enter_msg)
    sleep(1)
    print(exit_msg)
def func1():
    """Print an entry marker, block for 2.2 seconds, print an exit marker."""
    enter_msg, exit_msg = "run func1", "ran func1"
    print(enter_msg)
    sleep(2.2)
    print(exit_msg)
def func2():
    """Print an entry marker, block for one second, print an exit marker."""
    enter_msg, exit_msg = "run func2", "ran func2"
    print(enter_msg)
    sleep(1)
    print(exit_msg)
def func3():
    """Print an entry marker, block for one second, print an exit marker."""
    enter_msg, exit_msg = "run func3", "ran func3"
    print(enter_msg)
    sleep(1)
    print(exit_msg)
def func4():
    """Print a start marker, sleep one second, then raise.

    Raises:
        Exception: always.  The final print is therefore unreachable,
        which makes this function a useful partial-coverage target.
    """
    # BUG FIX: the original printed "run func3" / "ran func3" here,
    # a copy-paste slip from func3.
    print("run func4")
    sleep(1)
    raise Exception()
    print("ran func4")  # intentionally unreachable: shows as uncovered
def func5():
    """Run only the first branch; the ``else`` arm is intentionally dead.

    The constant ``if True`` condition guarantees the else branch never
    executes, so it should appear as uncovered in the coverage report.
    """
    if True:
        print("Statement")
    else:
        print("non statement")
def func6():
    """Block for four seconds, then print a completion marker."""
    delay_seconds = 4
    sleep(delay_seconds)
    print("func6")
def never_run():
    """Dead code: if this ever shows as covered, the tooling is broken."""
    message = "This code should never be run and if shows coverage"
    print(message)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import socket | |
import threading | |
from time import sleep | |
from coverage import Coverage | |
from coverage.data import CoverageData, CoverageDataFiles | |
from coverage.files import abs_file | |
from multiproc import patch_multiprocessing | |
import multiprocessing as mp | |
# Start a parallel-mode Coverage instance as a side effect of importing this
# module, so measurement begins before any user code runs.
cov = Coverage(config_file=True, data_suffix=True)
# Exclude the instrumentation files themselves from the report.
if cov.config.omit is None:
    cov.config.omit = []
cov.config.omit += ['*multiproc.py', '*coverage_logger.py']
cov.start()
# NOTE(review): ``cov.config_file`` — confirm this attribute exists on the
# coverage version in use (the rcfile path may live on ``cov.config``).
patch_multiprocessing(cov.config_file)
# When True, run() prints the raw collector data on every tick.
DEBUG = False
def get_data_dict(d):
    """Return a dict like d, but with keys modified by `abs_file`.

    Each inner mapping of ``d`` is drained entry by entry (``pop``) while
    being copied, so the live collector dict is emptied as a side effect.
    """
    result = {}
    for filename in list(d.keys()):
        per_file = d[filename]
        drained = {}
        for line_no in list(per_file.keys()):
            drained[line_no] = per_file.pop(line_no)
        result[abs_file(filename)] = drained
    return result
class CoverageLoggerThread(threading.Thread):
    """Background thread that periodically exports live coverage data.

    Every ``_delay`` seconds the thread snapshots (and drains) the running
    collector's data and writes it to a per-process data file.  The ``main``
    instance (parent process) additionally combines all parallel data files
    and regenerates the HTML and text reports.
    """

    _kill_now = False  # set by shutdown() to request a graceful stop
    _delay = 2         # seconds between coverage snapshots

    def __init__(self, main=True):
        # main=True marks the instance living in the parent process; only
        # that one combines data files and renders reports.
        self.main = main
        self._data = CoverageData()
        self._fname = cov.config.data_file
        self._suffix = ".{}.{}".format(socket.gethostname(), os.getpid())
        self._data_files = CoverageDataFiles(basename=self._fname,
                                             warn=cov._warn)
        self._pid = os.getpid()
        super(CoverageLoggerThread, self).__init__()

    def shutdown(self):
        """Ask the logging loop to stop after its current sleep."""
        self._kill_now = True

    def combine(self):
        """Merge all parallel data files into this thread's data set."""
        aliases = None
        if cov.config.paths:
            from coverage.aliases import PathAliases
            aliases = PathAliases()
            # BUG FIX: the original read ``self.config.paths``, but this
            # thread has no ``config`` attribute (AttributeError whenever
            # [paths] is configured); the configuration lives on the
            # module-level ``cov`` object.
            for paths in cov.config.paths.values():
                result = paths[0]
                for pattern in paths[1:]:
                    aliases.add(pattern, result)
        self._data_files.combine_parallel_data(self._data, aliases=aliases)

    def export(self, new=True):
        """Combine collected data and (re)generate the reports.

        When ``new`` is True a fresh Coverage object is loaded so the
        report reflects data files written by other processes as well.
        """
        cov_report = cov
        if new:
            cov_report = Coverage(config_file=True)
            cov_report.load()
        self.combine()
        self._data_files.write(self._data)
        cov_report.data.update(self._data)
        cov_report.html_report(directory="coverage_report_data.html")
        cov_report.report(show_missing=True)

    def _collect_and_export(self):
        # Snapshot (and drain) the live collector data, fold it into our
        # CoverageData, then persist under this process's unique suffix.
        new_data = get_data_dict(cov.collector.data)
        if cov.collector.branch:
            self._data.add_arcs(new_data)
        else:
            self._data.add_lines(new_data)
        self._data.add_file_tracers(get_data_dict(cov.collector.file_tracers))
        self._data_files.write(self._data, self._suffix)

    def run(self):
        while True:
            sleep(CoverageLoggerThread._delay)
            if self._kill_now:
                break
            if DEBUG:
                print(mp.current_process().name, cov.collector.data)
            self._collect_and_export()
            if self.main:
                self.export()
        cov.stop()
        if not self.main:
            # Child processes just flush their own data and exit.
            self._collect_and_export()
            return
        print("Main ok")
        self.export(new=False)
        print("End of the program. I was killed gracefully :)")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import threading | |
import multiprocessing | |
from coverage_logger import CoverageLoggerThread | |
class RunnerProcess(multiprocessing.Process):
    """Process that waits ``delay`` seconds, then calls basic_func.func<idx>."""

    def __init__(self, idx, delay):
        self.idx = idx
        self.delay = delay
        super().__init__(name="Proc{}".format(idx))

    def run(self):
        # Imported inside run() so the child process triggers its own
        # coverage instrumentation on import.
        import basic_func
        time.sleep(self.delay)
        target = getattr(basic_func, "func{}".format(self.idx))
        target()
class RunnerThread(threading.Thread):
    """Thread that waits ``delay`` seconds, then calls basic_func.func<idx>.

    NOTE(review): the thread name uses the "Proc{}" prefix, copied from
    RunnerProcess — kept as-is to preserve the printed output.
    """

    def __init__(self, idx, delay):
        self.idx = idx
        self.delay = delay
        super().__init__(name="Proc{}".format(idx))

    def run(self):
        # Imported inside run() so coverage of basic_func is attributed
        # to this runner's execution.
        import basic_func
        time.sleep(self.delay)
        target = getattr(basic_func, "func{}".format(self.idx))
        target()
def last_func():
    """Final marker, executed after every worker has joined."""
    closing_message = "run this last"
    print(closing_message)
def main():
    """Start the coverage logger, then a mix of worker threads/processes."""
    thread_cov = CoverageLoggerThread()
    thread_cov.start()

    specs = [((0, 0.5), RunnerThread), ((1, 2), RunnerProcess),
             ((2, 2), RunnerProcess), ((2, 4), RunnerProcess),
             ((3, 1), RunnerProcess), ((4, 1), RunnerProcess),
             ((5, 1), RunnerProcess), ((6, 1), RunnerProcess)]

    workers = []
    for args, runner_cls in specs:
        worker = runner_cls(*args)
        workers.append(worker)
        worker.start()
        print(worker.name)

    for worker in workers:
        worker.join()

    last_func()
    # Stop the logger only after all workers finished, so the final
    # export sees every data file.
    thread_cov.shutdown()
    thread_cov.join()


if __name__ == '__main__':
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0 | |
# For details: https://bitbucket.org/ned/coveragepy/src/default/NOTICE.txt | |
"""Monkey-patching to add multiprocessing support for coverage.py""" | |
import multiprocessing | |
import multiprocessing.process | |
import os | |
import sys | |
from coverage.misc import contract | |
# An attribute that will be set on the module to indicate that it has been
# monkey-patched.
PATCHED_MARKER = "_coverage$patched2"

# The environment variable that specifies the rcfile for subprocesses.
COVERAGE_RCFILE_ENV = "_COVERAGE_RCFILE"

# On Python >= 3.4 every start method routes through BaseProcess, whose
# ``_bootstrap`` can be patched in place; older versions expose only
# ``multiprocessing.Process`` itself.
if sys.version_info >= (3, 4):
    BaseProcess = multiprocessing.process.BaseProcess
else:
    BaseProcess = multiprocessing.Process

# Keep a reference to the unpatched bootstrap so the wrapper can delegate.
original_bootstrap = BaseProcess._bootstrap
class ProcessWithCoverage(BaseProcess):
    """A replacement for multiprocess.Process that starts coverage."""

    def _bootstrap(self):
        """Wrapper around _bootstrap to start coverage."""
        # Imported here to avoid a circular import at module load time.
        from coverage_logger import CoverageLoggerThread

        logger_thread = CoverageLoggerThread(main=False)
        logger_thread.start()
        try:
            return original_bootstrap(self)
        finally:
            # Always flush and stop the logger, even if the child crashed.
            logger_thread.shutdown()
            logger_thread.join()
class Stowaway(object):
    """An object to pickle, so when it is unpickled, it can apply
    the monkey-patch in the newly created process."""

    def __init__(self, rcfile):
        self.rcfile = rcfile

    def __getstate__(self):
        # Only the rcfile path travels across the pickle boundary.
        return {'rcfile': self.rcfile}

    def __setstate__(self, state):
        # Re-apply the monkey-patch as a side effect of unpickling in
        # the child process.
        patch_multiprocessing(state['rcfile'])
@contract(rcfile=str)
def patch_multiprocessing(rcfile):
    """Monkey-patch the multiprocessing module.

    This enables coverage measurement of processes started by multiprocessing.
    This involves aggressive monkey-patching.

    `rcfile` is the path to the rcfile being used.
    """
    # Idempotence guard: never patch twice in the same process.
    if hasattr(multiprocessing, PATCHED_MARKER):
        return

    if sys.version_info >= (3, 4):
        # Patch the shared bootstrap so every Process subclass is covered.
        BaseProcess._bootstrap = ProcessWithCoverage._bootstrap
    else:
        multiprocessing.Process = ProcessWithCoverage

    # Record the rcfile in the environment so subprocesses can find it.
    os.environ[COVERAGE_RCFILE_ENV] = rcfile

    # When spawning processes rather than forking them, we have no state in the
    # new process. We sneak in there with a Stowaway: we stuff one of our own
    # objects into the data that gets pickled and sent to the sub-process. When
    # the Stowaway is unpickled, its __setstate__ method is called, which
    # re-applies the monkey-patch.
    # Windows only spawns, so this is needed to keep Windows working.
    try:
        from multiprocessing import spawn
        original_get_preparation_data = spawn.get_preparation_data
    except (ImportError, AttributeError):
        # Older Pythons have no spawn module; fork-only platforms are
        # already handled by the bootstrap patch above.
        pass
    else:
        def get_preparation_data_with_stowaway(name):
            """Get the original preparation data, and also insert our stowaway.
            """
            d = original_get_preparation_data(name)
            d['stowaway'] = Stowaway(rcfile)
            return d

        spawn.get_preparation_data = get_preparation_data_with_stowaway

    # Mark the module so a second call becomes a no-op.
    setattr(multiprocessing, PATCHED_MARKER, True)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Put the 4 files in one directory and launch
python main.py
This should give you a coverage report that updates while the code runs in the different Processes/Threads.
Another way to go, which avoids the monkey patch, is to use a
.pth
file in your Python install that enables coverage on interpreter start: you can then start your coverage logger with
COVERAGE_LOGGER_START=1 py.test