Processing large CSV chunks unevenly with Pandas and multiprocessing
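
Chunks of one CSV can take very different amounts of time to process. The snippet below therefore rolls a minimal process pool that hands each chunk to whichever worker frees up first and blocks reading the next chunk until a slot opens, keeping memory bounded to roughly worker_number in-flight chunks.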
import multiprocessing
import time

import pandas as pd


class WorkerPool:
    def __init__(self, worker_number):
        self.worker_number = worker_number
        # One placeholder Process per slot; a comprehension avoids aliasing
        # a single Process object across every slot.
        self.pool = [multiprocessing.Process() for _ in range(worker_number)]

    def run(self, target, args=None, sleep_time=1):
        if args is None:
            args = ()
        # Block until a slot frees up, then start the new task in it.
        while True:
            for worker in range(self.worker_number):
                if not self.pool[worker].is_alive():
                    self.pool[worker] = multiprocessing.Process(target=target, args=args)
                    self.pool[worker].start()
                    return self.pool[worker]
            time.sleep(sleep_time)

    def join(self):
        # Wait for any still-running workers to finish.
        for worker in self.pool:
            if worker.is_alive():
                worker.join()


def processing_chunk(chunk):
    pass  # Replace with the real per-chunk work.


if __name__ == '__main__':
    pool = WorkerPool(worker_number=4)
    for chunk in pd.read_csv('data.csv', chunksize=1000):
        pool.run(processing_chunk, (chunk,))
    pool.join()  # Don't exit before the last chunks finish.
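
If sticking to the standard library is preferred, the same bounded, uneven scheduling can be sketched with concurrent.futures.ProcessPoolExecutor; the file name, chunk size, and process_chunk body below are placeholders, not part of the original gist. Note that multiprocessing.Pool.imap is not a drop-in substitute here: its task feeder consumes the input iterator without backpressure, so all chunks could end up queued in memory at once.

import concurrent.futures

import pandas as pd


def process_chunk(chunk):
    return len(chunk)  # Hypothetical stand-in for the real per-chunk work.


if __name__ == '__main__':
    max_workers = 4
    with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
        in_flight = set()
        for chunk in pd.read_csv('data.csv', chunksize=1000):
            in_flight.add(executor.submit(process_chunk, chunk))
            # Cap in-flight chunks so memory stays bounded, as WorkerPool does.
            if len(in_flight) >= max_workers:
                _done, in_flight = concurrent.futures.wait(
                    in_flight, return_when=concurrent.futures.FIRST_COMPLETED)
        concurrent.futures.wait(in_flight)  # Drain the remaining workers.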