Last active
May 21, 2025 02:55
-
-
Save mzhang77/e2a3504f271fc3f8ae57656b47cbe9c6 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
#!/bin/bash | |
for i in {1..19} | |
do | |
echo "Starting sbtest$i..." | |
nohup python3 gc_test.py sbtest$i > "sbtest$i.out" 2>&1 & | |
sleep 5 | |
done | |
''' | |
import threading | |
import mysql.connector | |
from mysql.connector import Error | |
import random | |
import string | |
import time | |
from datetime import datetime, timedelta | |
from collections import Counter | |
import logging | |
from logging.handlers import RotatingFileHandler | |
import sys | |
import argparse | |
parser = argparse.ArgumentParser(description='Run operations on a specified table.') | |
parser.add_argument('table_name', type=str, help='The name of the table to operate on') | |
args = parser.parse_args() | |
table_name = args.table_name | |
# Configure logging | |
def setup_logger(table_name, log_to_file=False, log_file='app.log'): | |
# Create a logger | |
logger = logging.getLogger('MyLogger') | |
logger.setLevel(logging.DEBUG) | |
# Clear existing handlers (if switching dynamically) | |
if logger.handlers: | |
logger.handlers.clear() | |
# Define formatter | |
formatter = logging.Formatter(f'%(asctime)s - %(levelname)s - [Table: {table_name}] - %(message)s') | |
if log_to_file: | |
# Log to a file | |
file_handler = logging.FileHandler(log_file) | |
file_handler.setFormatter(formatter) | |
logger.addHandler(file_handler) | |
else: | |
# Log to stdout | |
stream_handler = logging.StreamHandler(sys.stdout) | |
stream_handler.setFormatter(formatter) | |
logger.addHandler(stream_handler) | |
return logger | |
def create_table_if_not_exists(table_name): | |
global logger | |
conn = None | |
cursor = None | |
try: | |
conn = mysql.connector.connect(**dbconfig) | |
cursor = conn.cursor() | |
create_query = f''' | |
CREATE TABLE IF NOT EXISTS {table_name} ( | |
id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, | |
k INT NOT NULL DEFAULT 0, | |
c CHAR(120) NOT NULL DEFAULT '', | |
pad CHAR(60) NOT NULL DEFAULT '', | |
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, | |
KEY(created_at) | |
) | |
''' | |
cursor.execute(create_query) | |
conn.commit() | |
logger.info(f"Table `{table_name}` is ready.") | |
except Exception as e: | |
logger.info(f"Exception during table creation: {e}") | |
finally: | |
if cursor is not None: | |
cursor.close() | |
if conn is not None: | |
conn.close() | |
logger = setup_logger(table_name, log_to_file=True) # Logs to file 'app.log' | |
logger.info("Logging to a file.") | |
#logger = setup_logger(table_name, log_to_file=False) # Switches to stdout | |
#logger.info("Logging to stdout.") | |
global stats_counter | |
stats_counter = Counter() | |
dbconfig = { | |
"database": "test", | |
"user": "root", | |
"password": "", | |
"host": "127.0.0.1", # change | |
"port": "4000" | |
} | |
# Function to generate random strings | |
def random_string(length): | |
return ''.join(random.choices(string.ascii_letters + string.digits, k=length)) | |
# Function to insert a row similar to Sysbench's test table | |
def insert_rows_with_throttle(rows_per_second, total_rows): | |
global logger | |
conn = None | |
cursor = None | |
inserted_rows = 0 | |
interval = 1.0 / rows_per_second | |
while inserted_rows < total_rows: | |
try: | |
if conn is None or not conn.is_connected(): | |
conn = mysql.connector.connect(**dbconfig) | |
cursor = conn.cursor() | |
k = random.randint(1, 100000) | |
c = random_string(120) | |
pad = random_string(60) | |
query = f"INSERT INTO {table_name} (k, c, pad, created_at) VALUES (%s, %s, %s, NOW())" | |
cursor.execute(query, (k, c, pad)) | |
conn.commit() | |
inserted_rows += 1 | |
time.sleep(interval) | |
except Exception as e: | |
logger.info(f"Exception: {e}, retrying...") | |
time.sleep(2) | |
if cursor is not None: | |
cursor.close() | |
if conn is not None: | |
conn.close() | |
logger.info(f"Inserted {inserted_rows} rows.") | |
def insert_rows_with_throttle_backdated(rows_per_second, total_rows): | |
global logger | |
conn = None | |
cursor = None | |
inserted_rows = 0 | |
interval = 1.0 / rows_per_second | |
while inserted_rows < total_rows: | |
try: | |
if conn is None or not conn.is_connected(): | |
conn = mysql.connector.connect(**dbconfig) | |
cursor = conn.cursor() | |
k = random.randint(1, 100000) | |
c = random_string(120) | |
pad = random_string(60) | |
query = f"INSERT INTO {table_name} (k, c, pad, created_at) VALUES (%s, %s, %s, NOW() - INTERVAL 2 DAY)" | |
cursor.execute(query, (k, c, pad)) | |
conn.commit() | |
inserted_rows += 1 | |
time.sleep(interval) | |
except Exception as e: | |
logger.info(f"Exception: {e}, retrying...") | |
time.sleep(2) | |
if cursor is not None: | |
cursor.close() | |
if conn is not None: | |
conn.close() | |
logger.info(f"Inserted {inserted_rows} backdated rows.") | |
def select_and_delete(stop_event, runtime_seconds): | |
global logger | |
global stats_counter | |
start_time = time.time() | |
while not stop_event.is_set() and (time.time() - start_time) < runtime_seconds: | |
conn = None | |
cursor = None | |
time.sleep(5) | |
try: | |
conn = mysql.connector.connect(**dbconfig) | |
cursor = conn.cursor(dictionary=True) | |
select_query = f"SELECT * FROM {table_name} WHERE created_at < DATE_SUB(DATE_SUB(NOW(), INTERVAL 2 DAY), INTERVAL 30 MINUTE) ORDER BY created_at LIMIT 2500" | |
t0 = time.time() | |
cursor.execute(select_query) | |
rows = cursor.fetchall() | |
t1 = time.time() | |
stats_counter['SELECT total executions'] += 1 | |
if t1 - t0 > 0.3: | |
stats_counter['SELECT slow count'] += 1 | |
logger.info(f"SELECT Execution Time: {t1 - t0:.6f} seconds. {stats_counter['SELECT slow count']} of {stats_counter['SELECT total executions']}") | |
if not rows: | |
logger.info("No rows found to delete.") | |
continue | |
ids_to_delete = [row['id'] for row in rows] | |
delete_query = "DELETE FROM {0} WHERE id IN (%s);".format(table_name) % ( | |
', '.join(['%s'] * len(ids_to_delete)) | |
) | |
cursor.execute(delete_query, ids_to_delete) | |
conn.commit() | |
logger.info(f"Deleted {cursor.rowcount} rows.") | |
except Exception as e: | |
logger.info(f"Exception in select_and_delete: {e}, retrying...") | |
time.sleep(2) | |
finally: | |
if cursor is not None: | |
cursor.close() | |
if conn is not None: | |
conn.close() | |
def update_random_rows(duration_seconds): | |
global logger | |
end_time = time.time() + duration_seconds | |
conn = None | |
cursor = None | |
conn = mysql.connector.connect(**dbconfig) | |
cursor = conn.cursor() | |
cursor.execute(f"SELECT id FROM {table_name} ORDER BY RAND() LIMIT 100") | |
ids = [row[0] for row in cursor.fetchall()] | |
cursor.close() | |
conn.close() | |
if not ids: | |
logger.info("No rows found to update.") | |
return | |
while time.time() < end_time: | |
try: | |
conn = mysql.connector.connect(**dbconfig) | |
cursor = conn.cursor() | |
update_query = f"UPDATE {table_name} SET created_at = created_at + interval 1 second WHERE id IN ({','.join(map(str, ids))})" | |
cursor.execute(update_query) | |
conn.commit() | |
logger.info(f"Attempted to update {len(ids)} rows. Actually updated {cursor.rowcount} rows.") | |
#time.sleep(0.1) | |
except Exception as e: | |
logger.info(f"Exception in update_random_rows: {e}") | |
time.sleep(2) | |
finally: | |
if cursor is not None: | |
cursor.close() | |
if conn is not None: | |
conn.close() | |
# Define a thread-safe stoppable thread class | |
class TimedThread(threading.Thread): | |
global logger | |
def __init__(self, runtime_days, name, *args, **kwargs): | |
super().__init__(*args, **kwargs) | |
self.runtime_seconds = runtime_days * 24 * 60 * 60 # Convert days to seconds | |
self._stop_event = threading.Event() | |
self.name = name | |
def stop(self): | |
self._stop_event.set() | |
def stopped(self): | |
return self._stop_event.is_set() | |
def run(self): | |
logger.info(f"{self.name} has started") | |
start_time = time.time() | |
while not self.stopped() and (time.time() - start_time) < self.runtime_seconds: | |
try: | |
if self.name == "Thread-1": | |
insert_rows_with_throttle(400, 1000) | |
elif self.name == "Thread-2": | |
select_and_delete(self._stop_event, self.runtime_seconds) | |
elif self.name == "Thread-3": | |
insert_rows_with_throttle_backdated(400, 1000) | |
elif self.name == "Thread-4": | |
time.sleep(90) # Wait for Thread-1 to insert some data | |
update_random_rows(self.runtime_seconds - (time.time() - start_time)) | |
except Exception as e: | |
logger.info(f"Exception in {self.name}: {e}, retrying...") | |
time.sleep(2) | |
logger.info(f"{self.name} has stopped.") | |
create_table_if_not_exists(table_name) | |
threads = [] | |
# Create and start the threads | |
thread1 = TimedThread(runtime_days=7, name="Thread-1") | |
threads.append(thread1) | |
thread2 = TimedThread(runtime_days=7, name="Thread-2") | |
threads.append(thread2) | |
thread3 = TimedThread(runtime_days=2, name="Thread-3") | |
threads.append(thread3) | |
thread4 = TimedThread(runtime_days=7, name="Thread-4") | |
threads.append(thread4) | |
# Start all threads | |
for thread in threads: | |
thread.start() | |
# Wait for all threads to finish | |
for thread in threads: | |
thread.join() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment