Skip to content

Instantly share code, notes, and snippets.

@wware
Last active December 13, 2019 18:48
Show Gist options
  • Save wware/e83efe1d0d21674e25265cf1a87f865a to your computer and use it in GitHub Desktop.
Save wware/e83efe1d0d21674e25265cf1a87f865a to your computer and use it in GitHub Desktop.
Decorator to show Python control flow thru a function and the functions it calls
#!/usr/bin/env python
import inspect
import linecache
import pprint
import yaml
import re
import os
import sys
import logging
from functools import wraps
logging.basicConfig(
format='%(asctime)s %(levelname)s %(filename)s:%(lineno)-5d %(message)s',
level=logging.DEBUG
)
def trace_func(level, show_locals=False, show_code=False, only=None):
"""
Don't use this in code that launches new threads.
Example usage:
@trace_func(logging.INFO)
def my_cool_function():
code goes here
if some condition:
more stuff to do
else:
alternative stuff to do
a little more
return some value
:param level: the level at which logging should be done
:param show_locals: show local vbls for each line before line is executed
:param show_code: show the code for each line
:param only: filter function, which source filenames get tracing
:return: a decorator that traces executed Python lines to the root logger
"""
indent_step = 4 * " "
indent_offset = [-1]
new_formatter = logging.Formatter("%(message)s")
if only is None:
frame = inspect.currentframe()
f = os.path.split(inspect.getfile(frame.f_back))[1]
def only_func(fname):
return fname.endswith(f) or fname.endswith(re.sub(r"\.py$", ".pyc", f))
only = only_func
def format_dict(d, indent=None):
if indent is None:
indent = ""
lines = []
for k, v in d.items():
if isinstance(v, argparse.Namespace):
v = v.__dict__
if isinstance(v, dict):
lines.append(indent + str(k) + ":")
lines += format_dict(v, indent + indent_step)
else:
lines.append(indent + str(k) + ": " + str(v))
return lines
def trace_lines(frame, event, arg):
if (
frame is not None and event == 'line' and
logging.getLogger().isEnabledFor(level)
):
co = frame.f_code
if only is not None and not only(co.co_filename):
return
line = locals = ""
depth, f = 0, frame
while f is not None:
depth += 1
f = f.f_back
if indent_offset[0] == -1:
indent_offset[0] = depth
indent = (depth - indent_offset[0]) * 4 * " "
if show_locals:
try:
locals = "\n".join(
format_dict(frame.f_locals, indent + 6 * " ")
) + ("\n" if frame.f_locals else "")
except:
pass
if show_code:
line = ": " + linecache.getline(co.co_filename, frame.f_lineno).rstrip()
old_formatters = []
logger = logging.getLogger()
for handler in logger.handlers:
old_formatters.append(handler.formatter)
handler.setFormatter(new_formatter)
logger.log(level, '{0}{1}{2}:{3}::{4}{5}'.format(
locals,
indent,
os.path.split(co.co_filename)[1],
frame.f_lineno,
co.co_name,
line
))
for handler, formatter in zip(logger.handlers, old_formatters):
handler.setFormatter(formatter)
def trace_calls(frame, event, arg):
if event == 'call':
return trace_lines
def decorator(f):
@wraps(f)
def inner(*args, **kwargs):
orig = sys.gettrace()
sys.settrace(trace_calls)
r = f(*args, **kwargs)
sys.settrace(orig)
return r
return inner
return decorator
def inner():
x = 12
y = 34
z = 56
return x + y + z
def middle():
v = re.sub("A", "B", "ABCD")
u = inner()
v = inner()
return u + v
# @trace_func(logging.INFO, show_locals=True)
# @trace_func(logging.INFO, show_locals=True, show_code=True, only=only_this_file)
@trace_func(logging.INFO, show_locals=True, show_code=True)
# @trace_func(show_locals=True)
def outer():
w = middle()
x = middle()
return w + x
logging.info("Before...")
z = outer()
logging.info("After...")
logging.info(z)
#!/usr/bin/env python
"""
No docs here to speak of.
"""
import argparse
import inspect
import re
import os
import sys
import logging
import textwrap
from contextlib import contextmanager
def make_namespace():
parser = argparse.ArgumentParser()
parser.add_argument(
'-D', '--debug',
action='store_true',
help='enable debugging'
)
parser.add_argument(
'--snooze',
action='store_true',
help='another dumb option'
)
return parser.parse_args(['-D'])
namespace = make_namespace()
def inner():
n = namespace
x = 12
y = 34
u = ['abc', 'def', n]
z = 56
return x + y + z
def middle():
v = re.sub("A", "B", "ABCD")
u = inner()
v = inner()
return u + v
def outer():
w = middle()
x = middle()
return w + x
@contextmanager
def make_trace_func(target=None, debug=False, remote=False):
names = set()
def trace_func(frame, event, arg):
name = []
assert inspect is not None
module = inspect.getmodule(frame)
if module:
name.append(module.__name__)
else:
name.append('__main__')
if 'self' in frame.f_locals:
name.append(frame.f_locals['self'].__class__.__name__)
codename = frame.f_code.co_name
if codename != '<module>': # top level usually
name.append(codename)
fqn = ".".join(name)
names.add(fqn)
if event == 'call':
if fqn == target:
if remote:
from remote_pdb import RemotePdb
RemotePdb("0.0.0.0", 4444).set_trace()
else:
import pdb
pdb.set_trace()
return
return trace_func
earlier = None
try:
if debug or target is not None:
earlier = sys.gettrace()
sys.settrace(trace_func)
yield
finally:
if debug:
L = list(names)
L.sort()
for name in L:
print name
sys.settrace(earlier)
def main():
global target
prog = os.path.basename(__file__).replace(".py", "")
parser = argparse.ArgumentParser(
prog=prog,
formatter_class=argparse.RawDescriptionHelpFormatter,
description=textwrap.dedent(__doc__.strip().replace('PROG', prog))
)
parser.add_argument(
'-d', '--debug',
action='store_true',
help='turn on debug-level logging'
)
parser.add_argument(
'-r', '--remote',
action='store_true',
help='use RemotePdb (port 4444) instead of PDB'
)
parser.add_argument(
'-T', '--target',
help='fully-qualified name of the targeted function or method'
)
options = parser.parse_args()
logging.basicConfig(
format='%(asctime)-15s %(levelname)s %(filename)s:%(lineno)d %(message)s',
level=logging.DEBUG if options.debug else logging.INFO
)
with make_trace_func(target=options.target, debug=options.debug, remote=options.remote):
logging.info("Before...")
z = outer()
logging.info("After...")
logging.info(z)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment