@gyli
Created April 13, 2019 03:48
Processing large CSV chunks unevenly with Pandas and multiprocessing
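
The snippet below keeps a fixed number of worker processes and hands each CSV chunk to the first free slot, polling with a short sleep between scans. Because dispatch blocks only until any one slot frees up, a chunk that takes a long time in one worker never stalls the others from picking up new chunks.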
import multiprocessing
import time

import pandas as pd


class WorkerPool:
    """A fixed-size pool that starts one process per task as slots free up."""

    def __init__(self, worker_number):
        self.worker_number = worker_number
        # Placeholder processes: a Process that was never started reports
        # is_alive() == False, so every slot counts as free at the start.
        self.pool = [multiprocessing.Process() for _ in range(worker_number)]

    def run(self, target, args=None, sleep_time=1):
        if args is None:
            args = []
        # Poll until a slot is free, then start the new task in that slot.
        # Blocking here lets dispatch adapt to chunks that take uneven
        # amounts of time to process.
        while True:
            for worker in range(self.worker_number):
                if not self.pool[worker].is_alive():
                    self.pool[worker] = multiprocessing.Process(target=target, args=args)
                    self.pool[worker].start()
                    return self.pool[worker]
            time.sleep(sleep_time)


def processing_chunk(chunk):
    pass  # Replace with the actual per-chunk work.


if __name__ == '__main__':  # Guard needed on spawn-based platforms (Windows/macOS).
    pool = WorkerPool(worker_number=4)
    for chunk in pd.read_csv('data.csv', chunksize=1000):
        pool.run(processing_chunk, (chunk,))
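
For reference, a minimal usage sketch of the same driver, assuming the WorkerPool class above is in scope. The processing_chunk body, the 'value' column name, and the final join loop are illustrative assumptions, not part of the original gist:

import pandas as pd

def processing_chunk(chunk):
    # Illustrative per-chunk work; the 'value' column is an assumption.
    total = chunk['value'].sum()
    print(f'processed {len(chunk)} rows, sum={total}')

if __name__ == '__main__':
    pool = WorkerPool(worker_number=4)
    for chunk in pd.read_csv('data.csv', chunksize=1000):
        pool.run(processing_chunk, (chunk,))
    # Wait for any still-running workers before exiting.
    for worker in pool.pool:
        if worker.pid is not None:  # skip placeholders that were never started
            worker.join()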