Single producer/consumer skeleton for keeping a Queue full of database rows to process
# Uses sample data from https://www.gov.uk/government/statistical-data-sets/price-paid-data-downloads
# See https://wiki.postgresql.org/wiki/Sample_Databases for the table schema and instructions on how to populate it.
# Simple script to test the idea of keeping a Queue full of database rows ready for
# consumption, in appropriately sized chunks.
import multiprocessing as mp
import queue

import psycopg2

MAX_QUEUE_SIZE = 8  # max number of batches buffered in the queue
BATCH_SIZE = 5000   # rows fetched per cursor.fetchmany() call
TIMEOUT = 30        # seconds the consumer waits before assuming the producer is done

# Connect inside the producer process rather than at module level: a psycopg2
# connection must not be shared across fork()ed processes.
DSN = "postgresql://postgres:sekrit@localhost/test"


def consume(q):
    while True:
        try:
            batch = q.get(block=True, timeout=TIMEOUT)
            process_batch(batch)
        except queue.Empty:
            # No batch for TIMEOUT seconds: assume the producer has finished.
            print("All done, I'm outta here")
            break


def run():
    q = mp.Queue(maxsize=MAX_QUEUE_SIZE)
    processes = []

    # Single producer; scaling to several would only need a reasonable way
    # to partition the rows between them.
    proc = mp.Process(target=fetch_from_db, args=(q,))
    proc.start()
    processes.append(proc)

    # Single consumer; see the sketch after the script for one way to scale this out.
    proc = mp.Process(target=consume, args=(q,))
    proc.start()
    processes.append(proc)

    # Wait for both processes to finish
    print("reaping processes")
    for process in processes:
        process.join()
    q.close()


def fetch_from_db(q):
    # The producer owns its database connection, opened after the fork.
    db_conn = psycopg2.connect(DSN)
    cursor = db_conn.cursor()
    print("Executing query")
    cursor.execute(
        "SELECT transaction, price, transfer_date, postcode, street, city FROM land_registry_price_paid_uk"
    )
    count = 0
    idx = 0
    while True:
        idx += 1
        batch = cursor.fetchmany(BATCH_SIZE)
        if not batch:
            print(f"*** Producer done after {count} entries")
            break
        count += len(batch)
        print(f"Inserting batch {idx}, head {batch[0]}", end="... ")
        q.put(batch)  # blocks once the queue already holds MAX_QUEUE_SIZE batches
        print("done")
    db_conn.close()


def process_batch(chunk):
    # Stand-in for real work: append each row to a scratch file.
    print(f"processing chunk of size {len(chunk)}, head {chunk[0]}", end="... ")
    with open("tmp.dat", "a") as fh:
        for row in chunk:
            fh.write(f"{row}\n")
    print("done")


if __name__ == "__main__":
    run()
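
The comments in run() note that scaling out the consumer side should be simple. Below is a minimal sketch of one way to do it, assuming the definitions above (MAX_QUEUE_SIZE, fetch_from_db, process_batch) are in scope; N_CONSUMERS, produce_with_sentinels, and the None-sentinel scheme are illustrative additions, not part of the original gist. Instead of relying on a queue.Empty timeout, the producer enqueues one sentinel per consumer, so shutdown is deterministic.

N_CONSUMERS = 4  # illustrative; tune to core count and per-batch cost


def consume_until_sentinel(q):
    # Drain batches until a None sentinel arrives; no timeout needed.
    while True:
        batch = q.get()
        if batch is None:
            break
        process_batch(batch)


def produce_with_sentinels(q):
    fetch_from_db(q)
    for _ in range(N_CONSUMERS):
        q.put(None)  # one sentinel per consumer


def run_multi():
    q = mp.Queue(maxsize=MAX_QUEUE_SIZE)
    producer = mp.Process(target=produce_with_sentinels, args=(q,))
    producer.start()
    consumers = [
        mp.Process(target=consume_until_sentinel, args=(q,))
        for _ in range(N_CONSUMERS)
    ]
    for c in consumers:
        c.start()
    producer.join()
    for c in consumers:
        c.join()
    q.close()

Note that with several consumers, process_batch as written would have all of them appending to the same tmp.dat, and concurrent appends can interleave; per-process output files (or a single writer process) would avoid that.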