Created
December 18, 2024 18:43
-
-
Save gabanox/e46f0d12dd635c6525866aa30dd6703e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import redis | |
from redis.cluster import RedisCluster | |
import time | |
import random | |
from datetime import datetime | |
import uuid | |
from faker import Faker | |
# Connection settings for the Redis cluster under test.
# NOTE(review): the endpoint is an empty string here — presumably it has to be
# set to the cluster's configuration endpoint before running; confirm.
REDIS_PORT = 6379
REDIS_CONFIG_ENDPOINT = ''

# Shared Faker instance used to fabricate the session payloads.
fake = Faker()
def connect_to_cluster():
    """Create a client for the Redis cluster.

    The client is built from the module-level ``REDIS_CONFIG_ENDPOINT`` and
    ``REDIS_PORT`` settings with ``decode_responses=True``.

    Returns:
        The newly created ``RedisCluster`` client.

    Raises:
        Exception: Whatever was raised while creating the client; the error
            is printed first and then re-raised unchanged.
    """
    try:
        options = {
            'host': REDIS_CONFIG_ENDPOINT,
            'port': REDIS_PORT,
            'decode_responses': True,
        }
        client = RedisCluster(**options)
        print("Conexión al cluster establecida exitosamente")
        return client
    except Exception as err:
        # Report the failure, then let the caller decide what to do with it.
        print(f"Error conectando al cluster: {err}")
        raise
def clean_cluster(cluster_client, batch_size=1000):
    """Delete every key returned by a full scan of the cluster.

    Keys are consumed from ``scan_iter("*")`` as they arrive and removed in
    batches of at most ``batch_size``, so the complete key list is never held
    in memory and no single ``delete`` call receives an unbounded number of
    arguments.

    Args:
        cluster_client: Client object exposing ``scan_iter(pattern)`` and
            ``delete(*keys)``.
        batch_size: Maximum number of keys passed to one ``delete`` call.
            Values below 1 are treated as 1.

    Returns:
        None. Progress and the number of scanned-and-deleted keys are printed.
        Any exception raised during the cleanup is printed and not re-raised
        (best-effort cleanup).
    """
    batch_size = max(1, batch_size)
    removed = 0
    try:
        print("Iniciando limpieza del cluster...")
        batch = []
        for key in cluster_client.scan_iter("*"):
            batch.append(key)
            if len(batch) >= batch_size:
                cluster_client.delete(*batch)
                removed += len(batch)
                batch = []
        # Flush whatever is left after the scan finishes.
        if batch:
            cluster_client.delete(*batch)
            removed += len(batch)
        if removed:
            print(f"Se eliminaron {removed} claves")
        else:
            print("No se encontraron claves para eliminar")
    except Exception as e:
        print(f"Error durante la limpieza: {e}")
def verify_distribution(cluster_client):
    """Print how many stored keys fall into each of the two slot ranges.

    Every key yielded by ``scan_iter()`` is mapped to its hash slot with
    ``cluster_keyslot`` and tallied per slot. The report then adds up the
    number of *keys* (not the number of distinct slots) in each range.

    Args:
        cluster_client: Client object exposing ``scan_iter()`` and
            ``cluster_keyslot(key)``.

    Returns:
        None. The totals are printed.
    """
    # NOTE(review): the two ranges below hard-code a two-shard layout split at
    # slot 8191/8192 — confirm this matches the actual cluster topology.
    slot_counts = {}
    total_keys = 0
    print("\nVerificando distribución de claves:")
    for key in cluster_client.scan_iter():
        slot = cluster_client.cluster_keyslot(key)
        slot_counts[slot] = slot_counts.get(slot, 0) + 1
        total_keys += 1
    # Sum the per-slot key counts; counting the slots themselves would
    # under-report whenever several keys share a slot.
    shard1_keys = sum(
        count for slot, count in slot_counts.items() if 0 <= slot <= 8191
    )
    shard2_keys = sum(
        count for slot, count in slot_counts.items() if 8192 <= slot <= 16383
    )
    print(f"Total de claves: {total_keys}")
    print(f"Claves en shard 1 (slots 0-8191): {shard1_keys}")
    print(f"Claves en shard 2 (slots 8192-16383): {shard2_keys}")
def generate_distributed_key(index):
    """Return a key name for ``index`` using one randomly chosen naming scheme.

    Five schemes are available (``user:``, ``session:``, ``product:`` and
    ``order:`` prefixes, plus a ``:metadata`` suffix); one is picked with
    ``random.choice`` on every call.

    Args:
        index: Value embedded in the key name.

    Returns:
        The formatted key string.
    """
    # Varying the key shape is meant to spread the keys over different slots.
    templates = (
        "user:{0}",
        "session:{0}",
        "product:{0}",
        "order:{0}",
        "{0}:metadata",
    )
    return random.choice(templates).format(index)
def generate_session_data():
    """Build one fabricated session record.

    Returns:
        A dict with the keys ``user_id`` (random number 1-999, zero-padded to
        three digits), ``username``, ``email``, ``last_activity`` (current
        local time as ``YYYY-MM-DD HH:MM:SS``) and ``data`` (generated text
        requested with ``max_nb_chars=100``).
    """
    user_id = f"{random.randint(1, 999):03d}"
    username = fake.user_name()
    email = fake.email()
    last_activity = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    filler = fake.text(max_nb_chars=100)
    return {
        'user_id': user_id,
        'username': username,
        'email': email,
        'last_activity': last_activity,
        'data': filler,
    }
def load_test_distributed(cluster_client, num_keys=1000):
    """Write ``num_keys`` generated hashes to the cluster and report throughput.

    For each index a key is produced by ``generate_distributed_key`` and a
    payload by ``generate_session_data``; the key's slot is looked up with
    ``cluster_keyslot`` and the payload stored with ``hset``. Only writes that
    succeed are counted in the per-shard tally and in the final average, so
    failed writes no longer inflate the report.

    Args:
        cluster_client: Client object exposing ``cluster_keyslot(key)`` and
            ``hset(key, mapping=...)``.
        num_keys: Number of keys to attempt to write.

    Returns:
        None. Progress (every 100 attempts), per-key errors, total time,
        average rate and the per-shard tally are printed.
    """
    # perf_counter is monotonic, so wall-clock adjustments cannot skew timing.
    start_time = time.perf_counter()
    # NOTE(review): the slot boundary hard-codes a two-shard layout split at
    # 8191/8192 — confirm this matches the actual cluster topology.
    shard_distribution = {
        'shard1': 0,  # 0-8191
        'shard2': 0   # 8192-16383
    }
    written = 0
    print(f"\nIniciando carga de {num_keys} claves...")
    for i in range(num_keys):
        key = generate_distributed_key(i)
        data = generate_session_data()
        try:
            slot = cluster_client.cluster_keyslot(key)
            cluster_client.hset(key, mapping=data)
        except Exception as e:
            print(f"Error guardando clave {key}: {e}")
        else:
            # Tally only after the write went through.
            written += 1
            shard = 'shard1' if 0 <= slot <= 8191 else 'shard2'
            shard_distribution[shard] += 1
        if (i + 1) % 100 == 0:
            elapsed = time.perf_counter() - start_time
            # Guard against a zero reading from a coarse clock.
            rate = (i + 1) / elapsed if elapsed > 0 else 0.0
            print(f"Progreso: {i + 1}/{num_keys} - Rate: {rate:.2f} ops/sec")
    elapsed = time.perf_counter() - start_time
    average = written / elapsed if elapsed > 0 else 0.0
    print("\nCarga completada:")
    print(f"Tiempo total: {elapsed:.2f} segundos")
    print(f"Promedio: {average:.2f} ops/sec")
    print("\nDistribución entre shards:")
    print(f"Shard 1 (0-8191): {shard_distribution['shard1']} claves")
    print(f"Shard 2 (8192-16383): {shard_distribution['shard2']} claves")
def main():
    """Run the whole scenario: connect, wipe, inspect, load, inspect again.

    Any exception raised along the way is printed and not re-raised. The
    client is closed at the end if the connection step produced one.
    """
    cluster_client = None
    try:
        # Open the connection first; everything else needs the client.
        cluster_client = connect_to_cluster()
        # Start from an empty cluster and show the initial state.
        clean_cluster(cluster_client)
        verify_distribution(cluster_client)
        # Write the test data, then show where it ended up.
        load_test_distributed(cluster_client, num_keys=1000)
        verify_distribution(cluster_client)
    except Exception as err:
        print(f"Error en la ejecución: {err}")
    finally:
        # Nothing to close when the connection attempt itself failed.
        if cluster_client is not None:
            cluster_client.close()
# Run the scenario only when this file is executed directly, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment