Skip to content

Instantly share code, notes, and snippets.

@okomestudio
Created April 3, 2015 23:50
Show Gist options
  • Save okomestudio/83fb9fd80c72bb2c9d7c to your computer and use it in GitHub Desktop.
Save okomestudio/83fb9fd80c72bb2c9d7c to your computer and use it in GitHub Desktop.
check_value decorator -- catches and logs an exception happening within a decorated function or method
#!/usr/bin/env python2.7
# -*- coding: utf-8 -*-
import functools
import inspect
from itertools import izip_longest
import json
import logging
import os
import sys
from traceback import format_tb
class _JSONMessage(dict):
    """Dictionary whose string form is its JSON serialization.

    Handing an instance to a logger as the message makes the emitted
    log record a single JSON document.
    """

    def __str__(self):
        serialized = json.dumps(self)
        return serialized
def _type_name(retval_type):
    """Return a printable name for a type or a (nested) tuple of types."""
    if isinstance(retval_type, tuple):
        return ' or '.join(_type_name(t) for t in retval_type)
    return getattr(retval_type, '__name__', repr(retval_type))


def _source_path(f):
    """Return the absolute path of the file defining *f*, or None."""
    try:
        path = inspect.getsourcefile(f)
    except TypeError:
        # built-in callables have no Python source file
        return None
    return os.path.abspath(path) if path else None


def _collect_inputs(f, args, kwargs):
    """Map the arguments of a call to *f* onto its parameter names."""
    # getargspec is absent from recent Python 3 releases; use its
    # replacement when the running interpreter provides one.
    getspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
    try:
        spec = getspec(f)
    except TypeError:
        # not introspectable (e.g. a built-in): report the raw call only
        argnames, defaults, varargs = [], (), None
    else:
        argnames, defaults, varargs = (
            spec.args, spec.defaults or (), spec.varargs)

    # parameters without a default start out as None, the others start
    # out with their declared default
    n_required = len(argnames) - len(defaults)
    inputs = dict.fromkeys(argnames[:n_required])
    inputs.update(zip(argnames[n_required:], defaults))

    # positional arguments override by name; zip() stops at the shorter
    # sequence, so surplus positionals cannot raise here
    inputs.update(zip(argnames, args))
    surplus = args[len(argnames):]
    if surplus:
        inputs['*' + (varargs or 'args')] = surplus

    inputs.update(kwargs)
    return inputs


def check_value(retval_type, retval_on_failure=None, logger=None,
                loglevel=logging.WARNING, stack_trace=True):
    """Check return value and log anomaly.

    Catch exceptions within a decorated function or method. Also
    check that the return value of the decorated function is of the
    (sub-)type specified; a mismatch is treated as a ValueError.

    When an exception is raised within a decorated function or
    method and a logger was supplied, the exception is logged as a
    JSON message and swallowed, and `retval_on_failure` is returned
    instead. If a logger is not supplied, the exception is re-raised
    with its original traceback.

    NOTE: whether re-raising without a logger is the desirable
    behavior was left as an open question by the original author.

    Parameters
    ----------
    retval_type : type or tuple of types
        Type of returned value, as accepted by isinstance().
    retval_on_failure : retval_type
        Return value on failure (default: None).
    logger : logging.Logger
        Logger used for logging. If None, the exception will be
        re-raised (default).
    loglevel : int
        Level passed to logger.log (default: logging.WARNING).
    stack_trace : bool
        True for the full inclusion of stack trace to log message
        (default: True).

    Returns
    -------
    callable
        Decorator to apply to the function or method to be checked.
    """
    expected = _type_name(retval_type)

    def decorator(f):
        @functools.wraps(f)
        def _f(*args, **kwargs):
            try:
                v = f(*args, **kwargs)
                # NOTE: this type checking may be removed if we
                # do unit testing down the road.
                if not isinstance(v, retval_type):
                    raise ValueError(u'return value must be of type {}'
                                     .format(expected))
                return v
            except Exception:
                if not logger:
                    # nobody to report to: propagate the exception
                    # unchanged, traceback included
                    raise
                _, err, tb = sys.exc_info()
                try:
                    m = _JSONMessage(
                        file=_source_path(f),
                        function=f.__name__,
                        error=repr(err),
                        inputs=repr(_collect_inputs(f, args, kwargs)),
                    )
                    if stack_trace:
                        # skip the first entry unless exception is
                        # raised within decorator. skipping might not
                        # be desirable.
                        m['traceback'] = ''.join(
                            format_tb(tb.tb_next if tb.tb_next else tb)
                        )
                    logger.log(loglevel, m)
                finally:
                    # a traceback held in a local refers back to this
                    # frame; drop it to avoid a reference cycle
                    del tb
                return retval_on_failure
        return _f
    return decorator
#!/usr/bin/env python2.7
# -*- coding: utf-8 -*-
def square(x):
    """Return *x* multiplied by itself."""
    product = x * x
    return product
def sqrt(x):
    """Return *x* raised to the power 0.5."""
    exponent = 0.5
    return x ** exponent
def unicode_text(s):
    """Decode *s* as UTF-8 and return the decoded text."""
    decoded = s.decode('utf-8')
    return decoded
#!/usr/bin/env python2.7
# -*- coding: utf-8 -*-
import logging
import sys
from decorators import check_value
import somelibrary as lib
# Main logger: timestamped records go to both stderr and test.log.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter(
    fmt='%(asctime)s [%(levelname)s] %(message)s',
    datefmt='%m/%d/%Y %H:%M:%S',
)
for ch in (logging.StreamHandler(sys.stderr),
           logging.FileHandler('test.log')):
    ch.setFormatter(formatter)
    logger.addHandler(ch)

# Secondary logger: bare messages only, written to their own file.
another_logger = logging.getLogger('another_logger')
another_logger.setLevel(logging.DEBUG)
ch = logging.FileHandler('another_logger.log')
ch.setFormatter(logging.Formatter(fmt='%(message)s'))
another_logger.addHandler(ch)
class SomeClass(object):
    """Demo class applying check_value to several kinds of callables.

    Covers a static method, a class method and a function nested
    inside an instance method.
    """

    def __init__(self, x):
        """Store *x* for later use by square()."""
        self.x = x

    @staticmethod
    @check_value(str, '__BAD__', logger)
    def concatenate(x, y):
        """Return ``x + y``; the result is checked against str."""
        return x + y

    @classmethod
    @check_value(int, -9999, logger)
    def add_one(cls, x):
        """Return ``x + 1``; the result is checked against int."""
        return x + 1

    def square(self):
        """Return lib.square applied to the stored value.

        The checked function is defined, and therefore decorated,
        anew on every call.
        """
        @check_value(int, -9999, logger)
        def _square(x):
            return lib.square(x)

        return _square(self.x)
@check_value(float, -9999., logger)
def sqrt(x):
    """Return lib.sqrt(x); the result is checked against float."""
    return lib.sqrt(x)
@check_value(unicode, '__BAD__', logger)
def unicode_text(s, keyword=3, testval=u'ふあっくとあっぷ'):
    """Return lib.unicode_text(s); the result is checked against unicode.

    `keyword` and `testval` are not used by the body.
    """
    return lib.unicode_text(s)
# Run every decorated callable on a mix of ints, byte strings and a
# unicode string, printing the index followed by the tuple of results.
samples = [3, '4', 10, '日本語', u'日本語']
for i, x in enumerate(samples):
    obj = SomeClass(x)
    results = (
        obj.add_one(x),
        obj.square(),
        sqrt(x),
        unicode_text(x, keyword=2),
        SomeClass.concatenate(u'こん', 'にちは'),
    )
    # single-argument form: prints "<index> <tuple repr>" exactly as
    # the two-item print statement does
    print('{} {}'.format(i, results))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment