Created
May 26, 2023 03:15
-
-
Save david-sosa-valdes/cb94a0f399c60742aa6a5f4ec12a451d to your computer and use it in GitHub Desktop.
RabbitMQ + Python + Django
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import pika | |
import json | |
import uuid | |
import json | |
import os | |
from django.db import models | |
from django.conf import settings | |
from django.contrib.auth import get_user_model | |
from django_extensions.db.models import TimeStampedModel | |
from django.contrib.contenttypes.fields import GenericRelation | |
from django.db.models.signals import post_save | |
from django.dispatch import receiver | |
from django.forms.models import model_to_dict | |
from apps.sync.models import SyncContent | |
from apps.sync.signals import update_data_signal | |
User = get_user_model() | |
""" | |
Estructura de DB | |
""" | |
class Contact(TimeStampedModel):
    """Contact entry keyed by a randomly generated UUID.

    Extends ``TimeStampedModel`` and exposes a reverse generic relation
    (``sync``) to the ``SyncContent`` rows that point at this contact.
    """

    # Primary key: UUID4 generated on instantiation, not editable in forms.
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)

    name = models.CharField(max_length=256)
    last_name = models.CharField(max_length=256)
    phone_number = models.CharField(max_length=10)

    # Generic relation to the data-synchronization model.
    sync = GenericRelation(SyncContent)

    def __str__(self):
        """Return the contact's first name as its display value."""
        return self.name
# Generic synchronization index that allows reverse lookups.
class SyncContent(TimeStampedModel):
    """Synchronization flag linking one user to one arbitrary model instance.

    The target instance is addressed through a generic foreign key made of
    ``content_type`` + ``object_id``; ``is_synced`` records the sync state.
    """

    # NOTE(review): ``ContentType`` and ``GenericForeignKey`` are not imported
    # in the visible part of this file - confirm they are in scope.

    # Intended to hold one record per active user in the system.
    user = models.ForeignKey(User, on_delete=models.CASCADE)

    # Generic foreign key: target model + UUID primary key of the target row.
    content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE)
    object_id = models.UUIDField()
    content_object = GenericForeignKey("content_type", "object_id")

    # Synchronization state.
    is_synced = models.BooleanField(default=False)

    class Meta:
        # Composite index backing lookups by (content_type, object_id).
        indexes = [models.Index(fields=["content_type", "object_id"])]
def update_data_signal(queue_name, callback):
    """Open a RabbitMQ connection and let ``callback`` publish to a queue.

    A blocking connection to ``settings.RABBITMQ_HOST`` is opened, the durable
    queue ``queue_name`` is declared, and ``callback`` is invoked with a
    single argument: a ``basic_publish(data)`` function that JSON-encodes
    ``data`` and publishes it to ``queue_name`` as a persistent message.

    The connection is always closed before returning, even when declaring
    the queue or running ``callback`` raises; the exception then propagates
    to the caller.

    Args:
        queue_name: Name of the queue to declare and publish to.
        callback: Callable receiving the ``basic_publish`` function.
    """
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(settings.RABBITMQ_HOST)
    )
    try:
        channel = connection.channel()
        # Durable queue + persistent messages (see delivery_mode below).
        channel.queue_declare(queue=queue_name, durable=True)

        def basic_publish(data: dict):
            """Publish ``data`` as JSON to ``queue_name`` via the default exchange."""
            channel.basic_publish(
                exchange='',
                routing_key=queue_name,
                body=json.dumps(data),
                properties=pika.BasicProperties(
                    delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
                ),
            )

        callback(basic_publish)
    finally:
        # Guarded so a connection already torn down by an error does not
        # raise a second exception that would mask the original one.
        if connection.is_open:
            connection.close()
# Señales de registros (DB)
"""
@receiver: decorador que registra la función para que se ejecute cuando se emite la señal post_save,
es decir, cuando se crea o se actualiza un registro en la base de datos.
"""
@receiver(post_save, sender=Contact, dispatch_uid="sync_by_data_content")
def update_data(sender, instance: Contact, **kwargs):
    """Queue one sync message per user whenever a ``Contact`` is saved.

    Thin wrapper around ``update_data_signal``: it passes the queue name
    ``settings.RABBIT_CHANNEL`` and a publisher callback that emits a message
    for every existing user, each referencing the saved contact.
    """

    def publish_for_every_user(publish):
        """Emit a message pairing the saved contact with each user."""
        for account in User.objects.all():
            payload = {
                'content_type': 'data',
                'object_id': str(instance.id),
                'user_id': str(account.id),
            }
            publish(payload)

    return update_data_signal(settings.RABBIT_CHANNEL, publish_for_every_user)
@receiver(post_save, sender=User, dispatch_uid="sync_by_user_content")
def update_user(sender, instance: User, created, **kwargs):
    """Queue one sync message per contact when a new user is created.

    Saves of already-existing users are ignored. For a newly created user,
    a message is published to ``settings.RABBIT_CHANNEL`` for every existing
    ``Contact``, each referencing the new user.
    """
    if not created:
        return None

    def publish_for_every_contact(publish):
        """Emit a message pairing the new user with each contact."""
        for contact in Contact.objects.all():
            payload = {
                'content_type': 'data',
                'object_id': str(contact.id),
                'user_id': str(instance.id),
            }
            publish(payload)

    return update_data_signal(settings.RABBIT_CHANNEL, publish_for_every_contact)
def worker_connection():
    """Return a new blocking pika connection for worker processes.

    Connects to ``settings.RABBITMQ_HOST`` with ``heartbeat=600`` and
    ``blocked_connection_timeout=300``.
    """
    parameters = pika.ConnectionParameters(
        settings.RABBITMQ_HOST,
        heartbeat=600,
        blocked_connection_timeout=300,
    )
    return pika.BlockingConnection(parameters)
def worker_handler(queue_name: str, callback: callable, auto_close: bool = False):
    """Consume messages from ``queue_name`` until interrupted.

    Opens a worker connection, declares the durable queue, registers the
    message handler produced by ``callback(connection, auto_close)`` and
    blocks in ``start_consuming()``.

    ``KeyboardInterrupt`` (Ctrl+C) is treated as a normal shutdown request
    and swallowed. Any other exception propagates. In every case the
    connection is closed on the way out if it is still open.

    Args:
        queue_name: Queue to declare and consume from.
        callback: Factory called as ``callback(connection, auto_close)`` that
            returns the pika ``on_message_callback``.
        auto_close: Forwarded to ``callback``.
    """
    connection = None
    try:
        connection = worker_connection()
        channel = connection.channel()
        channel.queue_declare(queue=queue_name, durable=True)
        channel.basic_consume(
            queue=queue_name,
            on_message_callback=callback(connection, auto_close),
        )
        print(f'[{queue_name}] [*] Waiting for messages. To exit press CTRL+C')
        channel.start_consuming()
    except KeyboardInterrupt:
        # Interrupting the process is the expected way to stop the worker.
        pass
    finally:
        # The handler may already have closed the connection (auto_close),
        # and the connection may never have been created; only close a live one.
        if connection is not None and connection.is_open:
            connection.close()
def worker_callback(connection, auto_close: bool = False):
    """Build the pika message handler that marks a contact as pending sync.

    Args:
        connection: Connection closed by the handler after the first
            successfully processed message when ``auto_close`` is true.
        auto_close: When true, close the channel and ``connection`` after
            the first successful message.

    Returns:
        A ``callback(channel, method, properties, body)`` function suitable
        as a pika ``on_message_callback``.
    """

    def callback(channel, method, properties, body):
        """Process one queued message.

        ``body`` must be a JSON object with ``user_id`` and ``object_id``.
        A malformed payload is rejected immediately (nack, no requeue)
        instead of raising and leaving the message unacknowledged.

        Otherwise the user and contact are looked up, the contact's sync
        record for that user is fetched or created, flagged
        ``is_synced = False`` and saved, and the message is acked. If either
        row does not exist yet the lookup is retried, up to 5 attempts;
        after that the message is nacked without requeue.

        Returns ``True`` on success, the result of ``connection.close()`` on
        success with ``auto_close``, and ``None`` when the message is nacked.
        """
        # Parse once, outside the retry loop: the payload cannot change
        # between attempts and a bad payload is not worth retrying.
        try:
            data = json.loads(body)
            user_id = data["user_id"]
            object_id = data["object_id"]
        except (ValueError, KeyError, TypeError) as exc:
            print(f"Discarding malformed message: {exc!r}")
            channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
            return None

        max_attempts = 5
        # The queued job can run before Django has finished writing the row
        # to the database, hence the bounded retry.
        for attempt in range(max_attempts):
            try:
                user = User.objects.get(pk=user_id)
                # TODO: make this dynamic so the model is chosen from content_type.
                instance = Contact.objects.get(pk=object_id)
                sync_obj, _ = instance.sync.get_or_create(user=user)
                sync_obj.is_synced = False
                sync_obj.save()
                channel.basic_ack(delivery_tag=method.delivery_tag)
                if auto_close:
                    channel.close()
                    return connection.close()
                return True
            except (Contact.DoesNotExist, User.DoesNotExist):
                print(f"Intento ({attempt}), intentando nuevamente en 500ms debido a que el registro todavia no existe en DB")
            finally:
                # Runs after every attempt, successful or not: throttles
                # simultaneous connections to the database.
                time.sleep(0.5)

        channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        return None

    return callback
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment