Skip to content

Instantly share code, notes, and snippets.

@malderete
Last active June 27, 2019 09:07
Show Gist options
  • Save malderete/40ffa8b8f16431f55ca8 to your computer and use it in GitHub Desktop.
Save malderete/40ffa8b8f16431f55ca8 to your computer and use it in GitHub Desktop.
PyConAr 2014
"""
Problem statement:
sending emails synchronously (blocking) from a Django view
"""
def register_user(request):
    """Handle the user registration form.

    On a non-POST request an empty form is rendered.  On POST the submitted
    form is validated; when valid the user is saved, the registration email
    is sent inline (the response waits for ``send_registration_email`` to
    return) and the client is redirected to ``main_page``.  An invalid POST
    re-renders the template with the bound form.
    """
    if request.method != 'POST':
        form = RegisterUserForm()
    else:
        form = RegisterUserForm(request.POST)
        if form.is_valid():
            new_user = form.save()
            # Sent synchronously: this call blocks the request.
            send_registration_email(new_user)
            return redirect('main_page')
    return render(request, 'auth/registration.html', {'form': form})
"""
Hardcore way!
App ---> Queue ---> Custom Process (Worker)
Producer!!
"""
def send_registration_email(user):
    """Enqueue a ``send`` command for the user's registration email.

    The command is serialized to JSON and pushed onto the ``emails`` list of
    the Redis connection returned by ``RedisManager``; nothing is sent here.
    """
    connection = RedisManager.get_redis_connection()
    email_args = {
        'user_id': user.pk,
        'to': [user.email],
        'subject': 'Gracias por registrarse',
        'text': 'Super mensaje'
    }
    payload = json.dumps({'cmd': 'send', 'args': email_args})
    connection.lpush('emails', payload)
"""
Hardcore way!
App ---> Queue ---> Custom Process
Consumer (Worker)!!
"""
def process(self, queue_name, sleep_time=60):
    """Consume JSON command messages from a Redis list while allowed to run.

    Each message is expected to decode to a mapping with ``cmd`` and ``args``
    keys.  The worker callable returned by ``self._get_worker_process(cmd)``
    is invoked with ``(cmd, args, message_json)``.

    :param queue_name: name of the Redis list to read from.
    :param sleep_time: seconds to sleep when the list is empty.
    :raises InvalidMsgException: if ``self._is_valid_message`` rejects a
        decoded message (propagates and stops the loop, as before).

    Errors raised while resolving or running the worker (including
    ``InvalidCmdException``) are logged and the loop continues.

    NOTE(review): assumes ``RedisManager.get_redis_connection()`` returns a
    redis-py client, which has no ``pop`` command -- confirm.
    """
    redis_conn = RedisManager.get_redis_connection()
    while self._should_run():
        # The producer enqueues with LPUSH, so RPOP consumes in FIFO order.
        message = redis_conn.rpop(queue_name)
        if message is None:
            self.logger.info("nothing to do...sleeping")
            time.sleep(sleep_time)
            continue

        message_json = json.loads(message)
        if not self._is_valid_message(message_json):
            raise InvalidMsgException(message)
        cmd = message_json['cmd']
        args = message_json['args']
        try:
            worker = self._get_worker_process(cmd)
            if worker is None:
                raise InvalidCmdException(cmd)
            # processes and sends data to a 'result' queue
            worker(cmd, args, message_json)
        except Exception:
            # Keep the consumer alive; record the failure with its traceback.
            self.logger.exception("error while processing command %r", cmd)
"""
Django-RQ
https://github.com/ui/django-rq
"""
# django-rq queue definitions (Django settings).
# Each queue is configured either with HOST/PORT/DB or with a Redis URL.
RQ_QUEUES = {
    'default': {
        'HOST': 'localhost',
        'PORT': 6379,  # required alongside HOST
        'DB': 0,
        'DEFAULT_TIMEOUT': 360,
    },
    'low': {
        'HOST': 'localhost',
        'PORT': 6379,
        'DB': 0,
        'DEFAULT_TIMEOUT': 360,
    },
    'high': {
        # URL must be a full Redis URL, not a bare IP address.
        'URL': 'redis://10.10.1.100:6379/0',
        'DB': 0,
        'DEFAULT_TIMEOUT': 1000,
    },
    # Queue referenced by the @job('emails') task.
    'emails': {
        'HOST': 'localhost',
        'PORT': 6379,
        'DB': 0,
        'DEFAULT_TIMEOUT': 360,
    },
}
from django_rq import job
@job('emails')
def send_registration_email(user_id):
    """Build the registration email for ``user_id`` and send it.

    Registered as a django-rq job on the ``emails`` queue.
    """
    send_email(create_registration_email(user_id))
@job('low')
def update_tweets_by_user_id(user_id, limit=10):
    """Update the stored tweets of ``user_id`` via ``TwitterManager``.

    Registered as a django-rq job on the ``low`` queue.

    :param user_id: identifier passed to ``TwitterManager.update_tweets``.
    :param limit: forwarded as the ``limit`` keyword (default 10).
    :raises Exception: any failure is logged with its traceback and then
        re-raised so the job is reported as failed instead of being silently
        swallowed.
    """
    import logging  # local import keeps this snippet self-contained

    try:
        tm = TwitterManager()
        tm.update_tweets(user_id, limit=limit)
    except Exception:
        logging.getLogger(__name__).exception(
            "update_tweets_by_user_id failed for user %s", user_id)
        raise
import django_rq

# Runs in the current thread (blocking).
# The job is declared as send_registration_email(user_id), so it is called
# with that single argument.
send_registration_email(user_id)
# Runs in some worker that processes the job's queue
send_registration_email.delay(user_id)
# Runs in some worker that processes the 'high' queue
queue = django_rq.get_queue('high')
queue.enqueue(do_something_important, arg1, arg2, arg3)
"""
Celery setup Application
"""
from celery import Celery
from django.conf import settings

# Celery application; plain ASCII quotes are required for the name string.
app = Celery('current_module')
# Read the Celery configuration from the Django settings module.
app.config_from_object('django.conf:settings')
# Callable so INSTALLED_APPS is only read when autodiscovery calls it.
django_apps = lambda: settings.INSTALLED_APPS
app.autodiscover_tasks(django_apps)
"""
Celery task example 1
"""
from celery import shared_task
@shared_task(ignore_result=True, time_limit=10 * 60)
def update_tweets_by_user_id(user_id, limit=10):
    """Update the stored tweets of ``user_id`` via ``TwitterManager``.

    Declared as a Celery shared task with ``ignore_result=True`` and a
    ``time_limit`` of ``10 * 60``.

    :param user_id: identifier passed to ``TwitterManager.update_tweets``.
    :param limit: forwarded as the ``limit`` keyword (default 10).
    :raises Exception: any failure is logged with its traceback and then
        re-raised so the task is marked as failed instead of being silently
        swallowed.
    """
    import logging  # local import keeps this snippet self-contained

    try:
        tm = TwitterManager()
        tm.update_tweets(user_id, limit=limit)
    except Exception:
        logging.getLogger(__name__).exception(
            "update_tweets_by_user_id failed for user %s", user_id)
        raise
"""
Celery task example 2
"""
from celery import shared_task
from celery.exceptions import SoftTimeLimitExceeded
# Options passed to the @shared_task decorator as keyword arguments.
kwargs = dict(
    ignore_result=True,
    time_limit=30 * 60,
    soft_time_limit=3 * 60,
    default_retry_delay=60,
    send_error_emails=True,
)
@shared_task(**kwargs)
def update_tweets_by_user_id(user_id, limit=10):
    """Update the stored tweets of ``user_id``, retrying on failure.

    * ``SoftTimeLimitExceeded``: run ``cleanup_action()`` and finish.
    * ``TwitterAuth``: retry the task with the default retry delay.
    * any other exception: retry the task with a countdown of 120.
    """
    try:
        TwitterManager().update_tweets(user_id, limit=limit)
    except SoftTimeLimitExceeded:
        cleanup_action()
    except TwitterAuth as auth_error:
        raise update_tweets_by_user_id.retry(exc=auth_error)
    except Exception as error:
        raise update_tweets_by_user_id.retry(exc=error, countdown=120)
"""
Celery AsyncResult
"""
# Interactive-style walkthrough of the handle returned by ``.delay()``.
async_result = add.delay(1, 4)
async_result.task_id
# presumably the value echoed by the previous expression in a REPL session
'7761a3f7-ae3a-48b0-9a03-8e9a7991ca8c'
async_result.status  # state
async_result.ready()  # has it been executed?
async_result.failed()  # did it fail?
async_result.successful()  # did it succeed?
async_result.result  # result
"""
Celery AsyncResult from a task_id
"""
from celery.result import AsyncResult
# Rebuild a result handle from a previously obtained task id.
task_id = '7761a3f7-ae3a-48b0-9a03-8e9a7991ca8c'
async_result = AsyncResult(task_id)
async_result.status  # state
async_result.ready()  # has it been executed?
async_result.failed()  # did it fail?
async_result.successful()  # did it succeed?
async_result.result  # result
"""
CeleryBeat settings
"""
from datetime import timedelta
from celery.schedules import crontab
# Periodic tasks: entry name -> task path and schedule.
CELERYBEAT_SCHEDULE = {
    # Runs every day at 00:01
    'conclude-request': dict(
        task='orders.tasks.conclude_requests',
        schedule=crontab(hour=0, minute=1),
    ),
    # Runs every hour
    'clean-database': dict(
        task='management.tasks.clean_database',
        schedule=timedelta(hours=1),
    ),
}
"""
Celery Canvas
"""
@shared_task
def get_facebook_data(user_id):
    """Return a sample Facebook record for ``user_id`` (hard-coded demo data)."""
    return dict(user_id=user_id, name='Martin Alderete', src='facebook')
@shared_task
def get_twitter_data(user_id):
    """Return a sample Twitter record for ``user_id`` (hard-coded demo data)."""
    return dict(user_id=user_id, username='alderetemartin', src='twitter')
@shared_task
def save_facebook_and_twitter_data(results):
    """Merge the given result dicts into one dict and return it.

    Entries are applied in order, so on duplicate keys the later result wins.
    """
    merged = {}
    for partial in results:
        merged.update(partial)
    return merged
import celery
# Create three partials (task signatures)
task1 = get_facebook_data.s(user_id)
task2 = get_twitter_data.s(user_id)
callback = save_facebook_and_twitter_data.s()

# Group: run both tasks and inspect the combined result
group = celery.group(task1, task2)
r = group()
r.successful()
r.ready()
r.get()

# Chord: same two tasks, with ``callback`` attached as the chord body
chord = celery.chord((task1, task2), callback)
r = chord()
r.successful()
r.ready()
r.get()
"""
Extra: Distributed locking
"""
import md5
from django.core.cache import cache
from celery.utils.log import get_task_logger

# Logger for the tasks defined in this module.
logger = get_task_logger(__name__)
@shared_task
def do_work_ensure_once(user_id, expire=60 * 5):
    """Run ``do_work(user_id)`` unless a cache lock for that user exists.

    The lock is a cache key derived from ``user_id`` and created with
    ``cache.add``.  When the key cannot be added, the task only logs that
    the user is already being processed and returns.

    :param user_id: identifier of the user to process; converted with
        ``str()`` before hashing, so ints and strings both work.
    :param expire: lock timeout passed to ``cache.add`` (default ``60 * 5``).
    :raises Exception: errors from ``do_work`` are logged and re-raised; the
        lock is released in every case.

    NOTE(review): mutual exclusion relies on ``cache.add`` being atomic for
    the configured cache backend -- confirm for your deployment.
    """
    import hashlib  # replaces the Python-2-only ``md5`` module

    # md5 is only used to build a fixed-size cache key, not for security.
    digest = hashlib.md5(str(user_id).encode('utf-8')).hexdigest()
    lock_id = 'user-lock-{0}'.format(digest)

    if not cache.add(lock_id, '1', expire):
        logger.info('User %s ya esta siendo procesado', user_id)
        return

    try:
        do_work(user_id)
    except Exception:
        logger.exception('do_work failed for user %s', user_id)
        raise
    finally:
        # Always release the lock we acquired above.
        cache.delete(lock_id)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment