Skip to content

Instantly share code, notes, and snippets.

@djlambert
Created May 24, 2019 22:13
Show Gist options
  • Save djlambert/25f8fe798e61631625f1aa14ca87f856 to your computer and use it in GitHub Desktop.
Save djlambert/25f8fe798e61631625f1aa14ca87f856 to your computer and use it in GitHub Desktop.
import logging
from datetime import datetime, timezone
from typing import Any, Dict, Text, Tuple
import billiard.einfo as beinfo
import kombu
import sqlalchemy as sa
from celery import Celery as _Celery, Task as CeleryTask
from celery._state import get_current_task
from celery.utils.log import get_task_logger
# Logger for the ``audit_tools`` namespace. Its level is lowered to DEBUG so
# records of DEBUG severity and above pass the logger's own level check.
app_logger: logging.Logger = logging.getLogger('audit_tools')
app_logger.setLevel(logging.DEBUG)
from conf.app import config
class Celery(_Celery):
    """Celery application that shortens generated names for in-package tasks."""

    def gen_task_name(self, name: Text, module: Text):
        """Build the task name, trimming the package prefix from *module*.

        When *module* starts with ``audit_tools.server.task`` its first three
        dotted components are dropped before the name is generated by the
        parent class; any other module path is passed through unchanged.
        """
        if not module.startswith('audit_tools.server.task'):
            return super().gen_task_name(name, module)

        # Splitting at most three times leaves everything after the third dot
        # as a single trailing element (or nothing at all when the module has
        # exactly three components), so re-joining gives the trimmed path.
        trimmed = '.'.join(module.split('.', 3)[3:])
        return super().gen_task_name(name, trimmed)
# Celery application instance; its settings are loaded from the project
# ``config`` object using the ``celery`` namespace.
celery = Celery(__name__)
celery.config_from_object(config, namespace='celery')
# Logger obtained through Celery's task-logger helper.
celery_logger: logging.Logger = get_task_logger(__name__)
# Messaging objects used to publish task log/state messages: a connection to
# the broker URL Celery is configured with, a non-durable topic exchange named
# ``task_log``, and a producer bound to that exchange.
# NOTE(review): these are created once at import time; whether they are safe
# to share across worker processes is not visible from here - confirm.
connection: kombu.Connection = kombu.Connection(celery.conf.broker_url)
exchange: kombu.Exchange = kombu.Exchange(name='task_log', type='topic', durable=False)
producer: kombu.Producer = connection.Producer(connection, exchange=exchange)
# SQLAlchemy engine for ``config.database_url``; statement echoing follows
# ``config.DEBUG_TASK_LOG_DB``.
engine: sa.engine.Engine = sa.create_engine(config.database_url, echo=config.DEBUG_TASK_LOG_DB)
def publish_msg(task_id: Text, body: Dict):
    """Publish *body* through the module-level producer and exchange.

    The routing key is ``<task_id>.<level>`` when *body* contains a
    ``'level'`` entry and plain ``<task_id>`` otherwise. The exchange is
    passed in ``declare`` and the publish is sent with ``retry=True``.

    :param task_id: Identifier of the task the message belongs to.
    :param body: Message payload.
    """
    if 'level' in body:
        routing_key = f"{task_id}.{body['level']}"
    else:
        routing_key = task_id

    producer.publish(
        body,
        routing_key=routing_key,
        retry=True,
        exchange=exchange,
        declare=[exchange],
    )
class TaskLogHandler(logging.Handler):
    """Logging handler that stores and publishes records emitted inside a task.

    Each record logged while a Celery task is current is inserted into the
    ``log`` table and published via ``publish_msg``. Records logged outside
    of a task are ignored.
    """

    def emit(self, record: logging.LogRecord):
        """Persist and publish *record* for the currently executing task.

        Failures while formatting, writing to the database or publishing are
        routed to ``handleError`` (the standard ``logging`` convention) so a
        logging call never raises into the task code that issued it.

        :param record: The log record to handle.
        """
        task: CeleryTask = get_current_task()
        if not (task and task.request):
            # Not running inside a task: there is no task id to attach to.
            return

        try:
            # Strip NUL characters from the formatted text.
            # NOTE(review): presumably because the database text column
            # rejects them - confirm.
            message = self.format(record).replace('\00', '')
            timestamp = datetime.fromtimestamp(record.created, timezone.utc).isoformat()

            log_sql = sa.text('INSERT INTO log (type, task_guid, timestamp, level, message) VALUES (:type, :task_guid, :timestamp, :level, :message)')
            engine.execute(log_sql, type='task', task_guid=task.request.id, timestamp=timestamp, level=record.levelno, message=message)

            body = {
                'class': 'task_log',
                'type': 'log',
                'name': task.name,
                'timestamp': timestamp,
                'level': record.levelno,
                'message': message,
            }
            publish_msg(task.request.id, body)
        except RecursionError:
            # Mirror the stdlib handlers: never swallow runaway recursion.
            raise
        except Exception:
            self.handleError(record)
# Single handler instance shared by both loggers below.
task_log_handler = TaskLogHandler()
# Stop ``sqlalchemy.engine`` records from propagating to ancestor loggers.
# NOTE(review): presumably to keep echoed SQL out of other handlers - confirm.
logging.getLogger('sqlalchemy.engine').propagate = False
# Let DEBUG-and-above records through the task logger, then attach the
# handler to both the task logger and the application logger.
celery_logger.setLevel(logging.DEBUG)
celery_logger.addHandler(task_log_handler)
app_logger.addHandler(task_log_handler)
class LoggingTask(celery.Task):
    """Task base class that records task state in the database and publishes it.

    A row is inserted into the ``task`` table when the task is called, and
    the row's state is updated and a state message published when the task
    returns.
    """

    def __init__(self):
        self.logger: logging.Logger = celery_logger

    def __call__(self, *args, **kwargs) -> Any:
        """Insert a ``started`` row for this task, then run the task body."""
        created_at = datetime.now(timezone.utc).isoformat()
        insert_stmt = sa.text('INSERT INTO task (guid, name, created, state) VALUES (:guid, :name, :created, :state)')
        row = {
            'guid': self.request.id,
            'name': self.name,
            'created': created_at,
            'state': 'started',
        }
        engine.execute(insert_stmt, **row)
        return super().__call__(*args, **kwargs)

    def on_success(self, retval: Any, task_id: Text, args: Tuple, kwargs: Dict):
        """
        Success handler.

        Run by the worker if the task executes successfully.
        """
        print('>>>>>> ON_SUCCESS')

    def on_retry(self, exc: Exception, task_id: Text, args: Tuple, kwargs: Dict, einfo: beinfo.ExceptionInfo):
        """
        Retry handler.

        This is run by the worker when the task is to be retried.
        """
        print('>>>>>> ON_RETRY')

    def on_failure(self, exc: Exception, task_id: Text, args: Tuple, kwargs: Dict, einfo: beinfo.ExceptionInfo):
        """
        Failure handler.

        Prints a marker and logs the exception at CRITICAL level.
        """
        print('>>>>>>>>>>>>>>>>>> FAILURE')
        failure_text = f'EXCEPTION: {exc}'
        self.logger.critical(failure_text)

    def after_return(self, status: Text, retval: Any, task_id: Text, args: Tuple, kwargs: Dict, einfo: beinfo.ExceptionInfo):
        """
        Return handler.

        Stores the lower-cased *status* as the task's state, logs it, and
        publishes a ``state`` message for *task_id*.
        """
        update_stmt = sa.text('UPDATE task SET state=:state WHERE guid=:guid')
        final_state = status.lower()
        engine.execute(update_stmt, guid=task_id, state=final_state)

        self.logger.info(f'Task complete, status: {final_state}')

        state_message = {
            'class': 'task_log',
            'type': 'state',
            'name': self.name,
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'level': logging.INFO,
            'message': final_state,
        }
        publish_msg(task_id, state_message)
# Replace the app's ``Task`` attribute with LoggingTask.
# NOTE(review): presumably so tasks created on this app use it as their base
# class - confirm against the Celery version in use.
# noinspection PyPropertyAccess
celery.Task = LoggingTask
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment