Last active
December 13, 2019 18:48
-
-
Save wware/e83efe1d0d21674e25265cf1a87f865a to your computer and use it in GitHub Desktop.
Decorator to show Python control flow thru a function and the functions it calls
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import inspect | |
import linecache | |
import pprint | |
import yaml | |
import re | |
import os | |
import sys | |
import logging | |
from functools import wraps | |
logging.basicConfig( | |
format='%(asctime)s %(levelname)s %(filename)s:%(lineno)-5d %(message)s', | |
level=logging.DEBUG | |
) | |
def trace_func(level, show_locals=False, show_code=False, only=None): | |
""" | |
Don't use this in code that launches new threads. | |
Example usage: | |
@trace_func(logging.INFO) | |
def my_cool_function(): | |
code goes here | |
if some condition: | |
more stuff to do | |
else: | |
alternative stuff to do | |
a little more | |
return some value | |
:param level: the level at which logging should be done | |
:param show_locals: show local vbls for each line before line is executed | |
:param show_code: show the code for each line | |
:param only: filter function, which source filenames get tracing | |
:return: a decorator that traces executed Python lines to the root logger | |
""" | |
indent_step = 4 * " " | |
indent_offset = [-1] | |
new_formatter = logging.Formatter("%(message)s") | |
if only is None: | |
frame = inspect.currentframe() | |
f = os.path.split(inspect.getfile(frame.f_back))[1] | |
def only_func(fname): | |
return fname.endswith(f) or fname.endswith(re.sub(r"\.py$", ".pyc", f)) | |
only = only_func | |
def format_dict(d, indent=None): | |
if indent is None: | |
indent = "" | |
lines = [] | |
for k, v in d.items(): | |
if isinstance(v, argparse.Namespace): | |
v = v.__dict__ | |
if isinstance(v, dict): | |
lines.append(indent + str(k) + ":") | |
lines += format_dict(v, indent + indent_step) | |
else: | |
lines.append(indent + str(k) + ": " + str(v)) | |
return lines | |
def trace_lines(frame, event, arg): | |
if ( | |
frame is not None and event == 'line' and | |
logging.getLogger().isEnabledFor(level) | |
): | |
co = frame.f_code | |
if only is not None and not only(co.co_filename): | |
return | |
line = locals = "" | |
depth, f = 0, frame | |
while f is not None: | |
depth += 1 | |
f = f.f_back | |
if indent_offset[0] == -1: | |
indent_offset[0] = depth | |
indent = (depth - indent_offset[0]) * 4 * " " | |
if show_locals: | |
try: | |
locals = "\n".join( | |
format_dict(frame.f_locals, indent + 6 * " ") | |
) + ("\n" if frame.f_locals else "") | |
except: | |
pass | |
if show_code: | |
line = ": " + linecache.getline(co.co_filename, frame.f_lineno).rstrip() | |
old_formatters = [] | |
logger = logging.getLogger() | |
for handler in logger.handlers: | |
old_formatters.append(handler.formatter) | |
handler.setFormatter(new_formatter) | |
logger.log(level, '{0}{1}{2}:{3}::{4}{5}'.format( | |
locals, | |
indent, | |
os.path.split(co.co_filename)[1], | |
frame.f_lineno, | |
co.co_name, | |
line | |
)) | |
for handler, formatter in zip(logger.handlers, old_formatters): | |
handler.setFormatter(formatter) | |
def trace_calls(frame, event, arg): | |
if event == 'call': | |
return trace_lines | |
def decorator(f): | |
@wraps(f) | |
def inner(*args, **kwargs): | |
orig = sys.gettrace() | |
sys.settrace(trace_calls) | |
r = f(*args, **kwargs) | |
sys.settrace(orig) | |
return r | |
return inner | |
return decorator | |
def inner(): | |
x = 12 | |
y = 34 | |
z = 56 | |
return x + y + z | |
def middle(): | |
v = re.sub("A", "B", "ABCD") | |
u = inner() | |
v = inner() | |
return u + v | |
# @trace_func(logging.INFO, show_locals=True) | |
# @trace_func(logging.INFO, show_locals=True, show_code=True, only=only_this_file) | |
@trace_func(logging.INFO, show_locals=True, show_code=True) | |
# @trace_func(show_locals=True) | |
def outer(): | |
w = middle() | |
x = middle() | |
return w + x | |
logging.info("Before...") | |
z = outer() | |
logging.info("After...") | |
logging.info(z) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
No docs here to speak of. | |
""" | |
import argparse | |
import inspect | |
import re | |
import os | |
import sys | |
import logging | |
import textwrap | |
from contextlib import contextmanager | |
def make_namespace(): | |
parser = argparse.ArgumentParser() | |
parser.add_argument( | |
'-D', '--debug', | |
action='store_true', | |
help='enable debugging' | |
) | |
parser.add_argument( | |
'--snooze', | |
action='store_true', | |
help='another dumb option' | |
) | |
return parser.parse_args(['-D']) | |
namespace = make_namespace() | |
def inner(): | |
n = namespace | |
x = 12 | |
y = 34 | |
u = ['abc', 'def', n] | |
z = 56 | |
return x + y + z | |
def middle(): | |
v = re.sub("A", "B", "ABCD") | |
u = inner() | |
v = inner() | |
return u + v | |
def outer(): | |
w = middle() | |
x = middle() | |
return w + x | |
@contextmanager | |
def make_trace_func(target=None, debug=False, remote=False): | |
names = set() | |
def trace_func(frame, event, arg): | |
name = [] | |
assert inspect is not None | |
module = inspect.getmodule(frame) | |
if module: | |
name.append(module.__name__) | |
else: | |
name.append('__main__') | |
if 'self' in frame.f_locals: | |
name.append(frame.f_locals['self'].__class__.__name__) | |
codename = frame.f_code.co_name | |
if codename != '<module>': # top level usually | |
name.append(codename) | |
fqn = ".".join(name) | |
names.add(fqn) | |
if event == 'call': | |
if fqn == target: | |
if remote: | |
from remote_pdb import RemotePdb | |
RemotePdb("0.0.0.0", 4444).set_trace() | |
else: | |
import pdb | |
pdb.set_trace() | |
return | |
return trace_func | |
earlier = None | |
try: | |
if debug or target is not None: | |
earlier = sys.gettrace() | |
sys.settrace(trace_func) | |
yield | |
finally: | |
if debug: | |
L = list(names) | |
L.sort() | |
for name in L: | |
print name | |
sys.settrace(earlier) | |
def main(): | |
global target | |
prog = os.path.basename(__file__).replace(".py", "") | |
parser = argparse.ArgumentParser( | |
prog=prog, | |
formatter_class=argparse.RawDescriptionHelpFormatter, | |
description=textwrap.dedent(__doc__.strip().replace('PROG', prog)) | |
) | |
parser.add_argument( | |
'-d', '--debug', | |
action='store_true', | |
help='turn on debug-level logging' | |
) | |
parser.add_argument( | |
'-r', '--remote', | |
action='store_true', | |
help='use RemotePdb (port 4444) instead of PDB' | |
) | |
parser.add_argument( | |
'-T', '--target', | |
help='fully-qualified name of the targeted function or method' | |
) | |
options = parser.parse_args() | |
logging.basicConfig( | |
format='%(asctime)-15s %(levelname)s %(filename)s:%(lineno)d %(message)s', | |
level=logging.DEBUG if options.debug else logging.INFO | |
) | |
with make_trace_func(target=options.target, debug=options.debug, remote=options.remote): | |
logging.info("Before...") | |
z = outer() | |
logging.info("After...") | |
logging.info(z) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment