Skip to content

Instantly share code, notes, and snippets.

@born2discover
Last active October 20, 2020 20:48
Show Gist options
  • Save born2discover/20a827f8a70c88ad0177e32ee9a21102 to your computer and use it in GitHub Desktop.
Save born2discover/20a827f8a70c88ad0177e32ee9a21102 to your computer and use it in GitHub Desktop.
Celery 5.0 with Flask
from flask import Flask
from importlib import import_module
from app.extensions import celery
def create_application(config_path: str = None) -> Flask:
app = Flask(__name__)
app.config.from_pyfile("settings.cfg")
app.config.from_envvar("APP_CONFIG", silent=True)
if config_path is not None:
app.config.from_pyfile(config_path, silent=True)
app.extensions["celery"] = celery
celery.conf.update(app.config["CELERY_CONFIG"])
celery.finalize()
with app.app_context():
import_module("app.tasks")
return app
from celery import Celery
# The autofinalise keyword argument is needed so we can
# call finalise() automatically later in create_application.
celery = Celery(__name__, autofinalize=False)
# Celery
# This section is specific to Celery 5.0
# Earlier version require a different configuration that Celery 5.0 can still read but will throw
# warnings at you. The old configuration is scheduled to be removed in Celery 6.0
# https://docs.celeryproject.org/en/stable/userguide/configuration.html
CELERY_CONFIG={
"broker_url": "redis://localhost:6379/0",
"result_backend": "redis://localhost:6379/1"
}
from celery.utils.log import get_task_logger
from app.extensions import celery
logger = get_task_logger(__name__)
Setting up periodic tasks if required.
@celery.on_after_configure.connect
def schedule_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(10.0, test.s(), name="test.repeated@10sec")
@celery.task
def test():
logger.info("Test task run successfully.")
from app import create_application
app = create_application()
app.app_context().push()
celery = app.extensions["celery"]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment