Last active
February 14, 2023 23:02
-
-
Save cjbj/c7c6f35d783be6d164f456db175d7844 to your computer and use it in GitHub Desktop.
A sample Python program that reads from an Oracle Database table in parallel threads
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# parallelselect.py | |
# | |
# [email protected], Feb 2023 | |
# | |
# This may or may not be faster than a single-threaded statement that
# reads the whole table.
# | |
import csv | |
import os | |
import platform | |
import threading | |
import oracledb | |
# Tuning knobs for the demo.
#
# Each thread reads one slice of BATCH_SIZE rows, so the whole table is only
# fetched when NUM_THREADS * BATCH_SIZE >= TABLE_SIZE.

# How many rows create_schema() inserts into the demo table
TABLE_SIZE = 10_000

# Degree of parallelism, which is also the number of pooled connections
NUM_THREADS = 10

# Size of the slice of rows each thread fetches
BATCH_SIZE = 1_000

# Cursor fetch buffer size (cursor.arraysize): tune this for performance
ARRAY_SIZE = 1_000

# One slice of the table: skip :rowoffset rows (ordered by id), then return
# at most :maxrows rows.
SQL = """
    select data
    from demo
    order by id
    offset :rowoffset rows fetch next :maxrows rows only
    """
# Database credentials and connect string are taken from the environment.
un = os.environ.get('PYTHON_USERNAME')
pw = os.environ.get('PYTHON_PASSWORD')
cs = os.environ.get('PYTHON_CONNECTSTRING')

# Optionally switch python-oracledb to Thick mode by setting DRIVER_TYPE=thick.
if os.environ.get('DRIVER_TYPE') == 'thick':
    # On platforms not listed below lib_dir stays None and is passed through
    # to init_oracle_client() unchanged.
    ld = None
    if platform.system() == 'Darwin' and platform.machine() == 'x86_64':
        # expanduser() resolves the home directory even when HOME is unset,
        # where os.environ.get('HOME') + '...' would raise TypeError.
        ld = os.path.join(os.path.expanduser('~'),
                          'Downloads', 'instantclient_19_8')
    elif platform.system() == 'Windows':
        ld = r'C:\oracle\instantclient_19_17'
    oracledb.init_oracle_client(lib_dir=ld)

# Create a connection pool with exactly one connection per worker thread
pool = oracledb.create_pool(user=un, password=pw, dsn=cs,
                            min=NUM_THREADS, max=NUM_THREADS)
# | |
# Create the table for the demo | |
# | |
def create_schema():
    """Drop and re-create the DEMO table, then load TABLE_SIZE rows into it.

    Uses a standalone connection (not the pool) built from the module-level
    credentials. Each inserted row's DATA column holds its row number as a
    string; ID is filled by the identity column.

    The drop, create and insert are sent as separate statements. A static
    ``insert into demo`` inside the same anonymous PL/SQL block that creates
    the table is compiled before the block runs, so it fails with ORA-00942
    whenever DEMO does not exist yet (for example on the first run).
    """
    with oracledb.connect(user=un, password=pw, dsn=cs) as connection:
        # Commit the INSERT as soon as it has executed
        connection.autocommit = True
        with connection.cursor() as cursor:
            # Drop any previous copy of the table; ORA-00942 (table or view
            # does not exist) is expected on the first run and is ignored.
            cursor.execute("""
                begin
                  execute immediate 'drop table demo';
                exception when others then
                  if sqlcode <> -942 then
                    raise;
                  end if;
                end;""")

            cursor.execute("""
                create table demo (
                  id number generated by default as identity,
                  data varchar2(40))""")

            # CONNECT BY LEVEL generates :table_size rows from DUAL
            cursor.execute("""
                insert into demo (data)
                select to_char(rownum)
                from dual
                connect by level <= :table_size""",
                table_size=TABLE_SIZE)
def do_write_csv(tn):
    """Fetch one slice of the DEMO table and write it to ``emp<tn>.csv``.

    Args:
        tn: zero-based thread number. It selects the slice (rows starting at
            offset ``tn * BATCH_SIZE``, at most ``BATCH_SIZE`` of them) and
            names the output file.

    The file starts with a header row of the query's column names, followed
    by the fetched rows. Non-numeric values are quoted.
    """
    with pool.acquire() as connection:
        with connection.cursor() as cursor:
            cursor.arraysize = ARRAY_SIZE
            # The context manager closes the file even if the query or a
            # write fails. newline="" is what the csv module requires, so the
            # writer's own "\n" terminator is emitted without translation.
            with open(f"emp{tn}.csv", "w", newline="") as f:
                writer = csv.writer(f, lineterminator="\n",
                                    quoting=csv.QUOTE_NONNUMERIC)
                cursor.execute(SQL, rowoffset=(tn * BATCH_SIZE),
                               maxrows=BATCH_SIZE)

                # First element of each description entry is the column name
                col_names = [col[0] for col in cursor.description]
                writer.writerow(col_names)

                while True:
                    # fetchmany() returns up to cursor.arraysize rows; the
                    # extra call at the end won't incur an extra round-trip
                    rows = cursor.fetchmany()
                    if not rows:
                        break
                    writer.writerows(rows)
def do_query(tn):
    """Fetch one slice of the DEMO table and print it, one batch at a time.

    Args:
        tn: zero-based thread number; the slice starts at row offset
            ``tn * BATCH_SIZE`` and holds at most ``BATCH_SIZE`` rows.
    """
    first_row = tn * BATCH_SIZE
    with pool.acquire() as connection, connection.cursor() as cursor:
        cursor.arraysize = ARRAY_SIZE
        cursor.execute(SQL, rowoffset=first_row, maxrows=BATCH_SIZE)

        # Keep pulling batches until an empty one signals the end of the
        # slice; that final call won't incur an extra round-trip.
        batch = cursor.fetchmany()
        while batch:
            print(f'Thread {tn}', batch)
            batch = cursor.fetchmany()
# | |
# Start the desired number of threads. | |
# | |
def start_workload():
    """Run NUM_THREADS worker threads and wait for all of them to finish.

    Each worker is given its zero-based thread number and runs do_write_csv.
    Change the target to do_query to print the rows instead of writing CSV
    files.
    """
    workers = []
    for thread_number in range(NUM_THREADS):
        worker = threading.Thread(target=do_write_csv, args=(thread_number,))
        worker.start()
        workers.append(worker)

    # Block until every worker has completed
    for worker in workers:
        worker.join()
def main():
    """Build the demo table, run the parallel fetch, then report completion."""
    create_schema()
    start_workload()
    print("All done!")


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment