Skip to content

Instantly share code, notes, and snippets.

@tomMoral
Last active February 8, 2022 15:24
Show Gist options
  • Save tomMoral/1e8cb60c1f9e6d06065b11758c2b6b21 to your computer and use it in GitHub Desktop.
Save tomMoral/1e8cb60c1f9e6d06065b11758c2b6b21 to your computer and use it in GitHub Desktop.
Add a coverage logger thread to get access to coverage in real time
from time import sleep
def func0():
    """Print a start message, pause for one second, then print an end message."""
    print("run func0")
    sleep(1)
    print("ran func0")
def func1():
    """Print a start message, pause for 2.2 seconds, then print an end message."""
    print("run func1")
    sleep(2.2)
    print("ran func1")
def func2():
    """Print a start message, pause for one second, then print an end message."""
    print("run func2")
    sleep(1)
    print("ran func2")
def func3():
    """Print a start message, pause for one second, then print an end message."""
    print("run func3")
    sleep(1)
    print("ran func3")
def func4():
    """Print a start message, pause for one second, then raise ``Exception``.

    The trailing ``print`` can never execute because the ``raise`` above it
    always fires; it is kept so the function still contains a line that is
    never run.

    Raises:
        Exception: always, after the one-second pause.
    """
    # The messages previously said "func3" (copied from the function above),
    # which made the output of the two functions indistinguishable.
    print("run func4")
    sleep(1)
    raise Exception()
    print("ran func4")  # unreachable: the raise above always fires
def func5():
    """Print "Statement"; the ``else`` branch is dead because the test is ``True``."""
    if True:
        print("Statement")
    else:
        # Never taken: the condition above is the constant True.
        print("non statement")
def func6():
    """Pause for four seconds, then print "func6"."""
    sleep(4)
    print("func6")
def never_run():
    """Print a notice saying this function was not expected to be called."""
    print("This code should never be run and if shows coverage")
import os
import socket
import threading
from time import sleep
from coverage import Coverage
from coverage.data import CoverageData, CoverageDataFiles
from coverage.files import abs_file
from multiproc import patch_multiprocessing
import multiprocessing as mp
# Module-level coverage object shared by everything in this file. It reads the
# configuration file (config_file=True) and writes suffixed data files
# (data_suffix=True).
cov = Coverage(config_file=True, data_suffix=True)

# Add the coverage plumbing modules to the omit patterns, creating the list
# first when the configuration did not define one.
if cov.config.omit is None:
    cov.config.omit = []
cov.config.omit += ['*multiproc.py', '*coverage_logger.py']

# Begin measuring, then monkey-patch multiprocessing so child processes start
# their own logger thread.
cov.start()
patch_multiprocessing(cov.config_file)

# When true, the logger thread prints the raw collector data on every cycle.
DEBUG = False
def get_data_dict(d):
    """Return a dict like `d`, but with keys modified by `abs_file`.

    When a value of `d` is itself a dict, its entries are *moved* into the
    result: each inner key is popped from the original, so the inner dicts of
    `d` are left empty afterwards. Any other kind of value (for example a
    plain string) is copied into the result unchanged and left in `d`.

    Args:
        d: mapping whose keys are file names.

    Returns:
        A new dict keyed by ``abs_file(filename)``.
    """
    res = {}
    # Iterate over snapshots of the keys rather than the live dicts --
    # presumably because the tracer may add entries while we are reading.
    for filename in list(d.keys()):
        value = d[filename]
        if isinstance(value, dict):
            moved = {}
            for key in list(value.keys()):
                moved[key] = value.pop(key)
        else:
            # Non-dict values have no entries to drain; calling .keys()/.pop()
            # on them used to raise AttributeError.
            moved = value
        res[abs_file(filename)] = moved
    return res
class CoverageLoggerThread(threading.Thread):
    """Background thread that periodically flushes collected coverage data.

    Every `_delay` seconds the thread copies the data gathered by the
    module-level `cov` collector into its own `CoverageData`, writes it to a
    suffixed data file and, when `main` is true, regenerates the reports.
    Call `shutdown()` and then `join()` to stop it.
    """

    # Kept for backward compatibility; set to True by shutdown().
    _kill_now = False
    # Seconds to wait between two collection cycles.
    _delay = 2

    def __init__(self, main=True):
        """Prepare the data containers; `main` selects report generation."""
        self.main = main
        self._data = CoverageData()
        self._fname = cov.config.data_file
        self._suffix = ".{}.{}".format(socket.gethostname(), os.getpid())
        self._data_files = CoverageDataFiles(basename=self._fname,
                                             warn=cov._warn)
        self._pid = os.getpid()
        # Lets run() wake up as soon as shutdown() is called instead of
        # finishing a full sleep first.
        self._shutdown_event = threading.Event()
        super(CoverageLoggerThread, self).__init__()

    def shutdown(self):
        """Ask the thread to leave its collection loop."""
        self._kill_now = True
        self._shutdown_event.set()

    def combine(self):
        """Merge the parallel data files into this thread's data.

        When the configuration defines path aliases, they are passed on to
        `combine_parallel_data`.
        """
        aliases = None
        if cov.config.paths:
            # PathAliases is defined in coverage.files.
            from coverage.files import PathAliases
            aliases = PathAliases()
            # The configuration lives on the module-level `cov` object; this
            # thread has no `config` attribute of its own.
            for paths in cov.config.paths.values():
                result = paths[0]
                for pattern in paths[1:]:
                    aliases.add(pattern, result)
        self._data_files.combine_parallel_data(self._data, aliases=aliases)

    def export(self, new=True):
        """Combine the data files, write the result and produce the reports.

        Args:
            new: when true, report through a freshly created and loaded
                `Coverage` object; otherwise report through the module-level
                `cov`.
        """
        cov_report = cov
        if new:
            cov_report = Coverage(config_file=True)
            cov_report.load()
        self.combine()
        self._data_files.write(self._data)
        cov_report.data.update(self._data)
        cov_report.html_report(directory="coverage_report_data.html")
        cov_report.report(show_missing=True)

    def _collect_and_export(self):
        """Move the collector's current data into `_data` and write it out."""
        new_data = get_data_dict(cov.collector.data)
        if cov.collector.branch:
            self._data.add_arcs(new_data)
        else:
            self._data.add_lines(new_data)
        self._data.add_file_tracers(get_data_dict(cov.collector.file_tracers))
        self._data_files.write(self._data, self._suffix)

    def run(self):
        """Collect periodically until shutdown, then do a final flush."""
        while True:
            # wait() returns True as soon as shutdown() sets the event, and
            # False when the delay elapses without a shutdown request.
            stopped = self._shutdown_event.wait(CoverageLoggerThread._delay)
            if stopped or self._kill_now:
                break
            if DEBUG:
                print(mp.current_process().name, cov.collector.data)
            self._collect_and_export()
            if self.main:
                self.export()

        cov.stop()

        if not self.main:
            # Non-main instances only write their own data file.
            self._collect_and_export()
            return

        print("Main ok")
        self.export(new=False)
        print("End of the program. I was killed gracefully :)")
import time
import threading
import multiprocessing
from coverage_logger import CoverageLoggerThread
class RunnerProcess(multiprocessing.Process):
    """Process that waits `delay` seconds and then calls `basic_func.func<idx>`."""

    def __init__(self, idx, delay):
        """Name the process "Proc<idx>" and remember `idx` and `delay`."""
        super().__init__(name="Proc{}".format(idx))
        self.idx = idx
        self.delay = delay

    def run(self):
        """Sleep for `delay` seconds, then look up and call the target function."""
        import basic_func
        time.sleep(self.delay)
        target_name = "func{}".format(self.idx)
        target = getattr(basic_func, target_name)
        target()
class RunnerThread(threading.Thread):
    """Thread that waits `delay` seconds and then calls `basic_func.func<idx>`."""

    def __init__(self, idx, delay):
        """Name the thread "Proc<idx>" and remember `idx` and `delay`."""
        super().__init__(name="Proc{}".format(idx))
        self.idx = idx
        self.delay = delay

    def run(self):
        """Sleep for `delay` seconds, then look up and call the target function."""
        import basic_func
        time.sleep(self.delay)
        target_name = "func{}".format(self.idx)
        target = getattr(basic_func, target_name)
        target()
def last_func():
    """Print the closing message; `main` calls this after all workers are joined."""
    print("run this last")
def main():
    """Run every worker under the coverage logger thread.

    Starts a `CoverageLoggerThread`, launches one worker per entry of the
    table below (printing each worker's name), waits for all of them, calls
    `last_func`, and finally stops the logger thread.
    """
    thread_cov = CoverageLoggerThread()
    thread_cov.start()
    try:
        workers = []
        # Each entry is ((idx, delay), worker class).
        for args, cls in [((0, 0.5), RunnerThread), ((1, 2), RunnerProcess),
                          ((2, 2), RunnerProcess), ((2, 4), RunnerProcess),
                          ((3, 1), RunnerProcess), ((4, 1), RunnerProcess),
                          ((5, 1), RunnerProcess), ((6, 1), RunnerProcess)]:
            worker = cls(*args)
            workers.append(worker)
            worker.start()
            print(worker.name)

        for worker in workers:
            worker.join()

        last_func()
    finally:
        # Always stop the logger thread: it is not a daemon thread, so
        # leaving it running after an error would keep the program alive.
        thread_cov.shutdown()
        thread_cov.join()


if __name__ == '__main__':
    main()
# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
# For details: https://bitbucket.org/ned/coveragepy/src/default/NOTICE.txt
"""Monkey-patching to add multiprocessing support for coverage.py"""
import multiprocessing
import multiprocessing.process
import os
import sys
from coverage.misc import contract
# Name of the attribute set on the `multiprocessing` module once it has been
# monkey-patched.
PATCHED_MARKER = "_coverage$patched2"

# Name of the environment variable that specifies the rcfile for subprocesses.
COVERAGE_RCFILE_ENV = "_COVERAGE_RCFILE"

# From Python 3.4 on, the class to patch is `multiprocessing.process.BaseProcess`;
# older versions use `multiprocessing.Process`.
BaseProcess = (
    multiprocessing.process.BaseProcess
    if sys.version_info >= (3, 4)
    else multiprocessing.Process
)

# Keep a handle on the unpatched method so the wrapper can delegate to it.
original_bootstrap = BaseProcess._bootstrap
class ProcessWithCoverage(BaseProcess):
    """A replacement for multiprocessing.Process that starts coverage."""

    def _bootstrap(self, *args, **kwargs):
        """Wrapper around _bootstrap to start coverage.

        A non-main `CoverageLoggerThread` is started before the original
        `_bootstrap` runs and is shut down and joined afterwards, even when
        the original raises. All arguments are forwarded unchanged, because
        newer Python versions call `_bootstrap` with an extra argument.

        Returns:
            Whatever the original `_bootstrap` returns.
        """
        # avoid circular import
        from coverage_logger import CoverageLoggerThread
        thread_cov = CoverageLoggerThread(main=False)
        thread_cov.start()
        try:
            return original_bootstrap(self, *args, **kwargs)
        finally:
            thread_cov.shutdown()
            thread_cov.join()
class Stowaway(object):
    """Picklable carrier that re-applies the monkey-patch when unpickled.

    Pickling stores only the rcfile path; unpickling calls
    `patch_multiprocessing` with that path.
    """

    def __init__(self, rcfile):
        """Remember the rcfile path to carry along."""
        self.rcfile = rcfile

    def __getstate__(self):
        """Return the pickled state: a dict holding just the rcfile path."""
        return dict(rcfile=self.rcfile)

    def __setstate__(self, state):
        """Apply the monkey-patch using the rcfile path found in `state`."""
        rcfile = state['rcfile']
        patch_multiprocessing(rcfile)
@contract(rcfile=str)
def patch_multiprocessing(rcfile):
    """Monkey-patch the multiprocessing module.

    This enables coverage measurement of processes started by multiprocessing.
    This involves aggressive monkey-patching. Calling it again once the patch
    marker is present on the module is a no-op.

    `rcfile` is the path to the rcfile being used.
    """
    already_patched = hasattr(multiprocessing, PATCHED_MARKER)
    if already_patched:
        return

    if sys.version_info < (3, 4):
        multiprocessing.Process = ProcessWithCoverage
    else:
        BaseProcess._bootstrap = ProcessWithCoverage._bootstrap

    # Publish the rcfile path through the environment.
    os.environ[COVERAGE_RCFILE_ENV] = rcfile

    # When spawning processes rather than forking them, we have no state in the
    # new process. We sneak in there with a Stowaway: we stuff one of our own
    # objects into the data that gets pickled and sent to the sub-process. When
    # the Stowaway is unpickled, its __setstate__ method is called, which
    # re-applies the monkey-patch.
    # Windows only spawns, so this is needed to keep Windows working.
    try:
        from multiprocessing import spawn
        stock_get_preparation_data = spawn.get_preparation_data
    except (ImportError, AttributeError):
        # No usable spawn machinery here: nothing to wrap.
        spawn = None

    if spawn is not None:
        def get_preparation_data_with_stowaway(name):
            """Get the original preparation data, and also insert our stowaway.
            """
            prep = stock_get_preparation_data(name)
            prep['stowaway'] = Stowaway(rcfile)
            return prep

        spawn.get_preparation_data = get_preparation_data_with_stowaway

    setattr(multiprocessing, PATCHED_MARKER, True)
@tomMoral
Copy link
Author

tomMoral commented Nov 10, 2016

Put the 4 files in one directory and launch python main.py.
This should give you a coverage report that updates while the code runs in the different processes and threads.

Another way to avoid the monkey patch is to use a .pth file in your Python install that enables coverage on interpreter start:

# Content of coverage.pth in your site-package folder
# NOTE(review): the `site` module only executes .pth lines that begin with
# `import`, one line at a time -- this multi-line body probably has to live in
# a helper module that a single `import` line loads; confirm before relying
# on it.
import os
if "COVERAGE_LOGGER_START" in os.environ:
    import atexit
    from coverage_logger import CoverageLoggerThread
    thread_cov = CoverageLoggerThread(main=False)
    thread_cov.start()

    def close_cov():
        """Stop the logger thread and wait for it at interpreter exit."""
        thread_cov.shutdown()
        thread_cov.join()
    atexit.register(close_cov)

You can then start your coverage logger with COVERAGE_LOGGER_START=1 py.test

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment