Python objects for easy multiprocessing and batching
# Created by AlecZ
# Tested with Python 3.4
#
# These example classes make it easy to multiprocess* in Python,
# useful if you want to take advantage of multiple cores on a single machine
# or run lots of low-CPU-usage but blocking tasks like web scrapers
# without configuring a more permanent solution like Celery workers.
#
# * Actually runs multiple Python processes to take advantage of more cores,
#   unlike Python's `threading` module, whose threads share one interpreter
#   and, because of the GIL, never run Python bytecode in parallel.
#
# Warning: You can pass most types (including custom ones) as parameters,
# but don't pass anything that relies on external state, like file handles
# or psycopg2 cursors: values that cross process boundaries are pickled,
# and such objects either fail to pickle or are unusable on the other side.
# Also, some things like functions with Java bindings won't work properly
# in child processes.
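# For instance, with a hypothetical `parse` function:
#   mp.run(parse, "page.html")        # fine: a str pickles cleanly
#   mp.run(parse, open("page.html"))  # bad: file objects can't be pickled
#                                     #   (fails under spawn; even under fork
#                                     #   the shared handle can misbehave)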
from multiprocessing import Process, Queue

# basic: spawns a worker per task and gets their results in arbitrary order
class Multiprocessor:
    def __init__(self):
        self.processes = []
        self.queue = Queue()
    @staticmethod
    def _wrapper(func, queue, args, kwargs):
        ret = func(*args, **kwargs)
        queue.put(ret)
    def run(self, func, *args, **kwargs):
        args2 = [func, self.queue, args, kwargs]
        p = Process(target=self._wrapper, args=args2)
        self.processes.append(p)
        p.start()
    def wait(self):
        rets = []
        # drain one result per spawned process *before* joining: a child
        # that put a large result on the queue blocks until it is read,
        # so joining first can deadlock
        for _ in self.processes:
            ret = self.queue.get()
            rets.append(ret)
        for p in self.processes:
            p.join()
        self.processes = []
        return rets
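# A quick usage sketch (runnable version in the tester at the bottom;
# `my_func` is a stand-in for any picklable function):
#   mp = Multiprocessor()
#   mp.run(my_func, arg1, kw=2)  # one child process per call
#   results = mp.wait()          # return values, in arbitrary order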
# more advanced: uses a worker pool and gets results in order, like Celery
class Batcher:
    CMD_JOB = 0
    CMD_KILL = 1
    def __init__(self, num_workers):
        self.jobs = []
        self.num_workers = num_workers
    @staticmethod
    def _worker(in_queue, out_queue):
        while True:
            # listen for new jobs
            cmd, index, job = in_queue.get()
            if cmd == Batcher.CMD_JOB:
                # process job, return result
                func, args, kwargs = job
                ret = func(*args, **kwargs)
                out_queue.put((index, ret))
            elif cmd == Batcher.CMD_KILL:
                # time to stop
                return
            else:
                assert False
    def enqueue(self, func, *args, **kwargs):
        job = (func, args, kwargs)
        self.jobs.append(job)
    def process(self, num_workers=None):
        if num_workers is None:
            num_workers = self.num_workers
        # spawn workers
        in_queue, out_queue = Queue(), Queue()
        workers = []
        for _ in range(num_workers):
            p = Process(target=self._worker, args=(in_queue, out_queue))
            workers.append(p)
            p.start()
        # put jobs into the queue, tagged with their index so results
        # can be reassembled in enqueue order
        for job_idx, job in enumerate(self.jobs):
            in_queue.put((Batcher.CMD_JOB, job_idx, job))
        # get results from workers (they arrive in completion order)
        results = [None] * len(self.jobs)
        for _ in range(len(self.jobs)):
            res_idx, res = out_queue.get()
            assert results[res_idx] is None
            results[res_idx] = res
        # stop workers by sending one kill command per worker
        for _ in range(num_workers):
            in_queue.put((Batcher.CMD_KILL, None, None))
        for worker in workers:
            worker.join()
        self.jobs = []
        return results
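# A quick usage sketch (runnable version in the tester below; `my_func`
# is again a stand-in):
#   batcher = Batcher(num_workers=4)
#   batcher.enqueue(my_func, arg)  # queues the job; nothing runs yet
#   results = batcher.process()    # runs everything on the pool,
#                                  #   results in enqueue order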
# tester/examples
if __name__ == "__main__":
    for _ in range(4):
        mp = Multiprocessor()
        num_jobs = 64
        # we'll just sum 5 numbers a bunch of times
        for _ in range(num_jobs):
            mp.run(sum, [1, 2, 3, 4, 5])
        ret = mp.wait()
        print(ret)
        assert len(ret) == num_jobs and all(r == 15 for r in ret)
    for _ in range(4):
        mp2 = Batcher(num_workers=4)
        num_jobs = 64
        # same, but this time we sum a different set of numbers each time
        # and care about the results' order
        for i in range(num_jobs):
            mp2.enqueue(sum, [1, 2, 3, 4, 5, i])
        ret = mp2.process()
        print(ret)
        assert len(ret) == num_jobs and all(r == 15 + i for i, r in enumerate(ret))