Last active
March 11, 2025 11:59
-
-
Save mzhang77/4f84edbca52f45e04c6d30728c90c318 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import threading | |
import time | |
import mysql.connector | |
import uuid | |
import random | |
'''
create table tb (
    l bigint(20) not null primary key clustered,
    c binary(16) not null,
    created_at timestamp not null default CURRENT_TIMESTAMP,
    updated_at timestamp not null default CURRENT_TIMESTAMP on update current_timestamp,
    unique key (c)
);
'''
# Database connection settings, passed as keyword arguments to
# mysql.connector.connect() by each worker thread.
DB_CONFIG = {
    "host": "127.0.0.1",
    "user": "root",
    # The connector documents ``port`` as an integer, so pass it as one
    # rather than relying on string coercion.
    "port": 4000,
    "password": "",
    "database": "test",
}
# Helper function to generate a random UUID in binary(16) format
def generate_uuid() -> bytes:
    """Return a freshly generated random (version 4) UUID as 16 raw bytes."""
    return uuid.uuid4().bytes
# Thread 1: Continuously inserts random data
def insert_thread():
    """Insert one random ``(l, c)`` row into ``tb`` per iteration, forever.

    Opens its own connection using ``DB_CONFIG``. Each iteration picks ``l``
    with ``random.randint(1, 1000)`` and a new 16-byte UUID for ``c``,
    inserts the row, commits, prints a log line and sleeps for one second.

    A ``mysql.connector.Error`` raised inside an iteration is printed and the
    transaction is rolled back; the loop then retries immediately, without
    sleeping.

    The loop has no exit condition, so this function only ends when an
    exception escapes it (for example when the rollback itself fails). The
    cursor and the connection are closed in that case.
    """
    connection = mysql.connector.connect(**DB_CONFIG)
    try:
        cursor = connection.cursor()
        try:
            while True:
                try:
                    l_value = random.randint(1, 1000)  # Simulated login_method_id
                    c_value = generate_uuid()
                    cursor.execute(
                        "INSERT INTO tb (l,c) VALUES (%s, %s)",
                        (l_value, c_value),
                    )
                    connection.commit()
                    print(f"[INSERT] Added (l={l_value}, c={c_value.hex()})")
                    time.sleep(1)  # Adjust as needed
                except mysql.connector.Error as e:
                    print(f"[ERROR] Insert failed: {e}")
                    connection.rollback()
        finally:
            # Reached only when an exception leaves the loop; release the
            # cursor before the connection it belongs to.
            cursor.close()
    finally:
        connection.close()
# Thread 2: Randomly pick two rows and swap credentials
def swap_thread():
    """Repeatedly move one row's ``c`` value onto another row's ``l``.

    Opens its own connection using ``DB_CONFIG``. Each iteration:

    1. Selects two rows with ``ORDER BY RAND() LIMIT 2``. If fewer than two
       rows come back, or both have the same ``l``, the transaction is
       committed, a message is printed, and the loop retries after one second.
    2. Deletes both selected rows.
    3. Inserts ``(c1, l2)`` and a newly generated UUID for ``l1``. The second
       row's original ``c2`` is not re-inserted.
    4. Commits, prints a log line and sleeps for one second.

    A ``mysql.connector.Error`` raised inside an iteration is printed and the
    transaction is rolled back; the loop then retries immediately, without
    sleeping.

    The loop has no exit condition, so this function only ends when an
    exception escapes it (for example when the rollback itself fails). The
    cursor and the connection are closed in that case.
    """
    connection = mysql.connector.connect(**DB_CONFIG)
    try:
        cursor = connection.cursor()
        try:
            while True:
                try:
                    # Step 1: Randomly pick two distinct rows
                    cursor.execute(
                        "SELECT l, c FROM tb ORDER BY RAND() LIMIT 2"
                    )
                    rows = cursor.fetchall()
                    if len(rows) < 2:
                        # Commit to end the transaction opened by the SELECT.
                        connection.commit()
                        print("[SWAP] Not enough rows to swap.")
                        time.sleep(1)
                        continue

                    # Extract login_method_id and credential_id
                    (l1, c1), (l2, c2) = rows
                    if l1 == l2:
                        connection.commit()
                        print("[SWAP] Picked same l, retrying...")
                        time.sleep(1)
                        continue

                    # Step 2: Delete both rows
                    cursor.execute(
                        "DELETE FROM tb WHERE c = %s AND l = %s",
                        (c1, l1),
                    )
                    cursor.execute(
                        "DELETE FROM tb WHERE c = %s AND l = %s",
                        (c2, l2),
                    )

                    # Step 3: Insert swapped and new credentials
                    cursor.execute(
                        "INSERT INTO tb (c, l) VALUES (%s, %s)",
                        (c1, l2),  # Move c1 to l2
                    )
                    c3 = generate_uuid()  # Generate new credential for l1
                    cursor.execute(
                        "INSERT INTO tb (c, l) VALUES (%s, %s)",
                        (c3, l1),  # Insert new credential for l1
                    )
                    connection.commit()
                    print(f"[SWAP] Moved {c1.hex()} -> {l2}, Inserted new {c3.hex()} -> {l1}")
                    time.sleep(1)  # Adjust as needed
                except mysql.connector.Error as e:
                    print(f"[ERROR] Swap failed: {e}")
                    connection.rollback()
        finally:
            # Reached only when an exception leaves the loop; release the
            # cursor before the connection it belongs to.
            cursor.close()
    finally:
        connection.close()
# Start both threads. They are daemon threads, so they do not keep the
# process alive once the main thread finishes.
t1 = threading.Thread(target=insert_thread, daemon=True)
t2 = threading.Thread(target=swap_thread, daemon=True)
for worker in (t1, t2):
    worker.start()

# Keep the program running until interrupted with Ctrl-C.
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("Stopping threads.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment