Skip to content

Instantly share code, notes, and snippets.

@tochimclaren
Forked from pgrangeiro/celery.py
Created December 24, 2020 21:39
Show Gist options
  • Save tochimclaren/c45cced4d78595dfdb8fb97bc9994001 to your computer and use it in GitHub Desktop.
Save tochimclaren/c45cced4d78595dfdb8fb97bc9994001 to your computer and use it in GitHub Desktop.
Django Celery Beat Example
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings
from robot.config import ROBOT_CELERY_BEAT_SCHEDULE
# Celery application bootstrap for the 'test' Django project.
# Default DJANGO_SETTINGS_MODULE to 'test.settings'; setdefault() leaves any
# value already present in the environment untouched.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'test.settings')
# The Celery application object other modules import as `app`.
app = Celery('test')
# Load configuration from the Django settings object; with namespace='CELERY'
# the Celery options are read from settings carrying the CELERY_ prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Explicit overrides applied after the settings have been loaded.
app.conf.update(
timezone='America/Sao_Paulo',
enable_utc=True,
)
# Scheduled tasks
# NOTE(review): no app.autodiscover_tasks() call is visible in this module;
# confirm the task modules are imported some other way so the task names used
# by the schedule are registered.
app.conf.beat_schedule = ROBOT_CELERY_BEAT_SCHEDULE
#
# This file is autogenerated by pip-compile
# To update, run:
#
# pip-compile --output-file devops/local/requirements.txt devops/local/requirements.in
#
pip-tools==1.10.1
amqp==2.2.2 # via kombu
billiard==3.5.0.3 # via celery
git+git://github.com/celery/celery@master #celery
certifi==2017.7.27.1 # via requests
chardet==3.0.4 # via requests
decorator==4.1.2 # via ipython, traitlets
django-celery-beat==1.0.1
django-celery-results==1.0.1
django-cors-headers==2.1.0
django==1.11.6
djangorestframework==3.7.1
freezegun==0.3.9
idna==2.6 # via requests
ipdb==0.10.3
ipython-genutils==0.2.0 # via traitlets
ipython==6.2.1 # via ipdb
jedi==0.11.0 # via ipython
kombu==4.1.0 # via celery
model-mommy==1.4.0
multidict==3.3.0 # via yarl
parso==0.1.0 # via jedi
pexpect==4.2.1 # via ipython
pickleshare==0.7.4 # via ipython
prompt-toolkit==1.0.15 # via ipython
ptyprocess==0.5.2 # via pexpect
pygments==2.2.0 # via ipython
python-dateutil==2.6.1
pytz==2017.2
pyyaml==3.12 # via vcrpy
redis==2.10.6
requests==2.18.4
simplegeneric==0.8.1 # via ipython
six==1.11.0 # via freezegun, model-mommy, prompt-toolkit, python-dateutil, traitlets, vcrpy
traitlets==4.3.2 # via ipython
urllib3==1.22 # via requests
vcrpy==1.11.1
vine==1.1.4 # via amqp
wcwidth==0.1.7 # via prompt-toolkit
wrapt==1.10.11 # via vcrpy
yarl==0.13.0 # via vcrpy
from celery.schedules import crontab
# Day-of-week expression shared by every schedule entry below.
_WEEKDAYS = 'mon,tue,wed,thu,fri'

# Celery beat schedule: entry name -> task name + crontab.
# The entry names are runtime identifiers, so their spelling (including
# 'close-tranding-pit') is kept exactly as it was.
ROBOT_CELERY_BEAT_SCHEDULE = {
    # Every 5 minutes during hours 10 through 20.
    'run-all-day': {
        'task': 'task_update_quotes',
        'schedule': crontab(minute='*/5', hour='10-20', day_of_week=_WEEKDAYS),
    },
    # Every 5 minutes from minute 0 through minute 20 of hour 21.
    'close-tranding-pit': {
        'task': 'task_update_quotes',
        'schedule': crontab(minute='0-20/5', hour=21, day_of_week=_WEEKDAYS),
    },
    # Once at 21:30.
    'every-night': {
        'task': 'task_update_history',
        'schedule': crontab(hour=21, minute=30, day_of_week=_WEEKDAYS),
    },
}
from celery.decorators import task
from core.models import TradingPit
from robot.use_cases import (
UpdateCurrencyHistoryData, UpdateQuotesHistoryData, UpdateQuotesDailyHistoryData,
UpdateStockMarketHistoryData, UpdateStockMarketDailyHistoryData,
CalculateXPTOIndex, UpdateXPTOIndexHistoryData,
)
from robot.parsers import QuotesResponseParser, ResponseParser
from cedro.clients import CedroClient
def get_latest_quotes():
    """Execute ``UpdateQuotesHistoryData`` with a quotes-parsing Cedro client.

    A ``CedroClient`` is built around ``QuotesResponseParser`` and handed to
    the use case, which is then executed. Nothing is returned.
    """
    UpdateQuotesHistoryData(CedroClient(QuotesResponseParser)).execute()
def update_today_quotes_history():
    """Execute the ``UpdateQuotesDailyHistoryData`` use case.

    The use case is constructed without arguments. Nothing is returned.
    """
    UpdateQuotesDailyHistoryData().execute()
def get_stock_market_latest_quotations():
    """Execute ``UpdateStockMarketHistoryData`` with a quotes-parsing client.

    A ``CedroClient`` is built around ``QuotesResponseParser`` and handed to
    the use case, which is then executed. Nothing is returned.
    """
    UpdateStockMarketHistoryData(CedroClient(QuotesResponseParser)).execute()
def update_today_stock_market_history():
    """Execute the ``UpdateStockMarketDailyHistoryData`` use case.

    The use case is constructed without arguments. Nothing is returned.
    """
    UpdateStockMarketDailyHistoryData().execute()
def get_latest_currencies_quotations():
    """Execute ``UpdateCurrencyHistoryData`` with a generic Cedro client.

    Unlike the quote helpers, the ``CedroClient`` here is built around the
    plain ``ResponseParser``. Nothing is returned.
    """
    UpdateCurrencyHistoryData(CedroClient(ResponseParser)).execute()
# NOTE(review): `task` is imported from celery.decorators, which is a legacy
# compatibility module -- confirm it exists in the Celery version deployed.
@task(name='task_update_quotes')
def update_quotes():
    """Celery task ``task_update_quotes``: refresh quotes, then the XPTO index.

    Steps, in order:

    1. Run ``UpdateQuotesHistoryData`` and ``UpdateStockMarketHistoryData``
       against one shared ``CedroClient`` built on ``QuotesResponseParser``.
    2. Run ``UpdateCurrencyHistoryData`` against a second ``CedroClient``
       built on ``ResponseParser``.
    3. Only if ``TradingPit.objects.is_open()`` is truthy, run
       ``CalculateXPTOIndex`` followed by ``UpdateXPTOIndexHistoryData``.
    """
    quotes_client = CedroClient(QuotesResponseParser)
    for use_case_cls in (UpdateQuotesHistoryData, UpdateStockMarketHistoryData):
        use_case_cls(quotes_client).execute()

    UpdateCurrencyHistoryData(CedroClient(ResponseParser)).execute()

    # The index steps are skipped entirely while the trading pit is closed.
    if TradingPit.objects.is_open():
        CalculateXPTOIndex().execute()
        UpdateXPTOIndexHistoryData().execute()
@task(name='task_update_history')
def update_daily_history():
    """Celery task ``task_update_history``: run the daily-history use cases.

    Does nothing unless ``TradingPit.objects.is_open()`` is truthy; otherwise
    executes ``UpdateQuotesDailyHistoryData`` and then
    ``UpdateStockMarketDailyHistoryData``.
    """
    if TradingPit.objects.is_open():
        for use_case_cls in (
            UpdateQuotesDailyHistoryData,
            UpdateStockMarketDailyHistoryData,
        ):
            use_case_cls().execute()
"""
Django settings for test project.
Generated by 'django-admin startproject' using Django 1.11.6.
For more information on this file, see
https://docs.djangoproject.com/en/1.11/topics/settings/
For the full list of settings and their values, see
https://docs.djangoproject.com/en/1.11/ref/settings/
"""
import os
# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/1.11/howto/deployment/checklist/

# SECURITY WARNING: keep the secret key used in production secret!
# NOTE(review): this is a hard-coded placeholder value; confirm production
# deployments override it.
SECRET_KEY = 'secret'

# SECURITY WARNING: don't run with debug turned on in production!
# Environment variables are always strings, so the raw value cannot be used as
# a flag directly: 'False' and '0' are non-empty strings and therefore truthy.
# Treat an unset variable and the usual "off" spellings as False; any other
# value enables debug mode. DEBUG is always a real bool.
DEBUG = os.environ.get('DEBUG', '').strip().lower() not in (
    '', '0', 'false', 'no', 'off',
)

# Only the single host named by ALLOWED_HOST_URL (when set) is allowed.
ALLOWED_HOSTS = []
if 'ALLOWED_HOST_URL' in os.environ:
    ALLOWED_HOSTS = [os.environ['ALLOWED_HOST_URL']]
# Application definition

# Apps shipped with Django itself.
DJANGO_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
]

# Apps installed from third-party packages.
THIRD_PARTY_APPS = [
    # MIDDLEWARE in this settings module uses
    # 'corsheaders.middleware.CorsMiddleware'; the django-cors-headers setup
    # instructions require the app itself to be installed as well.
    'corsheaders',
    'django_celery_beat',
    'django_celery_results',
]

# Final app list handed to Django: Django's own apps first, then third-party.
INSTALLED_APPS = DJANGO_APPS + THIRD_PARTY_APPS
# Request/response middleware, applied in the order listed.
MIDDLEWARE = [
    'django.middleware.security.SecurityMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'corsheaders.middleware.CorsMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
]

# Dotted path of the root URL configuration module.
ROOT_URLCONF = 'test.urls'

# A single Django-template engine that looks inside each app's templates
# directory (APP_DIRS) and has no extra project-level directories (DIRS).
TEMPLATES = [
    {
        'BACKEND': 'django.template.backends.django.DjangoTemplates',
        'DIRS': [],
        'APP_DIRS': True,
        'OPTIONS': {
            'context_processors': [
                'django.template.context_processors.debug',
                'django.template.context_processors.request',
                'django.contrib.auth.context_processors.auth',
                'django.contrib.messages.context_processors.messages',
            ],
        },
    },
]

# Dotted path of the WSGI application callable.
WSGI_APPLICATION = 'test.wsgi.application'
# Database
# https://docs.djangoproject.com/en/1.11/ref/settings/#databases

# When DB_NAME is present in the environment the PostgreSQL connection is
# configured entirely from DB_* variables (all of which must then be set,
# otherwise the lookup raises KeyError); without it the project uses a SQLite
# file under BASE_DIR.
if 'DB_NAME' in os.environ:
    DATABASES = {
        'default': {
            'ENGINE': 'django.db.backends.postgresql_psycopg2',
            'NAME': os.environ['DB_NAME'],
            'USER': os.environ['DB_USER'],
            'PASSWORD': os.environ['DB_PASS'],
            'HOST': os.environ['DB_SERVICE'],
            'PORT': os.environ['DB_PORT'],
        },
    }
else:
    DATABASES = {
        'default': {
            'ENGINE': 'django.db.backends.sqlite3',
            'NAME': os.path.join(BASE_DIR, 'db.sqlite3'),
        },
    }
# Password validation
# https://docs.djangoproject.com/en/1.11/ref/settings/#auth-password-validators

# One {'NAME': <dotted path>} entry per validator, in the order listed.
AUTH_PASSWORD_VALIDATORS = [
    {'NAME': validator_path}
    for validator_path in (
        'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
        'django.contrib.auth.password_validation.MinimumLengthValidator',
        'django.contrib.auth.password_validation.CommonPasswordValidator',
        'django.contrib.auth.password_validation.NumericPasswordValidator',
    )
]
# Internationalization
# https://docs.djangoproject.com/en/1.11/topics/i18n/

# Locale and time zone.
LANGUAGE_CODE = 'pt-br'
TIME_ZONE = 'America/Sao_Paulo'

# Translation, localized formatting and timezone-aware datetimes are all on.
USE_I18N = USE_L10N = USE_TZ = True

# Number formatting: comma for decimals, dot for thousands.
DECIMAL_SEPARATOR, THOUSAND_SEPARATOR = ',', '.'

# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/1.11/howto/static-files/
STATIC_URL = '/static/'
# Celery setup

# Broker and result backend share one Redis URL: the REDIS_URL environment
# variable when it is present, otherwise a Redis on localhost.
CELERY_BROKER_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379')
CELERY_RESULT_BACKEND = CELERY_BROKER_URL

# Messages and results are exchanged as JSON only.
CELERY_TASK_SERIALIZER = CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['application/json']

# Timezone settings.
CELERY_TIMEZONE = 'America/Sao_Paulo'
CELERY_ENABLE_UTC = True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment