Last active
June 27, 2019 09:07
-
-
Save malderete/40ffa8b8f16431f55ca8 to your computer and use it in GitHub Desktop.
PyConAr 2014
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Presentacion de la problematica | |
envio bloqueante de emails en Django | |
""" | |
def register_user(request):
    """Sign-up view; note the blocking email send on the success path."""
    if request.method != 'POST':
        # First visit: render an empty registration form.
        return render(request, 'auth/registration.html',
                      {'form': RegisterUserForm()})

    form = RegisterUserForm(request.POST)
    if not form.is_valid():
        # Re-render with the bound form so validation errors are shown.
        return render(request, 'auth/registration.html', {'form': form})

    new_user = form.save()
    # send the email!!! (synchronous -- this is the problem the talk solves)
    send_registration_email(new_user)
    return redirect('main_page')
""" | |
Hardcore way! | |
App ---> Queue ---> Custom Process (Worker) | |
Producer!! | |
""" | |
def send_registration_email(user):
    """Producer side: push a JSON 'send' command onto the Redis 'emails' list."""
    payload = {
        'cmd': 'send',
        'args': {
            'user_id': user.pk,
            'to': [user.email],
            'subject': 'Gracias por registrarse',
            'text': 'Super mensaje',
        },
    }
    conn = RedisManager.get_redis_connection()
    conn.lpush('emails', json.dumps(payload))
""" | |
Hardcore way! | |
App ---> Queue ---> Custom Process | |
Producer (Worker)!! | |
""" | |
def process(self, queue_name, sleep_time=60):
    """Consumer loop: pop JSON messages from `queue_name` and dispatch them.

    Runs until self._should_run() returns False; sleeps `sleep_time`
    seconds whenever the queue is empty.

    Raises:
        InvalidMsgException: the message lacks the expected structure.
    """
    redis_conn = RedisManager.get_redis_connection()
    while self._should_run():
        # Producer uses lpush, so rpop yields FIFO order.
        # (redis-py lists have no generic `pop`; the original call was a bug.)
        message = redis_conn.rpop(queue_name)
        if message is None:
            self.logger.info("nothing to do...sleeping")
            time.sleep(sleep_time)
            continue
        message_json = json.loads(message)
        if not self._is_valid_message(message_json):
            raise InvalidMsgException(message)
        cmd = message_json['cmd']
        args = message_json['args']
        try:
            worker = self._get_worker_process(cmd)
            if worker is None:
                raise InvalidCmdException(cmd)
            # Processes the command and pushes data onto a 'result' queue.
            worker(cmd, args, message_json)
        except Exception:
            # The original handler body was empty (invalid Python); log the
            # failure but keep the worker loop alive.
            self.logger.exception("error processing cmd %r", cmd)
""" | |
Django-RQ | |
https://github.com/ui/django-rq | |
""" | |
# django-rq queue definitions: three priorities, all on Redis DB 0.
RQ_QUEUES = {
    'default': {'HOST': 'localhost', 'DB': 0, 'DEFAULT_TIMEOUT': 360},
    'low': {'HOST': 'localhost', 'DB': 0, 'DEFAULT_TIMEOUT': 360},
    # 'high' lives on a dedicated Redis host and allows longer-running jobs.
    'high': {'URL': '10.10.1.100', 'DB': 0, 'DEFAULT_TIMEOUT': 1000},
}
from django_rq import job

@job('emails')
def send_registration_email(user_id):
    """django-rq version: build the registration email for `user_id` and send it."""
    send_email(create_registration_email(user_id))
@job('low')
def update_tweets_by_user_id(user_id, limit=10):
    """Low-priority django-rq job: refresh up to `limit` tweets for a user."""
    import logging  # local import keeps this slide-sized snippet self-contained
    try:
        tm = TwitterManager()
        tm.update_tweets(user_id, limit=limit)
    except Exception:
        # The original handler was empty (invalid Python) and would have
        # silently swallowed errors; log so failures are visible while the
        # worker keeps running.
        logging.getLogger(__name__).exception(
            "update_tweets failed for user_id=%s", user_id)
# Runs in the current thread (blocking).
send_registration_email(emails, subject, text)
# Runs in some worker consuming the corresponding queue.
send_registration_email.delay(emails, subject, text)
# Runs in some worker consuming the 'high' queue.
queue = django_rq.get_queue('high')
queue.enqueue(do_something_important, arg1, arg2, arg3)
""" | |
Celery setup Application | |
""" | |
from celery import Celery

# Original used smart quotes around the name -- a SyntaxError in Python.
app = Celery('current_module')
# Pull all CELERY* options from the Django settings module.
app.config_from_object('django.conf:settings')
# Lazy callable so INSTALLED_APPS is read only after Django is configured.
django_apps = lambda: settings.INSTALLED_APPS
app.autodiscover_tasks(django_apps)
""" | |
Celery task example 1 | |
""" | |
from celery import shared_task

@shared_task(ignore_result=True, time_limit=10 * 60)
def update_tweets_by_user_id(user_id, limit=10):
    """Celery task: refresh up to `limit` tweets for `user_id`.

    Hard time limit of 10 minutes; the result is discarded.
    """
    import logging  # local import keeps this slide-sized snippet self-contained
    try:
        tm = TwitterManager()
        tm.update_tweets(user_id, limit=limit)
    except Exception:
        # The original handler body was empty (invalid Python); log instead
        # of silently swallowing the error.
        logging.getLogger(__name__).exception(
            "update_tweets failed for user_id=%s", user_id)
""" | |
Celery task example 2 | |
""" | |
from celery import shared_task
from celery.exceptions import SoftTimeLimitExceeded

# Shared task options: discard results, mail errors, retry a minute apart.
kwargs = {
    'ignore_result': True,
    'time_limit': 30 * 60,       # hard kill after 30 minutes
    'soft_time_limit': 3 * 60,   # SoftTimeLimitExceeded raised after 3 minutes
    'default_retry_delay': 60,
    'send_error_emails': True,
}

@shared_task(**kwargs)
def update_tweets_by_user_id(user_id, limit=10):
    """Refresh up to `limit` tweets for `user_id`, retrying on failure."""
    try:
        TwitterManager().update_tweets(user_id, limit=limit)
    except SoftTimeLimitExceeded:
        # Soft limit reached: tidy up, do not retry.
        cleanup_action()
    except TwitterAuth as exc:
        # Auth hiccup: retry with the default 60-second delay.
        raise update_tweets_by_user_id.retry(exc=exc)
    except Exception as exc:
        # Anything else: retry, backing off for two minutes.
        raise update_tweets_by_user_id.retry(exc=exc, countdown=120)
""" | |
Celery AsyncResult | |
""" | |
# Enqueue add(1, 4); delay() returns an AsyncResult handle immediately.
async_result = add.delay(1, 4)
async_result.task_id
'7761a3f7-ae3a-48b0-9a03-8e9a7991ca8c'
async_result.status  # current state
async_result.ready()  # has it finished executing?
async_result.failed()  # did it fail?
async_result.successful()  # did it succeed?
async_result.result  # the result (return value or exception)
""" | |
Celery AsyncResult desde un task_id | |
""" | |
# Rebuild an AsyncResult later from a stored task_id.
from celery.result import AsyncResult
task_id = '7761a3f7-ae3a-48b0-9a03-8e9a7991ca8c'
async_result = AsyncResult(task_id)
async_result.status  # current state
async_result.ready()  # has it finished executing?
async_result.failed()  # did it fail?
async_result.successful()  # did it succeed?
async_result.result  # the result (return value or exception)
""" | |
CeleryBeat settings | |
""" | |
from datetime import timedelta
from celery.schedules import crontab

# Periodic tasks: celery beat reads this mapping at startup.
CELERYBEAT_SCHEDULE = {
    'conclude-request': {
        'task': 'orders.tasks.conclude_requests',
        # Executes every day at 00:01 A.M
        'schedule': crontab(hour=0, minute=1),
    },
    'clean-database': {
        'task': 'management.tasks.clean_database',
        # Executes every hour
        'schedule': timedelta(hours=1),
    },
}
""" | |
Celery Canvas | |
""" | |
@shared_task
def get_facebook_data(user_id):
    """Stub Facebook fetch: return a dict tagged with its source."""
    return {
        'user_id': user_id,
        'name': 'Martin Alderete',
        'src': 'facebook',
    }
@shared_task
def get_twitter_data(user_id):
    """Stub Twitter fetch: return a dict tagged with its source."""
    return {
        'user_id': user_id,
        'username': 'alderetemartin',
        'src': 'twitter',
    }
@shared_task
def save_facebook_and_twitter_data(results):
    """Chord callback: merge the per-source dicts into a single dict."""
    merged = {}
    for partial in results:
        merged.update(partial)
    return merged
import celery

# Build 3 signatures ("partials"): two fetches plus the merge callback.
task1 = get_facebook_data.s(user_id)
task2 = get_twitter_data.s(user_id)
callback = save_facebook_and_twitter_data.s()

# group: run both fetches in parallel.
group = celery.group(task1, task2)
r = group()
r.successful()
r.ready()  # original had r.redy() -- AttributeError at runtime
r.get()

# chord: run the group, then feed both results to the callback.
chord = celery.chord((task1, task2), callback)
r = chord()
r.successful()
r.ready()  # original had r.redy() -- AttributeError at runtime
r.get()
""" | |
Extra: Distributed locking | |
""" | |
import hashlib  # original used the Python-2-only `md5` module

from django.core.cache import cache

logger = get_task_logger(__name__)

@shared_task
def do_work_ensure_once(user_id, expire=60 * 5):
    """Run do_work(user_id) at most once at a time across all workers.

    Uses cache.add() as a simple distributed lock: add() succeeds only
    when the key is absent, and the key expires after `expire` seconds so
    a crashed worker cannot hold the lock forever.
    """
    digest = hashlib.md5(str(user_id).encode('utf-8')).hexdigest()
    # Original format string was 'user-lock-{1}' -- IndexError at runtime.
    lock_id = 'user-lock-{0}'.format(digest)
    acquire_lock = lambda: cache.add(lock_id, '1', expire)
    # Original bound this to `release_dock` but called release_lock(): NameError.
    release_lock = lambda: cache.delete(lock_id)
    if acquire_lock():
        try:
            do_work(user_id)
        except Exception:
            # Original handler was `except Exception as:` with an empty body
            # (invalid syntax); log and fall through to release the lock.
            logger.exception('do_work failed for user %s', user_id)
        finally:
            release_lock()
    else:
        logger.info('User %s ya esta siendo procesado', user_id)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment