Last active
May 16, 2019 01:53
-
-
Save kmuthukk/fa9fd54278c91aba14979fd8ba8d4973 to your computer and use it in GitHub Desktop.
A parallel program to test simple-join performance with YugaByte's PostgreSQL-compatible YSQL API.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Dependencies:
# On CentOS you can install psycopg2 as follows:
#
#   sudo yum install postgresql-libs
#   sudo yum install python-psycopg2
#
import os
import random
import time
from multiprocessing.dummy import Pool as ThreadPool

import psycopg2
# Benchmark configuration.
num_threads = 8                    # loader threads used to populate emp
num_users = num_threads * 1000     # total emp rows (1000 per loader thread)
# YSQL server address.  Override with the YB_HOST environment variable
# instead of editing the script; an empty/unset value keeps the default.
host = os.environ.get("YB_HOST") or "10.150.0.27"
num_depts = 1000                   # dept rows; each emp row gets dept_id = eid % num_depts
num_read_threads = 4               # concurrent reader threads for the join test
num_reads_per_thread = 1000        # random join lookups issued by each reader
def create_tables():
    """Drop and re-create the ``emp`` and ``dept`` tables, printing timings.

    Connects to ``host`` on port 5433 (database and user ``postgres``) in
    autocommit mode and drops both tables if present. It then creates:

    * ``emp(eid int PRIMARY KEY, ename text, salary float, dept_id int)``
    * ``dept(dept_id int PRIMARY KEY, dept_name text)``

    Wall-clock time for the drop phase and for each CREATE TABLE is printed
    in milliseconds. The connection is always closed on return.
    """
    conn = psycopg2.connect("host=" + host + " dbname=postgres user=postgres port=5433")
    try:
        # Autocommit: each DDL statement is committed as soon as it executes.
        conn.set_session(autocommit=True)
        cur = conn.cursor()

        start_time = time.time()
        cur.execute("""DROP TABLE IF EXISTS emp""")
        print("Dropped (if exists): emp table")
        cur.execute("""DROP TABLE IF EXISTS dept""")
        print("Dropped (if exists): dept table")
        now_time = time.time()
        print("Time: %s ms ---" % ((now_time - start_time) * 1000))

        start_time = time.time()
        cur.execute("""
          CREATE TABLE IF NOT EXISTS emp(
            eid int,
            ename text,
            salary float,
            dept_id int,
            PRIMARY KEY(eid)
          )""")
        now_time = time.time()
        print("Created emp table")
        print("Time: %s ms ---" % ((now_time - start_time) * 1000))

        start_time = time.time()
        cur.execute("""
          CREATE TABLE IF NOT EXISTS dept(
            dept_id int,
            dept_name text,
            PRIMARY KEY(dept_id)
          )""")
        now_time = time.time()
        print("Created dept table")
        print("Time: %s ms ---" % ((now_time - start_time) * 1000))
    finally:
        # The original never released its connection; close it explicitly.
        conn.close()
def load_data_slave(thread_num):
    """Insert one worker's slice of ``emp`` rows and print insert throughput.

    Let ``per_thread = num_users // num_threads``. Worker ``thread_num``
    inserts employees with ids in
    ``[thread_num * per_thread, (thread_num + 1) * per_thread)``, one
    autocommitted INSERT per row.

    Every column is derived from ``eid`` alone, and ``read_data_slave``
    re-derives the same values to validate its join results:

    * ``ename``   = ``"name-<eid>"``
    * ``salary``  = ``50000 + (eid % 100) * 1000``
    * ``dept_id`` = ``eid % num_depts``

    Args:
        thread_num: zero-based worker index; selects this worker's id range.
    """
    thread_id = str(thread_num)
    # Floor division: on Python 3 ``/`` yields a float, which ``range()``
    # rejects and which would also turn every eid/ename into a float.
    num_users_per_thread = num_users // num_threads
    conn = psycopg2.connect("host=" + host + " dbname=postgres user=postgres port=5433")
    try:
        conn.set_session(autocommit=True)
        cur = conn.cursor()
        print("Thread-" + thread_id + ": ==================")
        print("Thread-" + thread_id + ": Inserting %d rows..." % (num_users_per_thread))
        first_eid = thread_num * num_users_per_thread
        start_time = time.time()
        for eid in range(first_eid, first_eid + num_users_per_thread):
            ename = "name-" + str(eid)
            salary = 50000 + ((eid % 100) * 1000)
            dept_id = eid % num_depts
            cur.execute("""INSERT INTO emp(eid, ename, salary, dept_id) VALUES (%s, %s, %s, %s)""",
                        (eid, ename, salary, dept_id))
        elapsed = time.time() - start_time
        print("Thread-" + thread_id + ": Inserted %d rows" % (num_users_per_thread))
        print("Thread-" + thread_id + ": Time: %s ms ---" % (elapsed * 1000))
        print("Thread-" + thread_id + ": Inserts/sec: %s ---" % (num_users_per_thread / elapsed))
        print("Thread-" + thread_id + ": Avg Time: %s ms ---" % (elapsed * 1000 / num_users_per_thread))
    finally:
        # Each worker owns its connection; release it even on failure.
        conn.close()
def populate_emp_table():
    """Load ``emp`` in parallel, one ``load_data_slave`` call per worker.

    Uses a ``multiprocessing.dummy`` pool, so the workers are threads in this
    process. It runs ``num_threads`` workers (indices ``0..num_threads-1``)
    and blocks until all of them have finished.
    """
    pool = ThreadPool(num_threads)
    try:
        pool.map(load_data_slave, range(num_threads))
    finally:
        # The original left the pool's worker threads alive; shut them down.
        pool.close()
        pool.join()
def populate_dept_table():
    """Insert ``num_depts`` rows into ``dept`` and print insert throughput.

    Row ``i`` is ``(dept_id=i, dept_name="dname-<i>")``. Each row is its own
    autocommitted INSERT; ``read_data_slave`` relies on this naming when it
    validates join results. The connection is always closed on return.
    """
    conn = psycopg2.connect("host=" + host + " dbname=postgres user=postgres port=5433")
    try:
        conn.set_session(autocommit=True)
        cur = conn.cursor()
        print("Inserting %d rows in dept table" % (num_depts))
        start_time = time.time()
        for idx in range(num_depts):
            dept_name = "dname-" + str(idx)
            cur.execute("""INSERT INTO dept(dept_id, dept_name) VALUES (%s, %s)""",
                        (idx, dept_name))
        elapsed = time.time() - start_time
        print("Inserted %d rows in dept" % (num_depts))
        print("Time: %s ms ---" % (elapsed * 1000))
        print("Inserts/sec: %s ---" % (num_depts / elapsed))
        print("Avg Time: %s ms ---" % (elapsed * 1000 / num_depts))
    finally:
        # The original never released its connection; close it explicitly.
        conn.close()
def read_data_slave(thread_num):
    """Run random emp/dept point-lookup joins and validate every result.

    Issues ``num_reads_per_thread`` lookups for uniformly random ids in
    ``[0, num_users)``. Each lookup executes a prepared statement that joins
    ``emp`` with ``dept`` on ``dept_id``. Returned rows are checked against
    the values ``load_data_slave``/``populate_dept_table`` derive from the
    id. Mismatches and missing rows are printed rather than raised, so the
    timing run continues. Total and per-read latency are printed at the end.

    Args:
        thread_num: zero-based worker index, used only in log prefixes.
    """
    thread_id = str(thread_num)
    conn = psycopg2.connect("host=" + host + " dbname=postgres user=postgres port=5433")
    try:
        conn.set_session(autocommit=True)
        cur = conn.cursor()
        print("Thread-" + thread_id + ": Reading %d rows..." % (num_reads_per_thread))
        start_time = time.time()
        # PostgreSQL prepared statements are per-session; each worker has its
        # own connection, so reusing the name "myplan" does not collide.
        cur.execute("PREPARE myplan AS select eid, ename, salary, dept_name from emp, dept where eid=$1 and emp.dept_id = dept.dept_id")
        for _ in range(num_reads_per_thread):
            rand_user_id = random.randint(0, num_users - 1)
            cur.execute("EXECUTE myplan(%s)", (rand_user_id,))
            row = cur.fetchone()
            if row is None:
                # The original indexed into None here and killed the worker.
                print("Incorrect data for " + str(rand_user_id))
                print("no row returned")
                continue
            eid, ename, salary, dept_name = row
            if ((eid != rand_user_id) or
                    (ename != "name-" + str(rand_user_id)) or
                    (salary != (50000 + ((eid % 100) * 1000))) or
                    (dept_name != "dname-" + str(rand_user_id % num_depts))):
                print("Incorrect data for " + str(rand_user_id))
                print("eid=" + str(eid))
                # str() so a NULL column is reported instead of raising.
                print("ename=" + str(ename))
                print("salary=" + str(salary))
                print("dept_name=" + str(dept_name))
        elapsed = time.time() - start_time
        # Thread prefix added so concurrent workers' totals can be told apart.
        print("Thread-" + thread_id + ": Time: %s ms" % (elapsed * 1000))
        print("Thread-" + thread_id + ": Avg Time: %s ms" % (elapsed * 1000 / num_reads_per_thread))
    finally:
        conn.close()
def test_reads_with_simple_join():
    """Run ``read_data_slave`` concurrently on ``num_read_threads`` threads.

    Blocks until every reader has finished; the pool is always shut down.
    """
    pool = ThreadPool(num_read_threads)
    try:
        pool.map(read_data_slave, range(num_read_threads))
    finally:
        # The original left the pool's worker threads alive; shut them down.
        pool.close()
        pool.join()
def main():
    """Run the benchmark end to end: schema, dept/emp load, then join reads."""
    create_tables()
    # dept is filled first so every emp.dept_id has a matching dept row
    # before any reader runs the join.
    populate_dept_table()
    populate_emp_table()
    test_reads_with_simple_join()


# Guarded so importing this module (e.g. to reuse a helper) does not run
# the whole benchmark; executing the script directly behaves as before.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment