Created
February 8, 2017 02:11
-
-
Save jhgg/58c0387aac558e1cca9505bf779dfb5b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
raven_utils.flask_sentry | |
~~~~~~~~~~~~~~~~~~~ | |
:copyright: (c) 2010-2012 by the Sentry Team, see AUTHORS for more details. | |
:license: BSD, see LICENSE for more details. | |
""" | |
from __future__ import absolute_import | |
try: | |
from flask_login import current_user | |
except ImportError: | |
has_flask_login = False | |
else: | |
has_flask_login = True | |
import logging | |
from flask import request, current_app | |
from flask.signals import got_request_exception, request_finished | |
from werkzeug.exceptions import ClientDisconnected | |
from raven.conf import setup_logging | |
from raven.base import Client | |
from raven.middleware import Sentry as SentryMiddleware | |
from raven.handlers.logging import SentryHandler | |
from raven.utils.compat import _urlparse | |
from raven.utils.encoding import to_unicode | |
from raven.utils.wsgi import get_headers, get_environ | |
from raven.utils.conf import convert_options | |
def make_client(client_cls, app, dsn=None): | |
return client_cls( | |
**convert_options( | |
app.config, | |
defaults={ | |
'dsn': dsn, | |
'include_paths': ( | |
set(app.config.get('SENTRY_INCLUDE_PATHS', [])) | |
| set([app.import_name]) | |
), | |
# support legacy RAVEN_IGNORE_EXCEPTIONS | |
'ignore_exceptions': [ | |
'{0}.{1}'.format(x.__module__, x.__name__) | |
for x in app.config.get('RAVEN_IGNORE_EXCEPTIONS', []) | |
], | |
'extra': { | |
'app': app, | |
}, | |
}, | |
) | |
) | |
class Sentry(object): | |
""" | |
Flask application for Sentry. | |
Look up configuration from ``os.environ['SENTRY_DSN']``:: | |
>>> sentry = Sentry(app) | |
Pass an arbitrary DSN:: | |
>>> sentry = Sentry(app, dsn='http://public:[email protected]/1') | |
Pass an explicit client:: | |
>>> sentry = Sentry(app, client=client) | |
Automatically configure logging:: | |
>>> sentry = Sentry(app, logging=True, level=logging.ERROR) | |
Capture an exception:: | |
>>> try: | |
>>> 1 / 0 | |
>>> except ZeroDivisionError: | |
>>> sentry.captureException() | |
Capture a message:: | |
>>> sentry.captureMessage('hello, world!') | |
By default, the Flask integration will do the following: | |
- Hook into the `got_request_exception` signal. This can be disabled by | |
passing `register_signal=False`. | |
- Wrap the WSGI application. This can be disabled by passing | |
`wrap_wsgi=False`. | |
- Capture information from Flask-Login (if available). | |
""" | |
# TODO(dcramer): the client isn't using local context and therefore | |
# gets shared by every app that does init on it | |
def __init__(self, app, client=None, client_cls=Client, dsn=None, | |
logging=False, logging_exclusions=None, level=logging.NOTSET, | |
wrap_wsgi=None, register_signal=True): | |
if client and not isinstance(client, Client): | |
raise TypeError('client should be an instance of Client') | |
self.dsn = dsn | |
self.logging = logging | |
self.logging_exclusions = logging_exclusions | |
self.client_cls = client_cls | |
self.client = client | |
self.level = level | |
self.wrap_wsgi = wrap_wsgi | |
self.register_signal = register_signal | |
self.context_hooks = [] | |
self.init_app(app) | |
def add_context_hook(self, hook): | |
self.context_hooks.append(hook) | |
def handle_exception(self, *args, **kwargs): | |
self.captureException(exc_info=kwargs.get('exc_info')) | |
def get_user_info(self, request): | |
""" | |
Requires Flask-Login (https://pypi.python.org/pypi/Flask-Login/) | |
to be installed | |
and setup | |
""" | |
if not has_flask_login: | |
return | |
if not hasattr(current_app, 'login_manager'): | |
return | |
try: | |
is_authenticated = current_user.is_authenticated | |
except AttributeError: | |
# HACK: catch the attribute error thrown by flask-login is not attached | |
# > current_user = LocalProxy(lambda: _request_ctx_stack.top.user) | |
# E AttributeError: 'RequestContext' object has no attribute 'user' | |
return {} | |
if callable(is_authenticated): | |
is_authenticated = is_authenticated() | |
if not is_authenticated: | |
return {} | |
user_info = { | |
'id': current_user.get_id(), | |
} | |
if 'SENTRY_USER_ATTRS' in current_app.config: | |
for attr in current_app.config['SENTRY_USER_ATTRS']: | |
if hasattr(current_user, attr): | |
user_info[attr] = getattr(current_user, attr) | |
return user_info | |
def get_http_info(self, request): | |
""" | |
Determine how to retrieve actual data by using request.mimetype. | |
""" | |
if self.is_json_type(request.mimetype): | |
retriever = self.get_json_data | |
else: | |
retriever = self.get_form_data | |
return self.get_http_info_with_retriever(request, retriever) | |
def is_json_type(self, content_type): | |
return content_type == 'application/json' | |
def get_form_data(self, request): | |
return request.form | |
def get_json_data(self, request): | |
return request.data | |
def get_http_info_with_retriever(self, request, retriever=None): | |
""" | |
Exact method for getting http_info but with form data work around. | |
""" | |
if retriever is None: | |
retriever = self.get_form_data | |
urlparts = _urlparse.urlsplit(request.url) | |
try: | |
data = retriever(request) | |
except ClientDisconnected: | |
data = {} | |
return { | |
'url': '%s://%s%s' % (urlparts.scheme, urlparts.netloc, urlparts.path), | |
'query_string': urlparts.query, | |
'method': request.method, | |
'data': data, | |
'headers': dict(get_headers(request.environ)), | |
'env': dict(get_environ(request.environ)), | |
} | |
def attach_context(self): | |
if getattr(request, '_sentry_context_attached', False): | |
return | |
if request.url_rule: | |
self.client.transaction.push(request.url_rule.rule) | |
try: | |
self.client.http_context(self.get_http_info(request)) | |
except Exception as e: | |
self.client.logger.exception(to_unicode(e)) | |
try: | |
self.client.user_context(self.get_user_info(request)) | |
except Exception as e: | |
self.client.logger.exception(to_unicode(e)) | |
for hook in self.context_hooks: | |
try: | |
hook(request) | |
except Exception as e: | |
self.client.logger.exception(to_unicode(e)) | |
request._sentry_context_attached = True | |
def after_request(self, sender, response, *args, **kwargs): | |
if getattr(request, '_sentry_context_attached', False): | |
self.client.context.clear() | |
if request.url_rule: | |
self.client.transaction.pop(request.url_rule.rule) | |
return response | |
def init_app(self, app, dsn=None, logging=None, level=None, | |
logging_exclusions=None, wrap_wsgi=None, | |
register_signal=None): | |
if dsn is not None: | |
self.dsn = dsn | |
if level is not None: | |
self.level = level | |
if wrap_wsgi is not None: | |
self.wrap_wsgi = wrap_wsgi | |
elif self.wrap_wsgi is None: | |
# Fix https://github.com/getsentry/raven-python/issues/412 | |
# the gist is that we get errors twice in debug mode if we don't do this | |
if app and app.debug: | |
self.wrap_wsgi = False | |
else: | |
self.wrap_wsgi = True | |
if register_signal is not None: | |
self.register_signal = register_signal | |
if logging is not None: | |
self.logging = logging | |
if logging_exclusions is not None: | |
self.logging_exclusions = logging_exclusions | |
if not self.client: | |
self.client = make_client(self.client_cls, app, self.dsn) | |
if self.logging: | |
kwargs = {} | |
if self.logging_exclusions is not None: | |
kwargs['exclude'] = self.logging_exclusions | |
setup_logging(SentryHandler(self.client, level=self.level), **kwargs) | |
if self.wrap_wsgi: | |
app.wsgi_app = SentryMiddleware(app.wsgi_app, self.client) | |
if self.register_signal: | |
got_request_exception.connect(self.handle_exception, sender=app) | |
request_finished.connect(self.after_request, sender=app) | |
if not hasattr(app, 'extensions'): | |
app.extensions = {} | |
app.extensions['sentry'] = self | |
def captureException(self, *args, **kwargs): | |
self.attach_context() | |
result = self.client.captureException(*args, **kwargs) | |
return result | |
def captureMessage(self, *args, **kwargs): | |
self.attach_context() | |
result = self.client.captureMessage(*args, **kwargs) | |
return result | |
def user_context(self, *args, **kwargs): | |
return self.client.user_context(*args, **kwargs) | |
def tags_context(self, *args, **kwargs): | |
return self.client.tags_context(*args, **kwargs) | |
def extra_context(self, *args, **kwargs): | |
return self.client.extra_context(*args, **kwargs) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment