Skip to content

Instantly share code, notes, and snippets.

@jzempel
Created July 29, 2012 20:37
Show Gist options
  • Save jzempel/3201722 to your computer and use it in GitHub Desktop.
Save jzempel/3201722 to your computer and use it in GitHub Desktop.
Flask with Celery 3.0
from blueprint import example
from extensions import mail
from flask import Flask
import settings
def create_app(settings=settings):
    """Build and return a configured Flask application.

    Configuration is loaded from ``settings`` (by default the imported
    ``settings`` module), the mail extension is initialized against the new
    application, and the ``example`` blueprint is registered on it.
    """
    app = Flask(__name__)
    app.config.from_object(settings)

    # Extensions are bound first, then blueprints are registered.
    mail.init_app(app)
    app.register_blueprint(example)

    return app
from flask import Blueprint
from tasks import handle_notification
# Blueprint named "example"; the route below is attached to it.
example = Blueprint("example", __name__)
@example.route('/')
def notify():
    """Hand a greeting to the notification task and return that greeting.

    The same string that is passed to ``handle_notification`` is returned
    as the view's response.
    """
    greeting = "Hello World!"
    # The task's positional arguments are supplied as a list.
    handle_notification.apply_async([greeting])
    return greeting
from __future__ import absolute_import
from application import settings
from celery import Celery
# Create the Celery application, then load its configuration from the
# imported settings object.
celery = Celery()
celery.config_from_object(settings)
from flask.ext.mail import Mail
# Mail extension instance, created here without an application argument.
mail = Mail()
# Configuration constants.
# NOTE(review): presumably these are the values picked up by the
# from_object / config_from_object calls elsewhere — confirm.
BROKER_TRANSPORT = "mongodb" # ...or your broker of choice.
# Debug flag.
DEBUG = True
from celery import celery
from extensions import mail
from flask.ext.mail import Message
@celery.task
def handle_notification(body):
    """Send a mail message with the subject "Test" and the given body.

    :param body: value assigned to the message's ``body`` attribute.
    """
    msg = Message("Test")
    msg.body = body
    # NOTE(review): no recipients are set on the message in this block —
    # confirm the mail setup in use accepts that.
    with mail.connect() as conn:
        conn.send(msg)
from application import create_app
# Module-level application object, built with the default settings.
application = create_app()

# Start the server only when this file is executed as a script, so that
# importing the module to obtain ``application`` does not block on run().
if __name__ == "__main__":
    application.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment