Last active
October 20, 2020 20:48
-
-
Save born2discover/20a827f8a70c88ad0177e32ee9a21102 to your computer and use it in GitHub Desktop.
Celery 5.0 with Flask
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from flask import Flask | |
from importlib import import_module | |
from app.extensions import celery | |
def create_application(config_path: str = None) -> Flask: | |
app = Flask(__name__) | |
app.config.from_pyfile("settings.cfg") | |
app.config.from_envvar("APP_CONFIG", silent=True) | |
if config_path is not None: | |
app.config.from_pyfile(config_path, silent=True) | |
app.extensions["celery"] = celery | |
celery.conf.update(app.config["CELERY_CONFIG"]) | |
celery.finalize() | |
with app.app_context(): | |
import_module("app.tasks") | |
return app |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from celery import Celery | |
# The autofinalise keyword argument is needed so we can | |
# call finalise() automatically later in create_application. | |
celery = Celery(__name__, autofinalize=False) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Celery | |
# This section is specific to Celery 5.0 | |
# Earlier version require a different configuration that Celery 5.0 can still read but will throw | |
# warnings at you. The old configuration is scheduled to be removed in Celery 6.0 | |
# https://docs.celeryproject.org/en/stable/userguide/configuration.html | |
CELERY_CONFIG={ | |
"broker_url": "redis://localhost:6379/0", | |
"result_backend": "redis://localhost:6379/1" | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from celery.utils.log import get_task_logger | |
from app.extensions import celery | |
logger = get_task_logger(__name__) | |
Setting up periodic tasks if required. | |
@celery.on_after_configure.connect | |
def schedule_periodic_tasks(sender, **kwargs): | |
sender.add_periodic_task(10.0, test.s(), name="test.repeated@10sec") | |
@celery.task | |
def test(): | |
logger.info("Test task run successfully.") |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from app import create_application | |
app = create_application() | |
app.app_context().push() | |
celery = app.extensions["celery"] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment