Created
November 29, 2024 17:14
-
-
Save sphinxid/71e45f79ed264119095fafde18a75d22 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
import mysql.connector | |
import time | |
import random | |
import signal | |
import sys | |
from datetime import datetime | |
from typing import Optional | |
class MySQLTester:
    """Run simple smoke-test operations against a MySQL server.

    The tester owns one connection and one cursor. Every operation method
    prints its outcome and returns ``True`` on success or ``False`` when the
    connector raised ``mysql.connector.Error``.
    """

    def __init__(self, host: str, user: str, password: str, port: int = 3306,
                 database: Optional[str] = None, unix_socket: Optional[str] = None):
        """Store the connection settings; no connection is opened here.

        Args:
            host: Server host name, used when ``unix_socket`` is not set.
            user: MySQL account name.
            password: Password for ``user``.
            port: TCP port, used when ``unix_socket`` is not set.
            database: Database to create (if missing) and select on connect.
            unix_socket: Socket path; when set it replaces host/port.
        """
        self.host = host
        self.user = user
        self.password = password
        self.port = port
        self.database = database
        self.unix_socket = unix_socket
        self.conn = None
        self.cursor = None

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Return ``name`` as a backtick-quoted MySQL identifier.

        Embedded backticks are doubled, so the name cannot terminate the
        quoting and names containing characters such as ``-`` stay valid.
        """
        return "`" + name.replace("`", "``") + "`"

    def connect(self):
        """Open the connection and cursor, then select the test database.

        Returns:
            True when the connection is ready for use. False when connecting
            failed or the database could not be created/selected; in that
            case anything that was opened has been closed again.
        """
        try:
            # 'connect_timeout' is only a compatibility alias of
            # 'connection_timeout', so one entry is sufficient.
            connect_args = {
                'user': self.user,
                'password': self.password,
                'connection_timeout': 10,
            }
            if self.unix_socket:
                connect_args['unix_socket'] = self.unix_socket
            else:
                connect_args['host'] = self.host
                connect_args['port'] = self.port
            # First connect without a database: it may not exist yet.
            self.conn = mysql.connector.connect(**connect_args)
            self.cursor = self.conn.cursor()
            # Create and switch to the database; a failure here means the
            # connection is not usable for the table operations.
            if self.database and not self.create_database():
                self.close()
                return False
            return True
        except mysql.connector.Error as err:
            # Always show the connector's own message: errno 2003 covers any
            # "cannot connect" condition, not only timeouts.
            print(f"Error connecting to MySQL: {err}")
            self.close()
            return False

    def create_database(self):
        """Create the configured database if it doesn't exist and select it.

        Returns:
            True on success, False if either statement failed.
        """
        try:
            # Identifiers cannot be bound as query parameters, so quote the
            # name explicitly instead of interpolating it raw.
            db_name = self._quote_identifier(self.database)
            self.cursor.execute(f"CREATE DATABASE IF NOT EXISTS {db_name}")
            self.cursor.execute(f"USE {db_name}")
            return True
        except mysql.connector.Error as err:
            print(f"Error creating database: {err}")
            return False

    def create_table(self):
        """Create ``test_table`` in the current database if it is missing.

        Returns:
            True on success, False if the statement failed.
        """
        try:
            self.cursor.execute("""
                CREATE TABLE IF NOT EXISTS test_table (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    value VARCHAR(255),
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            """)
            return True
        except mysql.connector.Error as err:
            print(f"Error creating table: {err}")
            return False

    def insert_dummy_data(self, count: int):
        """Insert ``count`` rows with random values and commit them.

        Args:
            count: Number of rows to insert.

        Returns:
            True when all rows were committed. False on error; the pending
            inserts of this call are rolled back in that case.
        """
        try:
            for _ in range(count):
                self.cursor.execute(
                    "INSERT INTO test_table (value) VALUES (%s)",
                    (f"test_value_{random.randint(1, 1000)}",)
                )
            self.conn.commit()
            print(f"Inserted {count} dummy records")
            return True
        except mysql.connector.Error as err:
            print(f"Error inserting data: {err}")
            try:
                # Do not leave a half-finished transaction open.
                self.conn.rollback()
            except mysql.connector.Error:
                # Best effort only: the rollback failed as well, so there is
                # nothing further to undo from here.
                pass
            return False

    def read_data(self):
        """Print the five rows of ``test_table`` with the highest ids.

        Returns:
            True on success, False if the query failed.
        """
        try:
            # NOTE(review): presumably this commit ends the current
            # transaction so that repeated reads see rows committed by other
            # sessions - confirm against the server's isolation level.
            self.conn.commit()
            self.cursor.execute("SELECT * FROM test_table ORDER BY id DESC LIMIT 5")
            rows = self.cursor.fetchall()
            print("\nLatest 5 records:")
            for row in rows:
                print(f"ID: {row[0]}, Value: {row[1]}, Timestamp: {row[2]}")
            return True
        except mysql.connector.Error as err:
            print(f"Error reading data: {err}")
            return False

    def dummy_select(self):
        """Run ``SELECT 1, 2, 3`` and print the fetched row.

        Returns:
            True on success, False if the query failed.
        """
        try:
            self.cursor.execute("SELECT 1, 2, 3")
            result = self.cursor.fetchone()
            print(f"Dummy select result: {result}")
            return True
        except mysql.connector.Error as err:
            print(f"Error in dummy select: {err}")
            return False

    def get_statistics(self):
        """Print the row count and the oldest/newest timestamp of the table.

        Returns:
            True on success, False if the query failed.
        """
        try:
            self.cursor.execute("""
                SELECT
                    COUNT(*) as total_rows,
                    MIN(timestamp) as oldest_record,
                    MAX(timestamp) as newest_record
                FROM test_table
            """)
            stats = self.cursor.fetchone()
            print("\nTable Statistics:")
            print(f"Total rows: {stats[0]}")
            print(f"Oldest record: {stats[1]}")
            print(f"Newest record: {stats[2]}")
            return True
        except mysql.connector.Error as err:
            print(f"Error getting statistics: {err}")
            return False

    def close(self):
        """Close the cursor and the connection; safe to call repeatedly.

        Both handles are reset to ``None`` first, and each one is closed
        independently so that an error while closing the cursor can no
        longer prevent the connection from being closed.
        """
        cursor, self.cursor = self.cursor, None
        conn, self.conn = self.conn, None
        if cursor is not None:
            try:
                cursor.close()
            except mysql.connector.Error as err:
                print(f"Error closing cursor: {err}")
        if conn is not None:
            try:
                conn.close()
            except mysql.connector.Error as err:
                print(f"Error closing connection: {err}")
def signal_handler(signum, frame):
    """Handle an interrupt signal: print a notice and exit with status 0.

    Both arguments follow the ``signal.signal`` handler signature and are
    not used.
    """
    notice = "\nCtrl+C received. Exiting..."
    print(notice)
    # Raising SystemExit directly is what sys.exit(0) does.
    raise SystemExit(0)
def main():
    """Parse the command line and run the requested action in a loop.

    The action is repeated ``--count`` times (forever when the count is
    -1), sleeping ``--delay`` seconds between iterations. The connection is
    always closed before returning.

    Exit status: 1 when connecting, creating the table, or any iteration of
    the action fails; 2 (from argparse) for invalid arguments.
    """
    parser = argparse.ArgumentParser(description='MySQL Tester')
    parser.add_argument('--host', help='MySQL host', default='localhost')
    parser.add_argument('--user', required=True, help='MySQL user')
    parser.add_argument('--password', required=True, help='MySQL password')
    parser.add_argument('--port', type=int, default=3306, help='MySQL port')
    parser.add_argument('--socket', help='Unix socket path')
    parser.add_argument('--database', default='mysql_test', help='Database name')
    parser.add_argument('--action', required=True,
                        choices=['insert', 'read', 'dummy-select', 'stats'],
                        help='Action to perform')
    parser.add_argument('--count', type=int, default=1,
                        help='Number of operations to perform (-1 for infinite)')
    parser.add_argument('--delay', type=float, default=1.0,
                        help='Delay between operations in seconds')
    args = parser.parse_args()

    # Reject values that would otherwise fail late (time.sleep raises on a
    # negative delay) or silently do nothing (a count below -1).
    if args.count < -1:
        parser.error('--count must be -1 (infinite) or a non-negative number')
    if args.delay < 0:
        parser.error('--delay must not be negative')

    # Set up signal handler for Ctrl+C
    signal.signal(signal.SIGINT, signal_handler)

    # Initialize tester
    tester = MySQLTester(
        host=args.host,
        user=args.user,
        password=args.password,
        port=args.port,
        database=args.database,
        unix_socket=args.socket
    )

    # Connect to MySQL
    if not tester.connect():
        sys.exit(1)

    # Create table
    if not tester.create_table():
        tester.close()
        sys.exit(1)

    # Map each --action choice to the call that performs one iteration.
    actions = {
        'insert': lambda: tester.insert_dummy_data(1),
        'read': tester.read_data,
        'dummy-select': tester.dummy_select,
        'stats': tester.get_statistics,
    }
    run_action = actions[args.action]

    # Execute the requested action
    failed = False
    try:
        iteration = 0
        while args.count == -1 or iteration < args.count:
            timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            print(f"\n[{timestamp}] Iteration {iteration + 1}")
            if not run_action():
                print("Operation failed, exiting...")
                failed = True
                break
            iteration += 1
            # Leave right after the last iteration instead of sleeping once
            # more for nothing.
            if args.count != -1 and iteration >= args.count:
                break
            time.sleep(args.delay)
    finally:
        tester.close()

    # Report a failed operation through the exit status, like the connect
    # and create-table failures above.
    if failed:
        sys.exit(1)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment