Last active
December 18, 2024 17:55
-
-
Save gabanox/6b77842eca7fc44c05c4d44cd99d9038 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import redis | |
import time | |
import random | |
from datetime import datetime, timedelta | |
import uuid | |
import threading | |
from faker import Faker | |
# Redis connection settings.
REDIS_PORT = 6379
REDIS_HOST = ''  # Replace with your Redis endpoint

# Shared Faker instance used to generate fake user data.
fake = Faker()
def generate_session_data():
    """Build one fake session record suitable for storing as a Redis hash.

    Returns:
        dict: A mapping with four string values:
            ``user_id`` -- a random integer in 1..999, zero-padded to three
            digits; ``username`` -- a Faker-generated name, lowercased, with
            spaces replaced by underscores; ``email`` -- the username at
            ``example.com``; ``last_activity`` -- the current local time
            formatted as ``YYYY-MM-DD HH:MM:SS``.
    """
    numeric_id = random.randint(1, 999)
    handle = "_".join(fake.name().lower().split(" "))
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    session = {
        'user_id': str(numeric_id).zfill(3),
        'username': handle,
        'email': handle + "@example.com",
        'last_activity': timestamp,
    }
    return session
def write_session_data(redis_client, num_sessions=1000):
    """Write generated session hashes to Redis and print throughput figures.

    Each session is stored as a hash under a ``sample_session:<8 chars>`` key
    (the first eight characters of a random UUID) with a one-hour TTL.

    Args:
        redis_client: Connected redis-py client.
        num_sessions: Number of sessions to attempt to write.

    Returns:
        None. Progress and a final summary are printed to stdout.

    Redis errors on an individual write are reported and that session is
    skipped; they do not abort the run.
    """
    session_ttl_seconds = 3600  # one hour
    successful_writes = 0
    # perf_counter is monotonic, so the measured interval cannot be skewed
    # by wall-clock adjustments.
    start_time = time.perf_counter()

    for i in range(num_sessions):
        session_id = f"sample_session:{str(uuid.uuid4())[:8]}"
        session_data = generate_session_data()

        try:
            # Queue the hash write and its TTL on one pipeline so a key is
            # never left behind without an expiry. hset(mapping=...) replaces
            # the deprecated hmset.
            with redis_client.pipeline() as pipe:
                pipe.hset(session_id, mapping=session_data)
                pipe.expire(session_id, session_ttl_seconds)
                pipe.execute()
        except redis.RedisError as e:
            print(f"Error escribiendo sesión {session_id}: {e}")
            continue

        successful_writes += 1

        if i % 100 == 0:
            elapsed = time.perf_counter() - start_time
            # Rate is based on writes that actually completed, not on the
            # loop index.
            ops_per_sec = successful_writes / elapsed if elapsed > 0 else 0
            print(f"Progreso: {i}/{num_sessions} - {ops_per_sec:.2f} ops/sec")

    total_time = time.perf_counter() - start_time
    # Guard against a zero interval (e.g. num_sessions == 0 on a coarse clock).
    average_ops = successful_writes / total_time if total_time > 0 else 0

    print("\nEscritura completada:")
    print(f"Total sesiones: {successful_writes}")
    print(f"Tiempo total: {total_time:.2f} segundos")
    print(f"Promedio ops/sec: {average_ops:.2f}")
def read_session_data(redis_client, num_reads=1000):
    """Read randomly chosen session hashes from Redis and print throughput.

    The keys matching ``sample_session:*`` are collected once up front; each
    read then picks one of them at random and fetches it with ``HGETALL``.

    Args:
        redis_client: Connected redis-py client.
        num_reads: Number of random reads to attempt.

    Returns:
        None. Progress and a final summary are printed to stdout. If no
        session keys exist, a message is printed and nothing is read.

    Redis errors on an individual read are reported and that read is skipped;
    they do not abort the run.
    """
    # Collect every session key once; reads sample from this snapshot.
    session_keys = list(redis_client.scan_iter("sample_session:*"))

    if not session_keys:
        print("No hay sesiones para leer")
        return

    successful_reads = 0
    # Start timing after key discovery so the SCAN cost is not counted as
    # read throughput. perf_counter is monotonic.
    start_time = time.perf_counter()

    for i in range(num_reads):
        session_key = random.choice(session_keys)

        try:
            redis_client.hgetall(session_key)
        except redis.RedisError as e:
            print(f"Error leyendo sesión {session_key}: {e}")
            continue

        successful_reads += 1

        if i % 100 == 0:
            elapsed = time.perf_counter() - start_time
            # Rate is based on reads that actually completed, not on the
            # loop index.
            ops_per_sec = successful_reads / elapsed if elapsed > 0 else 0
            print(f"Lecturas: {i}/{num_reads} - {ops_per_sec:.2f} ops/sec")

    total_time = time.perf_counter() - start_time
    # Guard against a zero interval (e.g. num_reads == 0 on a coarse clock).
    average_ops = successful_reads / total_time if total_time > 0 else 0

    print("\nLectura completada:")
    print(f"Total lecturas: {successful_reads}")
    print(f"Tiempo total: {total_time:.2f} segundos")
    print(f"Promedio ops/sec: {average_ops:.2f}")
def mixed_workload(redis_client, duration=300):
    """Run a mixed read/write load against Redis for a fixed time.

    Each iteration is a read with probability 0.7 (``HGETALL`` on a random
    known session key) and otherwise a write (a new ``sample_session:*`` hash
    with a one-hour TTL).

    Args:
        redis_client: Connected redis-py client.
        duration: How long to run, in seconds. Zero or a negative value runs
            no operations and just prints an all-zero summary.

    Returns:
        None. A summary is printed to stdout.

    Redis errors on individual operations are counted rather than raised, and
    the count is printed in the summary when it is non-zero.
    """
    print(f"Iniciando prueba de carga mixta por {duration} segundos")

    session_ttl_seconds = 3600  # one hour
    ops_counter = {'writes': 0, 'reads': 0, 'errors': 0}

    # Discover existing session keys once instead of running a full SCAN
    # before every read; keys written during the run are appended below.
    try:
        session_keys = list(redis_client.scan_iter("sample_session:*"))
    except redis.RedisError:
        session_keys = []
        ops_counter['errors'] += 1

    # monotonic() cannot jump with wall-clock adjustments.
    started = time.monotonic()
    deadline = started + duration

    while time.monotonic() < deadline:
        # 70% reads, 30% writes
        if random.random() < 0.7:
            if not session_keys:
                # Nothing to read until a write has succeeded.
                continue
            try:
                redis_client.hgetall(random.choice(session_keys))
            except redis.RedisError:
                ops_counter['errors'] += 1
            else:
                ops_counter['reads'] += 1
        else:
            session_id = f"sample_session:{str(uuid.uuid4())[:8]}"
            session_data = generate_session_data()
            try:
                # Queue the hash write and its TTL on one pipeline so a key
                # is never left without an expiry. hset(mapping=...) replaces
                # the deprecated hmset.
                with redis_client.pipeline() as pipe:
                    pipe.hset(session_id, mapping=session_data)
                    pipe.expire(session_id, session_ttl_seconds)
                    pipe.execute()
            except redis.RedisError:
                ops_counter['errors'] += 1
            else:
                ops_counter['writes'] += 1
                session_keys.append(session_id)

    # Use the measured interval and guard against zero so duration=0 does
    # not raise ZeroDivisionError.
    elapsed = time.monotonic() - started
    writes_per_sec = ops_counter['writes'] / elapsed if elapsed > 0 else 0
    reads_per_sec = ops_counter['reads'] / elapsed if elapsed > 0 else 0

    print("\nPrueba de carga mixta completada:")
    print(f"Total escrituras: {ops_counter['writes']}")
    print(f"Total lecturas: {ops_counter['reads']}")
    print(f"Ops/sec escrituras: {writes_per_sec:.2f}")
    print(f"Ops/sec lecturas: {reads_per_sec:.2f}")
    if ops_counter['errors']:
        print(f"Operaciones fallidas: {ops_counter['errors']}")
def main():
    """Connect to Redis and run the write, read and mixed-load tests in order.

    A connection error at any point is reported on stdout instead of being
    raised. The client is closed on every exit path.
    """
    # Connect to Redis
    connection_options = {
        'host': REDIS_HOST,
        'port': REDIS_PORT,
        'decode_responses': True,
    }
    client = redis.Redis(**connection_options)

    try:
        # Verify the connection before starting any test
        client.ping()
        print("Conectado a Redis exitosamente")

        # Each stage: banner to print, function to run, keyword arguments
        stages = (
            ("\n1. Prueba de escritura masiva", write_session_data, {'num_sessions': 1000}),
            ("\n2. Prueba de lectura", read_session_data, {'num_reads': 2000}),
            ("\n3. Prueba de carga mixta", mixed_workload, {'duration': 300}),  # 5 minutes
        )
        for banner, run_stage, options in stages:
            print(banner)
            run_stage(client, **options)
    except redis.ConnectionError as exc:
        print(f"Error de conexión: {exc}")
    finally:
        client.close()


# Run the tests only when executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment