Created
November 12, 2016 19:12
-
-
Save IrSent/5e4820f6b187d3654967b55e27d5d204 to your computer and use it in GitHub Desktop.
Celery Best Practices
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Celery application module: creates the `app` instance used by the workers
# and by the task modules of `some_project`.
from __future__ import absolute_import, unicode_literals

from celery import Celery

# NOTE(review): the 'amqp://' result backend is deprecated as of Celery 4 in
# favour of 'rpc://' -- confirm against the Celery version you target.
app = Celery(
    'some_project',
    broker='amqp://',
    backend='amqp://',
    include=['some_project.tasks'],
)

# Optional configuration, see the Celery application user guide.
app.conf.update(
    # Stored task results expire after one hour (value is in seconds).
    result_expires=3600,
    # Keep internal timestamps in UTC; convert using the zone below.
    enable_utc=True,
    timezone='Europe/Kiev',
)

if __name__ == '__main__':
    app.start()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Sentry
#
# CELERYD_HIJACK_ROOT_LOGGER defaults to True, which lets Celery replace the
# handlers configured on the root logger. Set it to False so that custom
# logging handlers (such as the Sentry handler below) stay in place.
CELERYD_HIJACK_ROOT_LOGGER = False

LOGGING = {
    # Required by logging.config.dictConfig; without it the configuration
    # is rejected with a ValueError.
    'version': 1,
    'handlers': {
        # Forwards ERROR-and-above records to the project's Sentry handler.
        'celery_sentry_handler': {
            'level': 'ERROR',
            'class': 'core.log.handlers.CelerySentryHandler',
        },
    },
    'loggers': {
        # Records from the 'celery' logger go only to the Sentry handler;
        # propagate=False keeps them away from the root logger's handlers.
        'celery': {
            'handlers': ['celery_sentry_handler'],
            'level': 'ERROR',
            'propagate': False,
        },
    },
}
# ... | |
#
# Prioritize your tasks!
#
# One queue per priority level; each queue is bound to an exchange and a
# routing key of the same name.
# NOTE(review): Queue and Exchange are presumably imported from kombu in the
# elided part of this settings file -- confirm.
CELERY_QUEUES = tuple(
    Queue(priority, Exchange(priority), routing_key=priority)
    for priority in ('high', 'normal', 'low')
)

# Tasks without an explicit route end up in the 'normal' queue.
CELERY_DEFAULT_QUEUE = CELERY_DEFAULT_EXCHANGE = CELERY_DEFAULT_ROUTING_KEY = 'normal'

CELERY_ROUTES = {
    # -- HIGH PRIORITY QUEUE -- #
    'myapp.tasks.check_payment_status': {
        'queue': 'high',
    },
    # -- LOW PRIORITY QUEUE -- #
    'myapp.tasks.close_session': {
        'queue': 'low',
    },
}

# Keep task results only if you really need them
# (CELERY_IGNORE_RESULT = False). In all other cases ignore them; if the
# outcome matters, have the task store it somewhere in your own database.
CELERY_IGNORE_RESULT = True
# ... |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Launch one worker per priority queue.
# -A names the Celery app to load, -E enables task events (used by monitors
# such as Flower), -l sets the log level, -n the node name, -Q the queue(s)
# the worker consumes from.
celery -A some_project worker -E -l INFO -n worker.high -Q high
celery -A some_project worker -E -l INFO -n worker.normal -Q normal
celery -A some_project worker -E -l INFO -n worker.low -Q low

# No -Q here: this worker is not tied to a single queue, so it can accept
# tasks when, for example, all the "high" queue workers are busy.
celery -A some_project worker -E -l INFO -n worker.whatever

# Use Flower to monitor your Celery app:
# https://github.com/mher/flower, https://flower.readthedocs.io/
pip install flower

# Launch the server and open http://localhost:5555
flower -A some_project --port=5555

# Or, launch from Celery
celery -A some_project flower --address=127.0.0.1 --port=5555

# The broker URL and other configuration options can be passed through the
# standard Celery options
celery -A some_project --broker=amqp://guest:guest@localhost:5672// flower
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Keep the tasks in tasks.py small and readable.
# Don't put business logic in the tasks themselves; implement it elsewhere and just import it here.
from __future__ import absolute_import, unicode_literals | |
from .celery import app | |
from .utils import generate_report, send_email | |
# bind=True passes the task instance as `self`, which is what makes
# self.retry() available for re-running the task when it fails.
@app.task(bind=True)
def send_report(self, subject='Report', message=''):
    """Generate a report and e-mail it as an attachment, retrying on failure.

    Args:
        subject: Subject line handed to ``send_email``.
        message: Body text handed to ``send_email``.

    Raises:
        Whatever ``self.retry`` raises to signal that the task has been
        rescheduled (or that retrying was given up).
    """
    filename = generate_report()
    try:
        send_email(subject, message, attachments=[filename])
    # NOTE(review): SomeError is a placeholder -- replace it with the
    # exception type(s) send_email actually raises.
    except SomeError as exc:
        # Retry in 5 minutes.
        raise self.retry(countdown=60 * 5, exc=exc)
# Set time limits so that a stuck task cannot hang the worker:
# soft_time_limit / time_limit are Celery's soft and hard limits, in seconds.
@app.task(bind=True, soft_time_limit=120, time_limit=180)
def some_long_task_with_high_risk_of_failure(self):
    """Placeholder for a long-running task guarded by time limits.

    `self` is required because the task is declared with bind=True, which
    makes Celery pass the task instance as the first argument.
    """
    pass
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment