Skip to content

Instantly share code, notes, and snippets.

@david-sosa-valdes
Created May 26, 2023 03:15
Show Gist options
  • Save david-sosa-valdes/cb94a0f399c60742aa6a5f4ec12a451d to your computer and use it in GitHub Desktop.
Save david-sosa-valdes/cb94a0f399c60742aa6a5f4ec12a451d to your computer and use it in GitHub Desktop.
RabbitMQ + Python + Django
import time
import pika
import json
import uuid
import json
import os
from django.db import models
from django.conf import settings
from django.contrib.auth import get_user_model
from django_extensions.db.models import TimeStampedModel
from django.contrib.contenttypes.fields import GenericRelation
from django.db.models.signals import post_save
from django.dispatch import receiver
from django.forms.models import model_to_dict
from apps.sync.models import SyncContent
from apps.sync.signals import update_data_signal
User = get_user_model()
"""
Estructura de DB
"""
class Contact(TimeStampedModel):
id = models.UUIDField(
primary_key=True,
default=uuid.uuid4,
editable=False
)
name = models.CharField(max_length=256)
last_name = models.CharField(max_length=256)
phone_number = models.CharField(max_length=10)
# Relación generica con modelo de sincronización de datos
sync = GenericRelation(SyncContent)
def __str__(self):
return self.name
# Indice Generico de Sincronizacion que permite consultas inversas
class SyncContent(TimeStampedModel):
# Un registro por usuario activo en el sistema
user = models.ForeignKey(User, on_delete=models.CASCADE)
content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE)
object_id = models.UUIDField()
content_object = GenericForeignKey("content_type", "object_id")
# Estado de sincronización
is_synced = models.BooleanField(default=False)
class Meta:
indexes = [
models.Index(fields=["content_type", "object_id"]),
]
def update_data_signal(queue_name, callback):
"""
Esta funcion espera la señal para conectarse con RabbitMQ
para que callback reciba los parámetros de la DB en cola, cuando
termine la cola al final cierra la conexion.
"""
connection = pika.BlockingConnection(pika.ConnectionParameters(settings.RABBITMQ_HOST))#Paramtro de la conexión
channel = connection.channel() #Abre la conexión
channel.queue_declare(queue=queue_name, durable=True)#Se declara el nombre de la cola
def basic_publish(data: dict):#Guarda parametro de la cola
channel.basic_publish(
exchange='',
routing_key=queue_name,
body=json.dumps(data),
properties=pika.BasicProperties(
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
)
)
callback(basic_publish)
connection.close()
# Señales de registros (DB)
"""
@receiver: decorador que se ejecuta cuando se genera un post_signal,
es decir cuando se crea nueva o se actualiza informacion en la base de datos.
"""
@receiver(post_save, sender=Contact, dispatch_uid="sync_by_data_content")
def update_data(sender, instance: Contact, **kwargs):
"""
Esta funcion es basicamente un wrapper que llama la funcion update_data_signal
con los parametros: nombre de la cola y callback ('RABBIT_CHANNEL', callback),
para comenzar la sicronizacion de contactos de cada usuario.
"""
def callback(basic_publish):
"""
Esta funcion guarda la nueva informacion de data para los usuarios
"""
for user in User.objects.all():
basic_publish({
'content_type': 'data',
'object_id': str(instance.id),
'user_id': str(user.id)
})
# print("sync_by_data_content", settings.RABBIT_CHANNEL, settings.DATABASES)
return update_data_signal(settings.RABBIT_CHANNEL, callback)
@receiver(post_save, sender=User, dispatch_uid="sync_by_user_content")
def update_user(sender, instance: User, created, **kwargs):
if created:
def callback(basic_publish):
for data in Contact.objects.all():
basic_publish({
'content_type': 'data',
'object_id': str(data.id),
'user_id': str(instance.id)
})
return update_data_signal(settings.RABBIT_CHANNEL, callback)
def worker_connection():
return pika.BlockingConnection(pika.ConnectionParameters(settings.RABBITMQ_HOST, heartbeat=600, blocked_connection_timeout=300))
def worker_handler(queue_name: str, callback: callable, auto_close: bool = False):
try:
connection = worker_connection()
channel = connection.channel()
channel.queue_declare(queue=queue_name, durable=True)
channel.basic_consume(queue=queue_name, on_message_callback=callback(connection, auto_close))
print(f'[{queue_name}] [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
except KeyboardInterrupt:
# Si se interupe el proseso salta el error
pass
def worker_callback(connection, auto_close: bool = False):
def callback(channel, method, properties, body):
attempts = 5
pivot = 0
# A veces tenemos que el proceso en cola es más rapido de ejecutarse que Django escribiendo
# en la base de datos, es por eso que ponemos un max de 5 intentos.
while pivot < attempts:
# Atrapa el error de que el usuario no a sido creado
try:
data = json.loads(body)
user = User.objects.get(pk=data["user_id"])
# TODO: hacer que esto sea dinamico, que se pueda cambiar el modelo en base a content_type
instance = Contact.objects.get(pk=data["object_id"])
sync_obj, created = instance.sync.get_or_create(user=user)
sync_obj.is_synced = False
sync_obj.save()
channel.basic_ack(delivery_tag=method.delivery_tag)
if auto_close:
channel.close()
return connection.close()
return True
except (Contact.DoesNotExist, User.DoesNotExist) as ex:
print(f"Intento ({pivot}), intentando nuevamente en 500ms debido a que el registro todavia no existe en DB")
finally:
# controlar conecciones simultaneas a la DB
time.sleep(0.5)
pivot += 1
channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
return callback
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment