Skip to content

Instantly share code, notes, and snippets.

@gabanox
Last active December 18, 2024 17:55
Show Gist options
  • Save gabanox/6b77842eca7fc44c05c4d44cd99d9038 to your computer and use it in GitHub Desktop.
Save gabanox/6b77842eca7fc44c05c4d44cd99d9038 to your computer and use it in GitHub Desktop.
import redis
import time
import random
from datetime import datetime, timedelta
import uuid
import threading
from faker import Faker
# Faker instance used to generate fake user data for the sessions
fake = Faker()
# Redis connection settings (read by main() when building the client)
REDIS_HOST = '' # Replace with your Redis endpoint
REDIS_PORT = 6379
def generate_session_data():
    """Build one fake session record.

    Returns:
        A dict with the string fields ``user_id`` (zero-padded number between
        001 and 999), ``username`` (lower-cased Faker name with spaces replaced
        by underscores), ``email`` (``<username>@example.com``) and
        ``last_activity`` (current local time as ``YYYY-MM-DD HH:MM:SS``).
    """
    numeric_id = random.randint(1, 999)
    full_name = fake.name()
    handle = full_name.lower().replace(' ', '_')
    now = datetime.now()
    return dict(
        user_id=format(numeric_id, "03d"),
        username=handle,
        email=f"{handle}@example.com",
        last_activity=now.strftime('%Y-%m-%d %H:%M:%S'),
    )
def write_session_data(redis_client, num_sessions=1000):
    """Write fake session hashes to Redis and print write throughput.

    Each session is stored as a hash under a ``sample_session:<id>`` key,
    where ``<id>`` is the first 8 characters of a random UUID, and is given
    a one-hour TTL. A progress line is printed after every 100th attempt
    that succeeded, and a summary is printed at the end.

    Args:
        redis_client: Client exposing ``hset(name, mapping=...)`` and
            ``expire(name, seconds)``.
        num_sessions: Number of session writes to attempt.

    Returns:
        None. All results are reported on stdout.
    """
    start_time = time.perf_counter()
    successful_writes = 0
    for attempt in range(1, num_sessions + 1):
        session_id = f"sample_session:{str(uuid.uuid4())[:8]}"
        session_data = generate_session_data()
        try:
            # hset(mapping=...) replaces the deprecated hmset() wrapper.
            redis_client.hset(session_id, mapping=session_data)
            redis_client.expire(session_id, 3600)  # 1 hour TTL
        except redis.RedisError as e:
            print(f"Error escribiendo sesión {session_id}: {e}")
        else:
            successful_writes += 1
            # Report the number of attempts actually completed (1-based) and
            # base the rate on successful writes only.
            if attempt % 100 == 0:
                elapsed = time.perf_counter() - start_time
                ops_per_sec = successful_writes / elapsed if elapsed > 0 else 0
                print(f"Progreso: {attempt}/{num_sessions} - {ops_per_sec:.2f} ops/sec")
    total_time = time.perf_counter() - start_time
    # Guard against a zero elapsed time (e.g. num_sessions == 0).
    average = successful_writes / total_time if total_time > 0 else 0
    print("\nEscritura completada:")
    print(f"Total sesiones: {successful_writes}")
    print(f"Tiempo total: {total_time:.2f} segundos")
    print(f"Promedio ops/sec: {average:.2f}")
def read_session_data(redis_client, num_reads=1000):
    """Read randomly chosen session hashes from Redis and print read throughput.

    The keys matching ``sample_session:*`` are listed once up front; each read
    then fetches one randomly chosen key with ``hgetall``. If no key matches,
    a message is printed and the function returns without reading.

    Args:
        redis_client: Client exposing ``scan_iter(pattern)`` and
            ``hgetall(name)``.
        num_reads: Number of reads to attempt.

    Returns:
        None. All results are reported on stdout.
    """
    # List the candidate keys before starting the clock so that the key
    # enumeration is not counted as part of the read throughput.
    session_keys = list(redis_client.scan_iter("sample_session:*"))
    if not session_keys:
        print("No hay sesiones para leer")
        return
    start_time = time.perf_counter()
    successful_reads = 0
    for attempt in range(1, num_reads + 1):
        session_key = random.choice(session_keys)
        try:
            redis_client.hgetall(session_key)
        except redis.RedisError as e:
            print(f"Error leyendo sesión {session_key}: {e}")
        else:
            successful_reads += 1
            # Report the number of attempts actually completed (1-based) and
            # base the rate on successful reads only.
            if attempt % 100 == 0:
                elapsed = time.perf_counter() - start_time
                ops_per_sec = successful_reads / elapsed if elapsed > 0 else 0
                print(f"Lecturas: {attempt}/{num_reads} - {ops_per_sec:.2f} ops/sec")
    total_time = time.perf_counter() - start_time
    # Guard against a zero elapsed time (e.g. num_reads == 0).
    average = successful_reads / total_time if total_time > 0 else 0
    print("\nLectura completada:")
    print(f"Total lecturas: {successful_reads}")
    print(f"Tiempo total: {total_time:.2f} segundos")
    print(f"Promedio ops/sec: {average:.2f}")
def mixed_workload(redis_client, duration=300):
    """Run a mixed read/write load against Redis for a fixed amount of time.

    Each loop iteration is a read with probability 0.7 and a write otherwise.
    Reads fetch a randomly chosen known session key with ``hgetall``; writes
    store a new fake session hash with a one-hour TTL. A summary with totals
    and per-second rates is printed at the end.

    Args:
        redis_client: Client exposing ``scan_iter(pattern)``,
            ``hgetall(name)``, ``hset(name, mapping=...)`` and
            ``expire(name, seconds)``.
        duration: How long to run the load loop, in seconds.

    Returns:
        None. All results are reported on stdout.
    """
    print(f"Iniciando prueba de carga mixta por {duration} segundos")
    ops_counter = {'writes': 0, 'reads': 0}
    errors = 0
    # List the existing keys once instead of scanning the whole keyspace on
    # every read; keys written below are appended to this list.
    try:
        session_keys = list(redis_client.scan_iter("sample_session:*"))
    except redis.RedisError:
        errors += 1
        session_keys = []
    end_time = time.monotonic() + duration
    while time.monotonic() < end_time:
        # 70% reads, 30% writes
        if random.random() < 0.7:
            if not session_keys:
                # Nothing to read yet; wait for a write to create a key.
                continue
            try:
                redis_client.hgetall(random.choice(session_keys))
            except redis.RedisError:
                errors += 1
            else:
                ops_counter['reads'] += 1
        else:
            session_id = f"sample_session:{str(uuid.uuid4())[:8]}"
            session_data = generate_session_data()
            try:
                # hset(mapping=...) replaces the deprecated hmset() wrapper.
                redis_client.hset(session_id, mapping=session_data)
                redis_client.expire(session_id, 3600)
            except redis.RedisError:
                errors += 1
            else:
                ops_counter['writes'] += 1
                session_keys.append(session_id)
    # Guard against a zero (or negative) duration when computing rates.
    writes_per_sec = ops_counter['writes'] / duration if duration > 0 else 0
    reads_per_sec = ops_counter['reads'] / duration if duration > 0 else 0
    print("\nPrueba de carga mixta completada:")
    print(f"Total escrituras: {ops_counter['writes']}")
    print(f"Total lecturas: {ops_counter['reads']}")
    print(f"Total errores: {errors}")
    print(f"Ops/sec escrituras: {writes_per_sec:.2f}")
    print(f"Ops/sec lecturas: {reads_per_sec:.2f}")
def main():
    """Connect to Redis and run the write, read and mixed-load tests in order.

    A connection error is reported on stdout; the client is closed in every
    case.
    """
    # Connect to Redis
    client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)
    # Each phase is a banner to print followed by the test to run.
    phases = (
        ("\n1. Prueba de escritura masiva",
         lambda: write_session_data(client, num_sessions=1000)),
        ("\n2. Prueba de lectura",
         lambda: read_session_data(client, num_reads=2000)),
        ("\n3. Prueba de carga mixta",
         lambda: mixed_workload(client, duration=300)),  # 5 minutes
    )
    try:
        # Verify the connection before running any test
        client.ping()
        print("Conectado a Redis exitosamente")
        for banner, run_phase in phases:
            print(banner)
            run_phase()
    except redis.ConnectionError as err:
        print(f"Error de conexión: {err}")
    finally:
        client.close()
# Run the benchmark only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment