python manage.py makemigrations --dry-run --verbosity 3 lets you see what a migration will actually look like without creating the migration file.
Celery is a tool that helps you manage tasks that should occur outside the request/response cycle. Any task that takes more than half a second is a great candidate for turning into a Celery task. Celery is especially helpful for turning blocking operations on your site into non-blocking ones: it offloads that work to background worker processes.
You can use Celery to send email, update your database with side effects from the request that was just processed, query an API and store the result, and a lot more. Another thing Celery is helpful for is scheduling tasks to run at specific times. You might be familiar with cron jobs, which are tasks that run at specific intervals you define.
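Sending email is a classic example of that kind of offloading. Here is a minimal sketch using Django's built-in django.core.mail (the task name and addresses are illustrative, not part of the project we build below):

from celery import shared_task
from django.core.mail import send_mail


@shared_task
def send_welcome_email(address):
    # Runs on a worker process, so the request/response cycle
    # never waits on a slow SMTP connection.
    send_mail(
        'Welcome!',
        'Thanks for signing up.',
        'noreply@example.com',
        [address],
    )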
requirements.txt
celery==4.2.1
redis==2.10.6
celery.py
import os

from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
tasks.py
from celery import shared_task
from tapioca.exceptions import TapiocaException  # from tapioca-wrapper
from tapioca_facebook import Facebook


@shared_task
def hello():
    print("Hello there!")


@shared_task(bind=True, autoretry_for=(TapiocaException,), retry_backoff=True)
def likes_do_facebook(self):
    # ACCESS_TOKEN comes from your own configuration
    api = Facebook(access_token=ACCESS_TOKEN)
    api.user_likes(id='me').get()
In proj/__init__.py, add the following:
from .celery import app as celery_app
__all__ = ['celery_app']
To test that your hello() task works, you can run it locally as a regular Python function. Start a Python shell using docker-compose run web ./manage.py shell and run:
>>> from app.tasks import hello
>>> hello()
Hello there!
If you would like to test running your task as a Celery task, run:
>>> hello.delay()
<AsyncResult: ba845cf3-e60b-4432-a9d8-9943621cb8a0>
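delay() is a shortcut for apply_async(), which gives you more control over execution, and the AsyncResult it returns can be inspected once a result backend (configured below) is in place. A quick sketch:

>>> result = hello.delay()
>>> result.get(timeout=10)   # block until the task finishes; hello() returns None
>>> hello.apply_async(countdown=30)  # run the task roughly 30 seconds from now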
We will use a feature called Celery beat to schedule our task to run periodically. Celery beat is the Celery scheduler. It executes tasks as often as you tell it to.
settings.py
from celery.schedules import crontab
CELERY_BROKER_URL = 'redis://redis:6379'
CELERY_RESULT_BACKEND = 'redis://redis:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_BEAT_SCHEDULE = {
    'hello': {
        'task': 'app.tasks.hello',
        'schedule': crontab(),  # execute every minute
    }
}
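crontab() with no arguments fires every minute, but it also accepts cron-style arguments, and plain timedelta intervals work too. A couple of illustrative entries (the cleanup task is hypothetical):

from datetime import timedelta

from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'nightly-cleanup': {
        'task': 'app.tasks.cleanup',            # hypothetical task
        'schedule': crontab(minute=0, hour=0),  # every day at midnight
    },
    'heartbeat': {
        'task': 'app.tasks.hello',
        'schedule': timedelta(seconds=30),      # every 30 seconds
    },
}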
docker-compose.yml
version: '3'

services:
  db:
    image: postgres:9.6.5
    volumes:
      - postgres_data:/var/lib/postgresql/data/
  redis:
    image: "redis:alpine"
  web:
    build: .
    command: bash -c "python /code/manage.py migrate --noinput && python /code/manage.py runserver 0.0.0.0:8000"
    volumes:
      - .:/code
    ports:
      - "8000:8000"
    depends_on:
      - db
      - redis
  celery:
    build: .
    command: celery -A proj worker -l info
    volumes:
      - .:/code
    depends_on:
      - db
      - redis
  celery-beat:
    build: .
    command: celery -A proj beat -l info
    volumes:
      - .:/code
    depends_on:
      - db
      - redis

volumes:
  postgres_data:
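With those services defined, the whole stack (web, worker, and beat scheduler) comes up with a single command:

$ docker-compose up --build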
You can use Flower to monitor all your Celery workers.
Installing Flower with pip is simple:
$ pip install flower
Launch the server and open http://localhost:5555 in your browser:
$ flower -A proj --port=5555
Or, launch it from Celery:
$ celery flower -A proj --address=127.0.0.1 --port=5555
The broker URL and other configuration options can be passed through the standard Celery options:
$ celery flower -A proj --broker=amqp://guest:guest@localhost:5672//
- Do not use complex objects as task parameters. For example, avoid passing Django model instances:

# Good
@app.task
def my_task(user_id):
    user = User.objects.get(id=user_id)
    print(user.name)
    # ...


# Bad
@app.task
def my_task(user):
    print(user.name)
    # ...
- Do not wait for other tasks inside a task.
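Calling result.get() inside a task blocks a worker and can deadlock the pool. Instead, let Celery compose the tasks; a minimal sketch with hypothetical tasks:

from celery import chain, shared_task


@shared_task
def fetch_data():
    return [1, 2, 3]


@shared_task
def process_data(data):
    return sum(data)


# The return value of fetch_data is passed as the first argument
# to process_data; neither task waits on the other inside a worker.
chain(fetch_data.s(), process_data.s()).delay()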
- Prefer idempotent tasks:
"Idempotence is the property of certain operations in mathematics and computer science, that can be applied multiple times without changing the result beyond the initial application." - Wikipedia. 4. Prefer atomic tasks:
"An operation (or set of operations) is atomic ... if it appears to the rest of the system to occur instantaneously. Atomicity is a guarantee of isolation from concurrent processes. Additionally, atomic operations commonly have a succeed-or-fail definition—they either successfully change the state of the system, or have no apparent effect." - Wikipedia.
5. Retry when possible. But make sure tasks are idempotent and atomic before doing so.
6 Set retry_limit
to avoid broken tasks to keep retrying forever.
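A minimal sketch of capping manual retries with max_retries (flaky_operation is a hypothetical call that sometimes fails):

from celery import shared_task


@shared_task(bind=True, max_retries=5)
def unstable_task(self):
    try:
        flaky_operation()  # hypothetical call that sometimes fails
    except Exception as exc:
        # After 5 attempts Celery raises MaxRetriesExceededError
        # instead of retrying forever.
        raise self.retry(exc=exc, countdown=30)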
- Back off exponentially if things look like they are not going to get fixed soon. Throw in a random factor so that retrying clients do not all hit the struggling service at once:
import random


def exponential_backoff(task_self):
    minutes = task_self.default_retry_delay / 60
    rand = random.uniform(minutes, minutes * 1.3)
    return int(rand ** task_self.request.retries) * 60

# in the task:
raise self.retry(exc=e, countdown=exponential_backoff(self))
- Use autoretry_for to reduce the boilerplate code for retrying tasks.
- Use retry_backoff to reduce the boilerplate code when doing exponential backoff.
- For tasks that require a high level of reliability, use acks_late in combination with retry. Again, make sure tasks are idempotent and atomic.
- Set hard and soft time limits. Recover gracefully if things take longer than expected:
from celery.exceptions import SoftTimeLimitExceeded


@app.task(time_limit=60, soft_time_limit=45)
def my_task():
    try:
        something_possibly_long()
    except SoftTimeLimitExceeded:
        recover()

- Use multiple queues to have more control over throughput and make things more scalable. (Routing Tasks)
- Extend the base task class to define default behaviour. (Custom Task Classes)
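As a sketch of those last two points (the queue name and base class here are illustrative, using the CELERY_ settings namespace from above):

# settings.py: route the Facebook task to its own queue (Routing Tasks)
CELERY_TASK_ROUTES = {
    'app.tasks.likes_do_facebook': {'queue': 'api_calls'},
}

# celery.py: a base class that centralises failure handling (Custom Task Classes)
from celery import Task


class BaseTask(Task):
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        # A single place for logging or alerting on any task failure.
        print('Task {0} failed: {1!r}'.format(task_id, exc))
        super().on_failure(exc, task_id, args, kwargs, einfo)


@app.task(base=BaseTask)
def my_task():
    ...

A dedicated worker can then consume only that queue with celery -A proj worker -Q api_calls.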