Skip to content

Instantly share code, notes, and snippets.

@craigderington
Created October 9, 2018 19:45
Show Gist options
  • Save craigderington/6a740accb408d32de723536b48abda43 to your computer and use it in GitHub Desktop.
Save craigderington/6a740accb408d32de723536b48abda43 to your computer and use it in GitHub Desktop.
Celery worker for project
from celery import Celery
from celery.schedules import crontab
from app import create_app
from app.tasks import log
def create_celery(app):
celery = Celery(app.import_name,
backend=app.config['CELERY_RESULT_BACKEND'],
broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
flask_app = create_app()
celery = create_celery(flask_app)
@celery.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# period task executes every 2.5 seconds
sender.add_periodic_task(48400, log, name='Log Users')
# periodic task executes on crontab schedule
sender.add_periodic_task(
crontab(hour=0, minute=2),
log,
name='Run the log'
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment