Created June 15, 2022 05:04
Flask Celery 5 Application Factory Pattern
# app/app.py
from flask import Flask
from app.extensions import cq

CELERY_CONFIG = {
    "broker_url": "redis://localhost:6379/1",
    "result_backend": "redis://localhost:6379/0",
}


def create_app():
    app = Flask(__name__)
    app.config.update({"CELERY_CONFIG": CELERY_CONFIG})
    # Bind the shared Celery instance to this Flask app.
    cq.init_app(app)
    return app

# Celery worker entry point
from app.app import create_app
from app.extensions import FlaskCelery

# Build the Flask app first, then bind a Celery instance to it.
app = create_app()
celery = FlaskCelery()
celery.init_app(app)

# app/extensions.py
from celery import Celery


class FlaskCelery(Celery):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Allow FlaskCelery(app=flask_app) as a shortcut for init_app().
        if "app" in kwargs:
            self.init_app(kwargs["app"])

    def init_app(self, app):
        self.app = app
        self.conf.update(app.config["CELERY_CONFIG"])
        Task = self.Task

        class ContextTask(Task):
            # Run every task body inside the Flask application context.
            def __call__(self, *args, **kwargs):
                with app.app_context():
                    return self.run(*args, **kwargs)

        self.Task = ContextTask


# Shared instance imported by the app factory.
cq = FlaskCelery()
Flask and Celery 5.x have changed significantly over the years, and most of the tutorials still around target Celery 3.x. Here's a guide to the Flask application factory pattern for bigger applications.
To run the worker: