@aaronsnoswell
Created October 14, 2018 23:24
Incrementally process a CSV file that is too large to fit in memory
import csv
import itertools
import multiprocessing.pool


def process_large_csv(
    filename,
    per_row,
    worker_pool,
    *,
    start=0,
    num_per_worker=10
):
    """Incrementally process a CSV file that is too large to fit in memory

    This function reads a small portion of a CSV file into memory and
    processes it by distributing rows across the workers in the given pool.
    When all workers are done, the results are returned to the caller as a
    list.

    Args:
        filename (str): File name of the CSV file to process
        per_row (callable): Function f(dict) -> any to call on each row. The
            function should accept a csv.DictReader row and return the
            processed row.
        worker_pool (multiprocessing.pool.Pool): Worker pool to distribute
            work across
        start (int): Row in the CSV file to start at. This parameter can be
            used to skip already-processed sections of the file.
        num_per_worker (int): Number of rows each worker should process

    Returns:
        (list): A list of the processed results
    """

    # Figure out how many rows we aim to process in this call
    # (relies on the pool's private _processes attribute for the worker count)
    num_to_process = worker_pool._processes * num_per_worker

    # Open the file, ensuring it is closed when we are done
    with open(filename, newline="") as file:

        # Get CSV reader
        it = csv.DictReader(file)

        # Crop to the section we are interested in
        it = itertools.islice(it, start, start + num_to_process)

        # Distribute the iterator across the workers
        it = worker_pool.imap(
            per_row,
            it,
            chunksize=num_per_worker
        )

        # Consume the iterator before the file is closed, and return
        return list(it)
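
A minimal usage sketch (the file name "data.csv", the "value" column, the double_value function, and the worker count of 4 are hypothetical; the per-row function must be defined at module level so multiprocessing can pickle it):

def double_value(row):
    # Hypothetical per-row function: parse the "value" column and double it
    return 2.0 * float(row["value"])


if __name__ == "__main__":
    with multiprocessing.pool.Pool(processes=4) as pool:
        # Process the first 4 * 10 = 40 rows of data.csv
        results = process_large_csv("data.csv", double_value, pool)
        print(results)

To work through the rest of the file, call process_large_csv repeatedly, advancing start by the number of rows processed in the previous call, until it returns fewer results than requested.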