Last active
February 5, 2021 06:56
-
-
Save FergusInLondon/df8d7c09308fc8891ea1fc889376a1fc to your computer and use it in GitHub Desktop.
Two Python contextmanagers for performance profiling - <100 line
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
profiling.py - standard library backed context managers for performance profiling

To use these context managers, ensure that the appropriate environment variable is
set - i.e. 'PROFILING_ENABLED'. The default directory for outputting profiling data
is the *current directory* - i.e. `.` - although this too can be overridden via the
environment - specifically `PROFILING_DIRECTORY`.

Due to a quirk in the design of `pathlib`, passing an absolute path to one of the
context managers will override the output directory associated with that trace.

The context managers yield an `Optional[Path]`; when this resolves to a value it means
profiling is enabled, and the path is the location of the resulting file containing the
profiling data - i.e. for use when logging.

This is quick'n'dirty, but potentially useful nonetheless.
"""
from contextlib import contextmanager | |
import cProfile | |
from datetime import datetime | |
from os import getenv | |
from pathlib import Path | |
import tracemalloc | |
# Name of the environment variable that selects the output directory.
PROFILER_DIR_ENV_VAR = "PROFILING_DIRECTORY"
# Name of the environment variable whose presence switches profiling on.
PROFILER_OPT_ENV_VAR = "PROFILING_ENABLED"
# strftime pattern embedded in every generated file name.
TIMESTAMP_FORMAT = "%Y-%m-%d--%H%M%S%f"
# 80-column horizontal rule used in the allocation report.
DIVIDER = 80 * "-"
def gen_filepath(name: str, ext: str) -> Path:
    """Build the output path for one profiling artefact.

    The file name has the form ``"<name> <timestamp>.<ext>"`` and is
    lower-cased as a whole. It is joined onto the directory named by the
    ``PROFILER_DIR_ENV_VAR`` environment variable, or ``"."`` when that
    variable is unset.
    """
    directory = getenv(PROFILER_DIR_ENV_VAR, ".")
    stamp = datetime.now().strftime(TIMESTAMP_FORMAT)
    basename = f"{name} {stamp}.{ext}".lower()
    return Path(directory, basename)
def write_allocations(output: Path, peak: int, final: int, diff):
    """Write a plain-text memory allocation report to *output*.

    Args:
        output: Destination file; created or truncated.
        peak: Peak traced memory size, in bytes.
        final: Traced memory size at the end of the run, in bytes.
        diff: Sized iterable of statistics (one report line each), e.g. the
            list returned by ``tracemalloc.Snapshot.compare_to``.
    """
    # The sizes come from tracemalloc.get_traced_memory(), which reports
    # bytes, so the report labels them as bytes rather than "blocks".
    report = [
        f"{DIVIDER}\n",
        f"Peak Size:\t {peak} bytes\n",
        f"Final Size:\t {final} bytes\n",
        f"{DIVIDER}\n\n",
        f"Largest Allocations ({len(diff)})\n",
        f"{DIVIDER}\n",
    ]
    report.extend(str(stat) + "\n" for stat in diff)
    report.append(f"{DIVIDER}\n")

    # Explicit UTF-8: statistic lines embed source file paths, which may not
    # be representable in the platform's default encoding.
    with open(output, 'w', encoding="utf-8") as out:
        out.writelines(report)
@contextmanager | |
def _disabled(_ = None): | |
yield None | |
@contextmanager
def _timing(filename: str = "timing"):
    """
    Use the standard library `cProfile` module to determine the duration and
    number of function calls.

    The output is a .prof file, best opened via something like tuna.

    The profiler is disabled and the stats are written even when the wrapped
    block raises; the exception then propagates to the caller unchanged.

    Yields the path the stats will be written to.

    @see https://github.com/nschloe/tuna
    """
    output_path = gen_filepath(filename, 'prof')

    profiler = cProfile.Profile()
    profiler.enable()
    try:
        yield output_path
    finally:
        # Without this, an exception in the profiled block would leave the
        # profiler running and silently drop the collected data.
        profiler.disable()
        profiler.dump_stats(output_path)
@contextmanager
def _allocations(filename: str = "allocs"):
    """
    Use the standard library `tracemalloc` module to determine:

    - Peak memory consumption (vs initial)
    - Final memory consumption (vs initial)
    - Largest memory allocations

    Results will be output as a `.txt` file.

    Tracing is stopped and the report is written even when the wrapped block
    raises; the exception then propagates to the caller unchanged.

    Yields the path the report will be written to.
    """
    # Exclude allocations made by the profiling machinery itself.
    skip_traces = (
        tracemalloc.Filter(False, tracemalloc.__file__),
        tracemalloc.Filter(False, cProfile.__file__),
    )

    output_path = gen_filepath(filename, 'txt')

    tracemalloc.start()
    pre_run = tracemalloc.take_snapshot().filter_traces(skip_traces)
    try:
        yield output_path
    finally:
        try:
            current_size, peak_size = tracemalloc.get_traced_memory()
            post_run = tracemalloc.take_snapshot().filter_traces(skip_traces)
        finally:
            # Always stop tracing, otherwise a failure in the wrapped block
            # (or while measuring) leaves tracemalloc enabled process-wide.
            tracemalloc.stop()

        write_allocations(
            output_path,
            peak=peak_size,
            final=current_size,
            diff=post_run.compare_to(pre_run, 'lineno'),
        )
# Public entry points: the real profilers when the opt-in environment variable
# is present (with any value), otherwise the no-op stand-in.
if getenv(PROFILER_OPT_ENV_VAR) is None:
    timing = allocations = _disabled
else:
    timing, allocations = _timing, _allocations
# | |
# Everything below here is a painfully contrived example. | |
# | |
if __name__ == "__main__":
    from random import randint
    from time import sleep

    def wasteful(n = None):
        """Recursive demo workload: random allocation size, pause and depth."""
        # With no explicit depth, pick a random number of iterations.
        n = randint(5, 10) if n is None else n

        if n > 0:
            print(f"{n} more iterations")

            # Deliberately materialise a throwaway list so the allocation
            # shows up in the memory report.
            pointless_allocation = sum(list(range(randint(0, 1000000))))

            pointless_pause_ms = randint(9, 1000)
            sleep(pointless_pause_ms / 1000)

            wasteful(n-1)

    with allocations() as alloc_out, timing() as time_out:
        # Both managers yield None when profiling is disabled.
        status = (
            f"Profiling. Output files: ['{alloc_out}', '{time_out}']"
            if alloc_out and time_out
            else "Not profiling."
        )
        print(status)

        wasteful()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment