@mzhang77
Last active May 21, 2025 02:55
'''
Launcher: start one gc_test.py worker per table (sbtest1 .. sbtest19),
5 seconds apart, each logging to its own sbtest$i.out file.

#!/bin/bash
for i in {1..19}
do
  echo "Starting sbtest$i..."
  nohup python3 gc_test.py sbtest$i > "sbtest$i.out" 2>&1 &
  sleep 5
done
'''
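# Overview: this script exercises a single sysbench-style table with four
# concurrent workers:
#   Thread-1: throttled INSERTs with created_at = NOW()
#   Thread-2: SELECTs rows older than 2 days + 30 minutes and DELETEs them in batches of up to 2500
#   Thread-3: throttled INSERTs backdated by 2 days (they become delete candidates within 30 minutes)
#   Thread-4: repeatedly bumps created_at on a fixed set of 100 random rows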
import threading
import mysql.connector
from mysql.connector import Error
import random
import string
import time
from datetime import datetime, timedelta
from collections import Counter
import logging
from logging.handlers import RotatingFileHandler
import sys
import argparse
parser = argparse.ArgumentParser(description='Run operations on a specified table.')
parser.add_argument('table_name', type=str, help='The name of the table to operate on')
args = parser.parse_args()
table_name = args.table_name
# Configure logging
def setup_logger(table_name, log_to_file=False, log_file='app.log'):
    # Create a logger
    logger = logging.getLogger('MyLogger')
    logger.setLevel(logging.DEBUG)
    # Clear existing handlers (if switching dynamically)
    if logger.handlers:
        logger.handlers.clear()
    # Define formatter
    formatter = logging.Formatter(f'%(asctime)s - %(levelname)s - [Table: {table_name}] - %(message)s')
    if log_to_file:
        # Log to a file
        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)
    else:
        # Log to stdout
        stream_handler = logging.StreamHandler(sys.stdout)
        stream_handler.setFormatter(formatter)
        logger.addHandler(stream_handler)
    return logger
def create_table_if_not_exists(table_name):
    global logger
    conn = None
    cursor = None
    try:
        conn = mysql.connector.connect(**dbconfig)
        cursor = conn.cursor()
        create_query = f'''
            CREATE TABLE IF NOT EXISTS {table_name} (
                id INT NOT NULL PRIMARY KEY AUTO_INCREMENT,
                k INT NOT NULL DEFAULT 0,
                c CHAR(120) NOT NULL DEFAULT '',
                pad CHAR(60) NOT NULL DEFAULT '',
                created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
                KEY(created_at)
            )
        '''
        cursor.execute(create_query)
        conn.commit()
        logger.info(f"Table `{table_name}` is ready.")
    except Exception as e:
        logger.info(f"Exception during table creation: {e}")
    finally:
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()
logger = setup_logger(table_name, log_to_file=True) # Logs to file 'app.log'
logger.info("Logging to a file.")
#logger = setup_logger(table_name, log_to_file=False) # Switches to stdout
#logger.info("Logging to stdout.")
# Shared counter for SELECT timing stats (module-level, updated by the delete worker)
stats_counter = Counter()
dbconfig = {
    "database": "test",
    "user": "root",
    "password": "",
    "host": "127.0.0.1",  # change
    "port": "4000"
}
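# Note: port 4000 is TiDB's default SQL port; this config assumes a local TiDB
# (or MySQL-compatible) server with a passwordless root user and a `test` database.
# Adjust host/user/password/port for your environment.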
# Function to generate random strings
def random_string(length):
    return ''.join(random.choices(string.ascii_letters + string.digits, k=length))
# Insert rows into the sysbench-style table, throttled to roughly rows_per_second
def insert_rows_with_throttle(rows_per_second, total_rows):
    global logger
    conn = None
    cursor = None
    inserted_rows = 0
    interval = 1.0 / rows_per_second
    while inserted_rows < total_rows:
        try:
            # Reconnect if the connection was never opened or has been lost
            if conn is None or not conn.is_connected():
                conn = mysql.connector.connect(**dbconfig)
                cursor = conn.cursor()
            k = random.randint(1, 100000)
            c = random_string(120)
            pad = random_string(60)
            query = f"INSERT INTO {table_name} (k, c, pad, created_at) VALUES (%s, %s, %s, NOW())"
            cursor.execute(query, (k, c, pad))
            conn.commit()
            inserted_rows += 1
            time.sleep(interval)
        except Exception as e:
            logger.info(f"Exception: {e}, retrying...")
            time.sleep(2)
    if cursor is not None:
        cursor.close()
    if conn is not None:
        conn.close()
    logger.info(f"Inserted {inserted_rows} rows.")
# Same as insert_rows_with_throttle, but created_at is backdated by 2 days, so the
# rows cross the delete thread's "2 days + 30 minutes" threshold within 30 minutes
def insert_rows_with_throttle_backdated(rows_per_second, total_rows):
    global logger
    conn = None
    cursor = None
    inserted_rows = 0
    interval = 1.0 / rows_per_second
    while inserted_rows < total_rows:
        try:
            if conn is None or not conn.is_connected():
                conn = mysql.connector.connect(**dbconfig)
                cursor = conn.cursor()
            k = random.randint(1, 100000)
            c = random_string(120)
            pad = random_string(60)
            query = f"INSERT INTO {table_name} (k, c, pad, created_at) VALUES (%s, %s, %s, NOW() - INTERVAL 2 DAY)"
            cursor.execute(query, (k, c, pad))
            conn.commit()
            inserted_rows += 1
            time.sleep(interval)
        except Exception as e:
            logger.info(f"Exception: {e}, retrying...")
            time.sleep(2)
    if cursor is not None:
        cursor.close()
    if conn is not None:
        conn.close()
    logger.info(f"Inserted {inserted_rows} backdated rows.")
# Every 5 seconds, select a batch of rows older than 2 days + 30 minutes and delete them by id
def select_and_delete(stop_event, runtime_seconds):
    global logger
    global stats_counter
    start_time = time.time()
    while not stop_event.is_set() and (time.time() - start_time) < runtime_seconds:
        conn = None
        cursor = None
        time.sleep(5)
        try:
            conn = mysql.connector.connect(**dbconfig)
            cursor = conn.cursor(dictionary=True)
            select_query = f"SELECT * FROM {table_name} WHERE created_at < DATE_SUB(DATE_SUB(NOW(), INTERVAL 2 DAY), INTERVAL 30 MINUTE) ORDER BY created_at LIMIT 2500"
            t0 = time.time()
            cursor.execute(select_query)
            rows = cursor.fetchall()
            t1 = time.time()
            stats_counter['SELECT total executions'] += 1
            if t1 - t0 > 0.3:
                stats_counter['SELECT slow count'] += 1
                logger.info(f"SELECT Execution Time: {t1 - t0:.6f} seconds. {stats_counter['SELECT slow count']} of {stats_counter['SELECT total executions']}")
            if not rows:
                logger.info("No rows found to delete.")
                continue
            ids_to_delete = [row['id'] for row in rows]
            placeholders = ', '.join(['%s'] * len(ids_to_delete))
            delete_query = f"DELETE FROM {table_name} WHERE id IN ({placeholders})"
            cursor.execute(delete_query, ids_to_delete)
            conn.commit()
            logger.info(f"Deleted {cursor.rowcount} rows.")
        except Exception as e:
            logger.info(f"Exception in select_and_delete: {e}, retrying...")
            time.sleep(2)
        finally:
            if cursor is not None:
                cursor.close()
            if conn is not None:
                conn.close()
# Pick 100 random ids once, then repeatedly bump their created_at until the duration expires
def update_random_rows(duration_seconds):
    global logger
    end_time = time.time() + duration_seconds
    conn = None
    cursor = None
    conn = mysql.connector.connect(**dbconfig)
    cursor = conn.cursor()
    cursor.execute(f"SELECT id FROM {table_name} ORDER BY RAND() LIMIT 100")
    ids = [row[0] for row in cursor.fetchall()]
    cursor.close()
    conn.close()
    if not ids:
        logger.info("No rows found to update.")
        return
    while time.time() < end_time:
        try:
            conn = mysql.connector.connect(**dbconfig)
            cursor = conn.cursor()
            update_query = f"UPDATE {table_name} SET created_at = created_at + INTERVAL 1 SECOND WHERE id IN ({','.join(map(str, ids))})"
            cursor.execute(update_query)
            conn.commit()
            logger.info(f"Attempted to update {len(ids)} rows. Actually updated {cursor.rowcount} rows.")
            #time.sleep(0.1)
        except Exception as e:
            logger.info(f"Exception in update_random_rows: {e}")
            time.sleep(2)
        finally:
            if cursor is not None:
                cursor.close()
            if conn is not None:
                conn.close()
# Define a thread-safe stoppable thread class
class TimedThread(threading.Thread):
    def __init__(self, runtime_days, name, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.runtime_seconds = runtime_days * 24 * 60 * 60  # Convert days to seconds
        self._stop_event = threading.Event()
        self.name = name
    def stop(self):
        self._stop_event.set()
    def stopped(self):
        return self._stop_event.is_set()
    def run(self):
        logger.info(f"{self.name} has started")
        start_time = time.time()
        while not self.stopped() and (time.time() - start_time) < self.runtime_seconds:
            try:
                if self.name == "Thread-1":
                    insert_rows_with_throttle(400, 1000)
                elif self.name == "Thread-2":
                    select_and_delete(self._stop_event, self.runtime_seconds)
                elif self.name == "Thread-3":
                    insert_rows_with_throttle_backdated(400, 1000)
                elif self.name == "Thread-4":
                    time.sleep(90)  # Wait for Thread-1 to insert some data
                    update_random_rows(self.runtime_seconds - (time.time() - start_time))
            except Exception as e:
                logger.info(f"Exception in {self.name}: {e}, retrying...")
                time.sleep(2)
        logger.info(f"{self.name} has stopped.")
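# Main: create the target table, then run the four workers concurrently.
# Thread-1, -2 and -4 run for 7 days; the backdated inserter (Thread-3) runs for 2 days.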
create_table_if_not_exists(table_name)
threads = []
# Create and start the threads
thread1 = TimedThread(runtime_days=7, name="Thread-1")
threads.append(thread1)
thread2 = TimedThread(runtime_days=7, name="Thread-2")
threads.append(thread2)
thread3 = TimedThread(runtime_days=2, name="Thread-3")
threads.append(thread3)
thread4 = TimedThread(runtime_days=7, name="Thread-4")
threads.append(thread4)
# Start all threads
for thread in threads:
    thread.start()
# Wait for all threads to finish
for thread in threads:
    thread.join()
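# Optional final step: log the accumulated SELECT timing counters once every
# worker has finished, so the per-table summary lands in app.log next to the
# per-batch messages.
logger.info(f"All threads finished. Final stats: {dict(stats_counter)}")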