Last active
November 25, 2019 04:01
-
-
Save kmuthukk/19e8003dab6febda9a0396e1c88d35e3 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Dependencies:
# On CentOS you can install psycopg2 thus:
#
#   sudo yum install postgresql-libs
#   sudo yum install python-psycopg2
import time
from contextlib import closing
from multiprocessing.dummy import Pool as ThreadPool

import psycopg2
# For this test we are only interested in raw performance, so the app code
# does not handle conflicts from concurrent transactions or retry them.
# Instead, each thread works on its own disjoint set of accounts.
num_threads = 4
num_accounts_per_thread = 100
num_txns_per_thread = 20000

# Totals derived from the per-thread settings above.
num_accounts = num_threads * num_accounts_per_thread
num_txns = num_threads * num_txns_per_thread

# Address of the database server; set to "localhost" to target a local node.
host = "172.151.20.146"
def create_table():
    """Drop and recreate the ``accounts`` and ``transactions`` tables.

    Connects to the ``postgres`` database on ``host`` (port 5433) with
    autocommit enabled, so each DDL statement is applied on its own.
    The ``pgcrypto`` extension is installed first because the
    ``transactions.id`` column defaults to ``gen_random_uuid()``.

    The cursor and connection are closed on exit, even when a statement
    raises; any such exception propagates to the caller.
    """
    dsn = "host=" + host + " dbname=postgres user=postgres port=5433"
    with closing(psycopg2.connect(dsn)) as conn:
        conn.set_session(autocommit=True)
        with closing(conn.cursor()) as cur:
            cur.execute("""DROP TABLE IF EXISTS accounts""")
            print("Dropped accounts if exists")

            cur.execute("""DROP TABLE IF EXISTS transactions""")
            print("Dropped transactions if exists")

            cur.execute("""CREATE TABLE accounts(
                             id INTEGER PRIMARY KEY NOT NULL,
                             balance BIGINT NOT NULL);
                        """)
            print("Created accounts")
            print("====================")

            print("Install pgcrypto extension for gen_random_uuid()")
            cur.execute("""CREATE EXTENSION IF NOT EXISTS pgcrypto""")

            cur.execute("""CREATE TABLE transactions(
                             id UUID PRIMARY KEY NOT NULL DEFAULT gen_random_uuid(),
                             from_account_id INTEGER NOT NULL,
                             to_account_id INTEGER NOT NULL,
                             amount BIGINT NOT NULL,
                             timestamp TIMESTAMP NOT NULL DEFAULT NOW())
                        """)
            print("Created transactions")
            print("====================")
def seed_accounts_table():
    """Insert ``num_accounts`` rows into ``accounts``, each with balance 5000.

    Account ids run from 0 to ``num_accounts - 1``. The connection is in
    autocommit mode, so every INSERT is applied individually.

    Seeding is best-effort: the first failing INSERT stops the loop and is
    reported, and the number of rows inserted up to that point is still
    printed. The cursor and connection are always closed on exit.
    """
    dsn = "host=" + host + " dbname=postgres user=postgres port=5433"
    with closing(psycopg2.connect(dsn)) as conn:
        conn.set_session(autocommit=True)
        with closing(conn.cursor()) as cur:
            print("Seeding accounts table with %d rows..." % (num_accounts))
            num_inserts = 0
            try:
                for idx in range(num_accounts):
                    cur.execute(
                        "INSERT INTO accounts (id, balance) VALUES (%s, 5000)",
                        (idx,),
                    )
                    num_inserts += 1
            except Exception as e:
                # Deliberately broad: report whatever went wrong and fall
                # through to the summary line instead of aborting the script.
                print("Unexpected exception: " + str(e))
            print("Inserted %d rows into accounts table" % (num_inserts))
def run_transactions_slave(thread_num):
    """Run ``num_txns_per_thread`` money transfers on this worker's accounts.

    Each worker only touches account ids in
    ``[thread_num * num_accounts_per_thread,
    (thread_num + 1) * num_accounts_per_thread)``, so workers never update
    the same rows. Transaction ``idx`` moves 100 from account
    ``idx % num_accounts_per_thread`` to the next account in the worker's
    range (wrapping around) and records the transfer in ``transactions``.

    A failing transaction is rolled back, counted and reported; the loop
    then continues with the next one. After every 1000th iteration that
    succeeds, the average latency since the previous report is printed.

    Args:
        thread_num: Zero-based worker index; selects the account range.

    The cursor and connection are closed on exit, even if an exception
    escapes (for example when the ROLLBACK itself fails).
    """
    thread_id = str(thread_num)
    dsn = "host=" + host + " dbname=postgres user=postgres port=5433"

    with closing(psycopg2.connect(dsn)) as conn:
        with closing(conn.cursor()) as cur:
            print("Thread-" + thread_id + ": Starting..")

            num_success = 0
            num_exceptions = 0
            num_rows_to_print_stats = 1000

            # Loop invariants: transfer size and first account id of this worker.
            amount = 100
            base_acct_id = thread_num * num_accounts_per_thread

            t1 = time.time()
            for idx in range(num_txns_per_thread):
                from_id = base_acct_id + (idx % num_accounts_per_thread)
                to_id = base_acct_id + ((idx + 1) % num_accounts_per_thread)

                try:
                    cur.execute("""BEGIN""")
                    cur.execute(
                        """UPDATE accounts SET balance = balance - %s WHERE id = %s""",
                        (amount, from_id),
                    )
                    cur.execute(
                        """UPDATE accounts SET balance = balance + %s WHERE id = %s""",
                        (amount, to_id),
                    )
                    cur.execute(
                        """INSERT INTO transactions (from_account_id, to_account_id, amount) VALUES (%s, %s, %s)""",
                        (from_id, to_id, amount),
                    )
                    cur.execute("""COMMIT""")
                except Exception as e:
                    # Deliberately broad: this is a benchmark, so any failure
                    # is counted and the run carries on with the next txn.
                    num_exceptions += 1
                    cur.execute("""ROLLBACK""")
                    print("Unexpected exception: " + str(e))
                    continue

                num_success += 1
                if (idx + 1) % num_rows_to_print_stats == 0:
                    t2 = time.time()
                    avg_latency_ms = (t2 - t1) * 1000.0 / num_rows_to_print_stats
                    print("Thread-" + thread_id + ": Txns Completed %d; Avg Latency (ms) %f " % (idx + 1, avg_latency_ms))
                    # Start timing the next reporting window.
                    t1 = time.time()

            print("Thread-" + thread_id + ": Successful Txns: %d" % (num_success))
            print("Thread-" + thread_id + ": Exceptions %d " % (num_exceptions))
def run_transactions():
    """Run ``run_transactions_slave`` on ``num_threads`` worker threads.

    Worker ``i`` is called with ``thread_num == i``. The call blocks until
    every worker has finished; an exception raised inside a worker is
    re-raised here by ``pool.map``. The pool is shut down and its threads
    joined before returning, whether or not the workers succeeded.
    """
    pool = ThreadPool(num_threads)
    try:
        # The workers return nothing useful, so the mapped results are dropped.
        pool.map(run_transactions_slave, range(num_threads))
    finally:
        pool.close()
        pool.join()
# Main: rebuild the schema, seed the accounts, then time the workload.
create_table()
seed_accounts_table()

# Only the transaction phase is timed; schema setup and seeding are excluded.
start_time = time.time()
run_transactions()
delta = time.time() - start_time

# NOTE: num_txns is the number of transactions attempted; transactions that
# failed inside a worker are still included in the reported TPS.
print("Total TXNS: %d, Time: %s secs; TPS=%f" % (num_txns, delta, num_txns / delta))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment