Skip to content

Instantly share code, notes, and snippets.

@sphinxid
Created November 29, 2024 17:14
Show Gist options
  • Save sphinxid/71e45f79ed264119095fafde18a75d22 to your computer and use it in GitHub Desktop.
Save sphinxid/71e45f79ed264119095fafde18a75d22 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import argparse
import mysql.connector
import time
import random
import signal
import sys
from datetime import datetime
from typing import Optional
class MySQLTester:
    """Run simple smoke-test operations against a MySQL server.

    The tester owns one connection and one cursor.  Every operation method
    prints its outcome and returns ``True`` on success or ``False`` when the
    connector raised ``mysql.connector.Error``.
    """

    def __init__(self, host: str, user: str, password: str, port: int = 3306,
                 database: Optional[str] = None, unix_socket: Optional[str] = None):
        """Store the connection settings; no connection is opened here.

        Args:
            host: Server host name, used when ``unix_socket`` is not given.
            user: MySQL user name.
            password: Password for ``user``.
            port: TCP port, used when ``unix_socket`` is not given.
            database: Database to create/select after connecting, or ``None``
                to skip that step.
            unix_socket: Path of a Unix socket; when set it replaces
                ``host``/``port``.
        """
        self.host = host
        self.user = user
        self.password = password
        self.port = port
        self.database = database
        self.unix_socket = unix_socket
        self.conn = None
        self.cursor = None

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Return *name* as a backtick-quoted MySQL identifier.

        Embedded backticks are doubled so the name cannot terminate the
        quoting and smuggle extra SQL into the statement.
        """
        return "`" + name.replace("`", "``") + "`"

    def connect(self) -> bool:
        """Open the connection and, if configured, create/select the database.

        Returns:
            ``True`` when the connection is usable (and the database, if any,
            was selected); ``False`` otherwise.  On failure any partially
            opened connection is closed again.
        """
        # NOTE(review): both spellings of the timeout option are passed, as in
        # the original; presumably one is an alias of the other -- confirm
        # against the connector version in use.
        connect_args = {
            'user': self.user,
            'password': self.password,
            'connect_timeout': 10,
            'connection_timeout': 10,
        }
        if self.unix_socket:
            connect_args['unix_socket'] = self.unix_socket
        else:
            connect_args['host'] = self.host
            connect_args['port'] = self.port

        try:
            # First connect without a database; it may not exist yet.
            self.conn = mysql.connector.connect(**connect_args)
            self.cursor = self.conn.cursor()
        except mysql.connector.Error as err:
            if err.errno == 2003:
                # 2003 means the server could not be reached at all; that
                # includes refused/unreachable hosts, not only timeouts, so
                # keep the underlying error text in the message.
                print("Error connecting to MySQL: cannot reach server "
                      f"(refused, unreachable or timed out): {err}")
            else:
                print(f"Error connecting to MySQL: {err}")
            self.close()
            return False

        # A failed CREATE DATABASE / USE must fail the whole connect step,
        # otherwise later statements run without a selected database.
        if self.database and not self.create_database():
            self.close()
            return False
        return True

    def create_database(self) -> bool:
        """Create the configured database if missing and switch to it.

        Returns:
            ``True`` on success, ``False`` if the connector raised an error.
        """
        # Identifiers cannot be bound as statement parameters, so quote the
        # name explicitly instead of interpolating it raw.
        quoted = self._quote_identifier(self.database)
        try:
            self.cursor.execute(f"CREATE DATABASE IF NOT EXISTS {quoted}")
            self.cursor.execute(f"USE {quoted}")
            return True
        except mysql.connector.Error as err:
            print(f"Error creating database: {err}")
            return False

    def create_table(self) -> bool:
        """Create ``test_table`` (id, value, timestamp) if it does not exist.

        Returns:
            ``True`` on success, ``False`` if the connector raised an error.
        """
        try:
            self.cursor.execute("""
                CREATE TABLE IF NOT EXISTS test_table (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    value VARCHAR(255),
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            """)
            return True
        except mysql.connector.Error as err:
            print(f"Error creating table: {err}")
            return False

    def insert_dummy_data(self, count: int) -> bool:
        """Insert ``count`` rows with random ``test_value_N`` values and commit.

        Args:
            count: Number of rows to insert.

        Returns:
            ``True`` on success, ``False`` if the connector raised an error.
        """
        try:
            for _ in range(count):
                self.cursor.execute(
                    "INSERT INTO test_table (value) VALUES (%s)",
                    (f"test_value_{random.randint(1, 1000)}",)
                )
            self.conn.commit()
            print(f"Inserted {count} dummy records")
            return True
        except mysql.connector.Error as err:
            print(f"Error inserting data: {err}")
            return False

    def read_data(self) -> bool:
        """Print the five rows of ``test_table`` with the highest ids.

        Returns:
            ``True`` on success, ``False`` if the connector raised an error.
        """
        try:
            # Presumably ends the current transaction so the SELECT sees rows
            # committed by other sessions in the meantime -- TODO confirm.
            self.conn.commit()
            self.cursor.execute("SELECT * FROM test_table ORDER BY id DESC LIMIT 5")
            rows = self.cursor.fetchall()
            print("\nLatest 5 records:")
            for row in rows:
                print(f"ID: {row[0]}, Value: {row[1]}, Timestamp: {row[2]}")
            return True
        except mysql.connector.Error as err:
            print(f"Error reading data: {err}")
            return False

    def dummy_select(self) -> bool:
        """Run ``SELECT 1, 2, 3`` and print the single result row.

        Returns:
            ``True`` on success, ``False`` if the connector raised an error.
        """
        try:
            self.cursor.execute("SELECT 1, 2, 3")
            result = self.cursor.fetchone()
            print(f"Dummy select result: {result}")
            return True
        except mysql.connector.Error as err:
            print(f"Error in dummy select: {err}")
            return False

    def get_statistics(self) -> bool:
        """Print the row count and oldest/newest timestamp of ``test_table``.

        Returns:
            ``True`` on success, ``False`` if the connector raised an error.
        """
        try:
            self.cursor.execute("""
                SELECT
                    COUNT(*) as total_rows,
                    MIN(timestamp) as oldest_record,
                    MAX(timestamp) as newest_record
                FROM test_table
            """)
            stats = self.cursor.fetchone()
            print("\nTable Statistics:")
            print(f"Total rows: {stats[0]}")
            print(f"Oldest record: {stats[1]}")
            print(f"Newest record: {stats[2]}")
            return True
        except mysql.connector.Error as err:
            print(f"Error getting statistics: {err}")
            return False

    def close(self) -> None:
        """Close the cursor and the connection, if they were opened.

        The handles are cleared first, so calling ``close()`` again is a
        no-op, and the connection is closed even when closing the cursor
        raises.
        """
        cursor, conn = self.cursor, self.conn
        self.cursor = None
        self.conn = None
        try:
            if cursor is not None:
                cursor.close()
        finally:
            if conn is not None:
                conn.close()
def signal_handler(signum, frame):
    """Handle an interrupt signal: print a notice and exit with status 0.

    Both arguments are the standard signal-handler parameters and are unused.
    Exiting is done by raising ``SystemExit``, so ``finally`` blocks that are
    active in the interrupted code still run.
    """
    print("\nCtrl+C received. Exiting...")
    raise SystemExit(0)
def main():
    """Parse the command line and run the requested action in a loop.

    The action is repeated ``--count`` times (``-1`` means until interrupted)
    with ``--delay`` seconds between iterations.  The process exits with
    status 1 when connecting, creating the table, or any iteration fails;
    the connection is closed on every exit path.
    """
    parser = argparse.ArgumentParser(description='MySQL Tester')
    parser.add_argument('--host', help='MySQL host', default='localhost')
    parser.add_argument('--user', required=True, help='MySQL user')
    parser.add_argument('--password', required=True, help='MySQL password')
    parser.add_argument('--port', type=int, default=3306, help='MySQL port')
    parser.add_argument('--socket', help='Unix socket path')
    parser.add_argument('--database', default='mysql_test', help='Database name')
    parser.add_argument('--action', required=True,
                        choices=['insert', 'read', 'dummy-select', 'stats'],
                        help='Action to perform')
    parser.add_argument('--count', type=int, default=1,
                        help='Number of operations to perform (-1 for infinite)')
    parser.add_argument('--delay', type=float, default=1.0,
                        help='Delay between operations in seconds')
    args = parser.parse_args()

    # time.sleep() rejects negative values; report that as a usage error
    # instead of crashing with a traceback after the first iteration.
    if args.delay < 0:
        parser.error('--delay must not be negative')

    # Set up signal handler for Ctrl+C
    signal.signal(signal.SIGINT, signal_handler)

    tester = MySQLTester(
        host=args.host,
        user=args.user,
        password=args.password,
        port=args.port,
        database=args.database,
        unix_socket=args.socket
    )

    # Map each --action choice to the call that performs one iteration.
    actions = {
        'insert': lambda: tester.insert_dummy_data(1),
        'read': tester.read_data,
        'dummy-select': tester.dummy_select,
        'stats': tester.get_statistics,
    }
    run_action = actions[args.action]

    failed = False
    try:
        # Setup runs inside the try block so that close() in the finally
        # clause also covers failures (or Ctrl+C) during connect/create.
        if not tester.connect() or not tester.create_table():
            sys.exit(1)

        iteration = 0
        while args.count == -1 or iteration < args.count:
            timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            print(f"\n[{timestamp}] Iteration {iteration + 1}")
            if not run_action():
                print("Operation failed, exiting...")
                failed = True
                break
            iteration += 1
            # Sleep only when another iteration follows.
            if args.count == -1 or iteration < args.count:
                time.sleep(args.delay)
    finally:
        tester.close()

    # Report a failed operation through the exit status so that callers
    # (shell scripts, monitoring) can detect it.
    if failed:
        sys.exit(1)
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment