-
-
Save kination/3442ef67750117bb5c5969d490c6446f to your computer and use it in GitHub Desktop.
Send an error email when a Celery worker raises an unhandled exception.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
""" | |
wakatime.amqp | |
~~~~~~~~~~~~~ | |
Setup for Celery distributed task queue. | |
""" | |
import socket | |
import traceback | |
import uuid | |
from celery import Celery | |
from celery.utils.log import get_task_logger | |
from functools import wraps | |
from kombu.serialization import registry | |
try: | |
from celery import setup_security | |
except ImportError: | |
from celery.security import setup_security | |
from flask import json | |
from wakatime import app, emailer | |
from wakatime.compat import u | |
class RequestMock(object):
    """Minimal stand-in for a task request, exposing ``id`` and ``name``."""

    id = None
    name = None

    def __init__(self, id=None, name=None):
        # Falsy values are skipped, leaving the class-level ``None`` in place.
        for attribute, value in (('id', id), ('name', name)):
            if value:
                setattr(self, attribute, value)
class TaskMock(object):
    """Synchronous stand-in for a Celery task, used when Celery is mocked.

    The class plays two roles:

    * ``TaskMock(fn)`` (exactly one positional argument and no keywords)
      wraps ``fn`` directly, as a bare ``@celery.task`` decorator would.
    * ``TaskMock(*options, **options)`` only stores the decorator options;
      calling that instance with the function returns the wrapping task,
      as ``@celery.task(bind=True)`` would.

    ``delay`` and ``apply_async`` run the wrapped function immediately in
    the calling process and return a ``ResultMock`` carrying its result.
    """

    fn = None
    id = None
    name = None
    args = ()
    kwargs = {}

    def __init__(self, *args, **kwargs):
        self.id = u(uuid.uuid4())
        # Give every instance its own options dict so the class-level
        # default is never shared between tasks.
        self.kwargs = {}
        if len(args) == 1 and not kwargs:
            # Bare decorator form: the single argument is the function.
            self.fn = args[0]
            self.name = args[0].__name__
        else:
            # Decorator-with-options form: remember the options until the
            # function arrives through ``__call__``.
            self.args = args
            self.kwargs = kwargs

    def _run(self, *args, **kwargs):
        """Invoke the wrapped function, passing the task first if bound."""
        if self.kwargs.get('bind'):
            return self.fn(self, *args, **kwargs)
        return self.fn(*args, **kwargs)

    def delay(self, *args, **kwargs):
        """Run the task right away and return a ``ResultMock`` for it.

        Arguments are round-tripped through JSON before the call.
        """
        result = ResultMock(id=self.id, name=self.name)
        args, kwargs = self._jsonify_args(*args, **kwargs)
        result.result = self._run(*args, **kwargs)
        return result

    def apply_async(self, args=None, kwargs=None, *meta_args, **meta_kwargs):
        """Run the task right away, ignoring any scheduling options.

        ``args`` may be a list or a tuple and ``kwargs`` a dict; anything
        else is treated as "no arguments". Extra positional and keyword
        options are accepted and discarded.
        """
        # Tuples are accepted too: previously a tuple was silently replaced
        # by an empty argument list and the task ran without its arguments.
        if not isinstance(args, (list, tuple)):
            args = ()
        if not isinstance(kwargs, dict):
            kwargs = {}
        return self.delay(*args, **kwargs)

    def _jsonify_args(self, *args, **kwargs):
        """Return ``(args, kwargs)`` with every value passed through JSON.

        Presumably this mimics what serializing the call for a broker
        would do to the values -- confirm against the real serializer.
        """
        kwargs = {key: json.loads(json.dumps(val))
                  for key, val in kwargs.items()}
        args = tuple(json.loads(json.dumps(arg)) for arg in args)
        return args, kwargs

    @property
    def request(self):
        """A fresh ``RequestMock`` carrying this task's id and name."""
        return RequestMock(id=self.id, name=self.name)

    def __call__(self, *args, **kwargs):
        """Wrap a function (options form) or run the task (wrapped form)."""
        if not self.fn:
            # Options form: the first argument is the function to wrap; the
            # stored options are copied onto the new task.
            task = TaskMock(args[0])
            task.args = self.args
            task.kwargs = self.kwargs
            return task
        # Wrapped form: run directly, without the JSON round trip, but
        # honoring ``bind`` the same way ``delay`` does.
        result = ResultMock(id=self.id, name=self.name)
        result.result = self._run(*args, **kwargs)
        return result
class ResultMock(object):
    """Already-finished stand-in for an asynchronous task result.

    The value is stored on ``result``; every accessor returns it as is.
    """

    id = None
    name = None
    result = None

    def __init__(self, id=None, name=None):
        # Falsy values are skipped, leaving the class-level ``None`` in place.
        for attribute, value in (('id', id), ('name', name)):
            if value:
                setattr(self, attribute, value)

    def ready(self):
        """Always ``True``."""
        return True

    def failed(self):
        """Always ``False``."""
        return False

    def forget(self):
        """Do nothing and return ``None``."""
        return None

    def get(self, *args, **kwargs):
        """Return the stored result; all arguments are ignored."""
        return self.result

    def wait(self, *args, **kwargs):
        """Return the stored result; all arguments are ignored."""
        return self.result

    @property
    def info(self):
        """The stored result."""
        return self.result
class CeleryMock(object):
    """Stand-in for the Celery application when Celery is mocked.

    Only the ``task`` attribute is provided.
    """

    def __init__(self):
        # ``task`` is the TaskMock class itself, so using it as a decorator
        # instantiates a TaskMock.
        self.task = TaskMock
# Register the dumps/loads pair of the ``json`` module imported above as
# kombu's 'json' serializer for UTF-8 'application/json' payloads.
registry.register(
    'json',
    json.dumps,
    json.loads,
    content_type='application/json',
    content_encoding='utf-8',
)
def make_celery(app):
    """Build the task queue object for the given Flask ``app``.

    Returns a ``CeleryMock`` when ``app.config['MOCK_CELERY']`` is truthy.
    Otherwise returns a ``Celery`` instance configured from ``app.config``
    whose base task class runs every task inside ``app.app_context()``.
    """
    settings = app.config
    if settings['MOCK_CELERY']:
        return CeleryMock()

    queue = Celery(app.import_name, broker=settings['CELERY_BROKER_URL'])
    queue.conf.update(settings)
    if settings['BROKER_USE_SSL']:
        setup_security()

    base_task = queue.Task

    class ContextTask(base_task):
        """Task base class that executes inside the Flask app context."""

        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return base_task.__call__(self, *args, **kwargs)

    queue.Task = ContextTask
    return queue
def email_if_fails(fn):
    """Decorate ``fn`` so that an unhandled error is emailed, then re-raised.

    When ``fn`` raises an ``Exception`` and ``app.config['DEBUG']`` is
    falsy, ``send_error_email`` is called with the function name, the call
    arguments, this machine's host name and the formatted traceback. The
    exception always propagates to the caller afterwards.

    :param fn: the callable to wrap.
    :returns: a wrapper with the same call signature and metadata as ``fn``.
    """
    @wraps(fn)
    def decorated(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except Exception:
            # Only genuine errors are reported. KeyboardInterrupt, SystemExit
            # and GeneratorExit derive from BaseException and pass straight
            # through without triggering an email.
            if not app.config['DEBUG']:
                send_error_email(
                    # ``__name__`` exists on functions in Python 2 and 3;
                    # fall back to repr() for callables without a name.
                    getattr(fn, '__name__', repr(fn)),
                    args,
                    kwargs,
                    socket.gethostname(),
                    traceback.format_exc(),
                )
            raise
    return decorated
def send_error_email(fnName, args, kwargs, host, formatted_exc):
    """Email a failure report for one task to ``app.config['ADMIN_EMAIL']``.

    :param fnName: name of the task function that failed.
    :param args: positional arguments the task was called with.
    :param kwargs: keyword arguments the task was called with.
    :param host: name of the machine the task ran on.
    :param formatted_exc: the formatted traceback text.

    The body lists all five values; the subject carries the host, the
    function name and the last line of the traceback.
    """
    trace = formatted_exc.strip()
    # Everything after the final newline, i.e. the traceback's last line.
    last_line = trace.rpartition('\n')[2]

    body_format = u('Task: {fnName}\nArgs: {args}\nKwargs: {kwargs}\n'
                    'Host: {host}\nError: {error}')
    body = body_format.format(
        fnName=fnName,
        args=args,
        kwargs=kwargs,
        host=host,
        error=trace,
    )
    subject = '[celery-error] {host} {fnName} {short_exc}'.format(
        host=host,
        fnName=fnName,
        short_exc=last_line,
    )

    emailer.send_email(
        subject=subject,
        to=app.config['ADMIN_EMAIL'],
        template='noop',
        template_vars={'contents': body},
    )
# Module-level task queue (real Celery or its mock) built from the app.
celery = make_celery(app)

# Task logger named after this module.
logger = get_task_logger(__name__)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
""" | |
wakatime.tasks | |
~~~~~~~~~~~~~~ | |
Background celery worker tasks. | |
""" | |
from wakatime.amqp import celery, email_if_fails | |
@celery.task
@email_if_fails
def my_task():
    """Example background task; replace the body with real work."""
    # ``email_if_fails`` is applied first (closest to the function), so the
    # registered task is the wrapper that reports failures.
    pass
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment