Created
May 24, 2019 22:13
-
-
Save djlambert/25f8fe798e61631625f1aa14ca87f856 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
from datetime import datetime, timezone | |
from typing import Any, Dict, Text, Tuple | |
import billiard.einfo as beinfo | |
import kombu | |
import sqlalchemy as sa | |
from celery import Celery as _Celery, Task as CeleryTask | |
from celery._state import get_current_task | |
from celery.utils.log import get_task_logger | |
app_logger: logging.Logger = logging.getLogger('audit_tools') | |
app_logger.setLevel(logging.DEBUG) | |
from conf.app import config | |
class Celery(_Celery): | |
def gen_task_name(self, name: Text, module: Text): | |
if module.startswith('audit_tools.server.task'): | |
module = '.'.join(module.split('.')[3:]) | |
return super().gen_task_name(name, module) | |
celery = Celery(__name__) | |
celery.config_from_object(config, namespace='celery') | |
celery_logger: logging.Logger = get_task_logger(__name__) | |
connection: kombu.Connection = kombu.Connection(celery.conf.broker_url) | |
exchange: kombu.Exchange = kombu.Exchange(name='task_log', type='topic', durable=False) | |
producer: kombu.Producer = connection.Producer(connection, exchange=exchange) | |
engine: sa.engine.Engine = sa.create_engine(config.database_url, echo=config.DEBUG_TASK_LOG_DB) | |
def publish_msg(task_id: Text, body: Dict): | |
producer.publish( | |
body, | |
routing_key=task_id if 'level' not in body else '{}.{}'.format(task_id, body['level']), | |
retry=True, | |
exchange=exchange, | |
declare=[exchange] | |
) | |
class TaskLogHandler(logging.Handler): | |
def emit(self, record: logging.LogRecord): | |
task: CeleryTask = get_current_task() | |
if not (task and task.request): | |
return | |
message = self.format(record).replace('\00', '') | |
timestamp = datetime.fromtimestamp(record.created, timezone.utc).isoformat() | |
log_sql = sa.text('INSERT INTO log (type, task_guid, timestamp, level, message) VALUES (:type, :task_guid, :timestamp, :level, :message)') | |
engine.execute(log_sql, type='task', task_guid=task.request.id, timestamp=timestamp, level=record.levelno, message=message) | |
body = { | |
'class': 'task_log', | |
'type': 'log', | |
'name': task.name, | |
'timestamp': timestamp, | |
'level': record.levelno, | |
'message': message, | |
} | |
publish_msg(task.request.id, body) | |
task_log_handler = TaskLogHandler() | |
logging.getLogger('sqlalchemy.engine').propagate = False | |
celery_logger.setLevel(logging.DEBUG) | |
celery_logger.addHandler(task_log_handler) | |
app_logger.addHandler(task_log_handler) | |
class LoggingTask(celery.Task): | |
def __init__(self): | |
self.logger: logging.Logger = celery_logger | |
def __call__(self, *args, **kwargs) -> Any: | |
created = datetime.now(timezone.utc).isoformat() | |
task_sql = sa.text('INSERT INTO task (guid, name, created, state) VALUES (:guid, :name, :created, :state)') | |
engine.execute(task_sql, guid=self.request.id, name=self.name, created=created, state='started') | |
return super().__call__(*args, **kwargs) | |
def on_success(self, retval: Any, task_id: Text, args: Tuple, kwargs: Dict): | |
""" | |
Success handler. | |
Run by the worker if the task executes successfully. | |
""" | |
print('>>>>>> ON_SUCCESS') | |
def on_retry(self, exc: Exception, task_id: Text, args: Tuple, kwargs: Dict, einfo: beinfo.ExceptionInfo): | |
""" | |
Retry handler. | |
This is run by the worker when the task is to be retried. | |
""" | |
print('>>>>>> ON_RETRY') | |
def on_failure(self, exc: Exception, task_id: Text, args: Tuple, kwargs: Dict, einfo: beinfo.ExceptionInfo): | |
print('>>>>>>>>>>>>>>>>>> FAILURE') | |
self.logger.critical(f'EXCEPTION: {exc}') | |
def after_return(self, status: Text, retval: Any, task_id: Text, args: Tuple, kwargs: Dict, einfo: beinfo.ExceptionInfo): | |
task_sql = sa.text('UPDATE task SET state=:state WHERE guid=:guid') | |
status = status.lower() | |
engine.execute(task_sql, guid=task_id, state=status) | |
self.logger.info(f'Task complete, status: {status}') | |
publish_msg(task_id, { | |
'class': 'task_log', | |
'type': 'state', | |
'name': self.name, | |
'timestamp': datetime.now(timezone.utc).isoformat(), | |
'level': logging.INFO, | |
'message': status | |
}) | |
# noinspection PyPropertyAccess | |
celery.Task = LoggingTask | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment