Skip to content

Instantly share code, notes, and snippets.

@monkut
Created July 23, 2018 01:03
Show Gist options
  • Save monkut/4b5c19af3da325f4ca06b52983f64e90 to your computer and use it in GitHub Desktop.
A common pattern for processing csv files in parallel
import os
import csv
from multiprocessing import cpu_count, Pool
def process_csv(args):
    """Collect the set of unique ids found in one CSV file.

    Intended as the worker for ``Pool.imap_unordered``, which passes a
    single argument, so the parameters arrive packed in *args*.

    :param args: ``(filepath, encoding)`` or
        ``(filepath, encoding, id_column_index)``; the column index
        defaults to 1 for backward compatibility with the original tuple.
    :return: ``(filepath, unique_ids)`` where *unique_ids* is the set of
        values taken from the id column of every row.
    """
    filepath, encoding, *rest = args
    id_column_index = rest[0] if rest else 1
    unique_ids = set()
    # newline='' is required by the csv module so that quoted fields
    # containing embedded newlines are parsed correctly.
    with open(filepath, 'r', encoding=encoding, newline='') as in_f:
        reader = csv.reader(in_f)
        for row in reader:
            unique_ids.add(row[id_column_index])
    return filepath, unique_ids
def process_csv_file_data(directory_of_csv_files, input_encoding='utf8'):
    """Process every CSV file in a directory in parallel.

    Fans the files out to a process pool running :func:`process_csv` and
    writes one row per input file to ``results.csv`` in the current
    working directory, shaped as: ``filepath, id, id, ...``.
    Progress is printed as each file finishes (completion order is
    arbitrary because ``imap_unordered`` is used).

    :param directory_of_csv_files: directory containing the CSV files
    :param input_encoding: text encoding of the input CSV files
    """
    function_arguments = []
    for filename in os.listdir(directory_of_csv_files):
        filepath = os.path.join(directory_of_csv_files, filename)
        # os.listdir also yields sub-directories; only queue regular
        # files so a stray directory does not crash a worker.
        if os.path.isfile(filepath):
            function_arguments.append([filepath, input_encoding])
    total_count = len(function_arguments)
    # Context managers guarantee the worker processes are terminated and
    # the output file is flushed/closed (the original never called
    # pool.close()/pool.join(), leaking the pool).  newline='' is the
    # documented requirement for files handed to csv.writer.
    with Pool(processes=cpu_count()) as pool, \
            open('results.csv', 'w', encoding='utf8', newline='') as out_f:
        writer = csv.writer(out_f)
        for processed_count, (filepath, data) in enumerate(
                pool.imap_unordered(process_csv, function_arguments), 1):
            finished_percentage = round((processed_count / total_count) * 100, 2)
            print('[ {}/{} {}%] Finished ({})!'.format(
                processed_count, total_count, finished_percentage, filepath))
            writer.writerow([filepath, *data])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment