Created October 14, 2018 23:24 by aaronsnoswell (gist a8f0e9424cd668de9a4c5fea17265557)
Incrementally process a CSV file that is too large to fit in memory
import csv
import itertools
import multiprocessing.pool


def process_large_csv(
        filename,
        per_row,
        worker_pool,
        *,
        start=0,
        num_per_worker=10
):
    """Incrementally process a CSV file that is too large to fit in memory.

    This function reads one section of a CSV file and distributes its rows
    across the workers of an existing pool. Each call processes at most
    num_per_worker rows for each worker in the pool; call it again with a
    larger start to process the next section. When all workers are done,
    the results are returned to the caller as a list, in the same order as
    the input rows.

    Args:
        filename (str): File name of the CSV file to process
        per_row (callable): Function f(dict) -> any to call on each row. The
            function should accept a csv.DictReader row and return the
            processed row. When using a process pool it must be picklable
            (e.g. a module-level function).
        worker_pool (multiprocessing.pool.Pool): Worker pool to distribute
            work across. The number of workers is taken from this pool.
        start (int): Row in the CSV file to start at (0 is the first data
            row after the header). This parameter can be used to skip
            already-processed sections of the file.
        num_per_worker (int): Number of rows each worker should process

    Returns:
        (list): A list of the processed results
    """
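    # Figure out how many rows we aim to process: num_per_worker rows for
    # each worker in the pool (_processes is a private Pool attribute)
    num_to_process = worker_pool._processes * num_per_worker

    # Open the file; the with-block closes it once the batch is processed.
    # newline="" is what the csv module documentation recommends.
    with open(filename, newline="") as file:
        # Get a lazy CSV reader that yields one dict per row
        it = csv.DictReader(file)

        # Crop to the section we are interested in (rows before start are
        # still read from disk, but they are discarded immediately)
        it = itertools.islice(it, start, start + num_to_process)

        # Distribute the rows across the workers, num_per_worker rows per task
        it = worker_pool.imap(
            per_row,
            it,
            chunksize=num_per_worker
        )

        # Consume the iterator while the file is still open, and return
        return list(it)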