Last active
April 18, 2019 03:11
-
-
Save kmuthukk/72ca6f15b9d2418bc05fcf42d55c329c to your computer and use it in GitHub Desktop.
Sample Python program that performs single-row inserts in parallel against YugaByte DB's YSQL (PostgreSQL-compatible) API.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import psycopg2; | |
import time | |
from multiprocessing.dummy import Pool as ThreadPool | |
# Size of the thread pool; one loader call is dispatched per thread number.
num_threads = 8
# Number of distinct user ids each loader thread generates.
num_users = 1000
# Number of message rows inserted for every user id.
num_msgs = 50
# Table that is dropped, recreated and then filled by the loader threads.
table_name = "user_actions"
def create_table():
    """Drop and recreate the ``table_name`` table, reporting how long each step took.

    Opens its own autocommit connection, issues ``DROP TABLE IF EXISTS``
    followed by ``CREATE TABLE IF NOT EXISTS`` and prints the elapsed time of
    each statement in milliseconds.  The connection is always closed before
    returning, including when one of the statements raises.
    """
    conn = psycopg2.connect("host=localhost dbname=postgres user=postgres port=5433")
    try:
        conn.set_session(autocommit=True)
        cur = conn.cursor()

        # Identifiers cannot be sent as bound parameters, so the table name is
        # interpolated into the statement text; it is a module-level constant.
        start_time = time.time()
        cur.execute("""DROP TABLE IF EXISTS %s""" % (table_name))
        now_time = time.time()
        print("Dropped (if exists): %s table" % (table_name))
        print("Time: %s ms ---" % ((now_time - start_time) * 1000))

        start_time = time.time()
        cur.execute("""
            CREATE TABLE IF NOT EXISTS %s(
                id text,
                msg_id integer,
                msg text,
                PRIMARY KEY(id, msg_id)
            )
            """ % (table_name))
        now_time = time.time()
        print("Created: " + table_name)
        print("Time: %s ms ---" % ((now_time - start_time) * 1000))
    finally:
        # The original never released the connection.
        conn.close()
def load_data_slave(thread_num):
    """Insert ``num_users * num_msgs`` rows one at a time and print timing stats.

    Each call opens its own autocommit connection, so every ``INSERT`` is
    executed as a separate statement.  Row ids embed ``thread_num`` so that
    calls made with different thread numbers generate different primary keys.

    Args:
        thread_num: Number identifying this worker; used in the generated ids
            and as the prefix of every line printed.

    Returns:
        None.  Progress and throughput figures are printed to stdout.
    """
    thread_id = str(thread_num)
    total_rows = num_users * num_msgs
    # Built once: the statement text is identical for every row.
    insert_sql = """INSERT INTO """ + table_name + """ (id, msg_id, msg) VALUES (%s, %s, %s)"""

    conn = psycopg2.connect("host=localhost dbname=postgres user=postgres port=5433")
    try:
        conn.set_session(autocommit=True)
        cur = conn.cursor()
        print("Thread-" + thread_id + ": ==================")
        print("Thread-" + thread_id + ": Inserting %d rows..." % (total_rows))

        start_time = time.time()
        for idx in range(num_users):
            user_id = "u-" + thread_id + "-" + str(idx)
            for jdx in range(num_msgs):
                cur.execute(insert_sql,
                            (user_id,
                             jdx,
                             "msg--" + str(idx) + "--" + str(jdx)))
        now_time = time.time()
    finally:
        # The original never released the connection.
        conn.close()

    elapsed = now_time - start_time
    # Guard both divisions: an empty workload or a coarse clock can make
    # either denominator zero, which previously raised ZeroDivisionError.
    rows_per_sec = total_rows / elapsed if elapsed > 0 else float("inf")
    avg_ms = elapsed * 1000 / total_rows if total_rows else 0.0

    print("Thread-" + thread_id + ": Inserted %d rows" % (total_rows))
    print("Thread-" + thread_id + ": Time: %s ms ---" % (elapsed * 1000))
    print("Thread-" + thread_id + ": Inserts/sec: %s ---" % (rows_per_sec))
    print("Thread-" + thread_id + ": Avg Time: %s ms ---" % (avg_ms))
# Driver: recreate the table, run one loader per thread number in parallel,
# then report the aggregate throughput across all threads.
create_table()
pool = ThreadPool(num_threads)
try:
    t1 = time.time()
    # pool.map blocks until every loader call has returned.
    results = pool.map(load_data_slave, range(num_threads))
    t2 = time.time()
finally:
    # The original never shut the pool down; close it and wait for the
    # worker threads to exit.
    pool.close()
    pool.join()

total_rows = num_users * num_msgs * num_threads
print("====================")
print("Inserted %d rows" % (total_rows))
print("Time: %s ms ---" % ((t2 - t1) * 1000))
print("Inserts/sec: %s ---" % (total_rows / (t2 - t1)))
print("====================")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The above program does no batching; each row is inserted with its own INSERT statement.
With RF=1 on a 4-core setup running the latest YB release (1.2.5), I am seeing about:
With 8 concurrent threads:
With 4 concurrent threads:
With 1 concurrent thread:
With 16 concurrent threads: