Python objects for easy multiprocessing and batching
# Created by AlecZ
# Tested with Python 3.4
#
# These example classes make it easy to multiprocess* in Python,
# useful if you want to take advantage of multiple cores on a single machine
# or run lots of low-CPU-usage but blocking tasks, like web scrapers,
# without configuring a more permanent solution like Celery workers.
#
# * Actually runs multiple Python processes to take advantage of more cores,
# unlike Python's `threading` module, whose threads take turns on the GIL
# and so never run Python code truly in parallel.
#
# Warning: You can pass most types (including custom ones) as parameters,
# but every argument is pickled on its way to the child process, so don't
# pass anything that relies on external state, like file handles or psycopg2
# cursors; those will break in unpredictable ways on the other side.
# Some things, like functions with Java bindings, also won't work properly
# in child processes.
from multiprocessing import Process, Queue
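
# The warning above exists because `multiprocessing` pickles every argument
# before handing it to a child process. Below is a minimal sanity-check sketch
# (not part of the original gist), assuming a plain pickle round trip mirrors
# what the Queue does internally:
import pickle

def can_be_sent_to_worker(obj):
    """Return True if `obj` survives a pickle round trip."""
    try:
        pickle.loads(pickle.dumps(obj))
        return True
    except Exception:  # unpicklable objects raise a variety of exception types
        return False

# e.g. can_be_sent_to_worker([1, 2, 3])          -> True
# e.g. can_be_sent_to_worker(open("x.txt", "w")) -> False (file handle)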
# basic: spawns a worker per task and gets their results in arbitrary order
class Multiprocessor:

    def __init__(self):
        self.processes = []
        self.queue = Queue()

    @staticmethod
    def _wrapper(func, queue, args, kwargs):
        # runs in the child process: call the function and ship the result back
        ret = func(*args, **kwargs)
        queue.put(ret)

    def run(self, func, *args, **kwargs):
        # spawn one worker process per call
        args2 = [func, self.queue, args, kwargs]
        p = Process(target=self._wrapper, args=args2)
        self.processes.append(p)
        p.start()

    def wait(self):
        # collect one result per worker (in whatever order they finish),
        # then reap the processes
        rets = []
        for _ in self.processes:
            ret = self.queue.get()
            rets.append(ret)
        for p in self.processes:
            p.join()
        self.processes = []
        return rets
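
# A minimal sketch (not part of the original gist) showing that Multiprocessor
# returns results in completion order, not submission order; `slow_echo` is a
# hypothetical helper defined only for this demo. Left commented out so the
# tester at the bottom stays the only code that runs:
#
#   import time
#
#   def slow_echo(x, delay):
#       time.sleep(delay)
#       return x
#
#   mp = Multiprocessor()
#   mp.run(slow_echo, "slow", 0.2)
#   mp.run(slow_echo, "fast", 0.0)
#   print(mp.wait())  # most likely prints ['fast', 'slow']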
# more advanced: uses a worker pool and gets results in order, like Celery
class Batcher:

    CMD_JOB = 0
    CMD_KILL = 1

    def __init__(self, num_workers):
        self.jobs = []
        self.num_workers = num_workers

    @staticmethod
    def _worker(in_queue, out_queue):
        while True:
            # listen for new jobs
            cmd, index, job = in_queue.get()
            if cmd == Batcher.CMD_JOB:
                # process job, return result tagged with its index
                func, args, kwargs = job
                ret = func(*args, **kwargs)
                out_queue.put((index, ret))
            elif cmd == Batcher.CMD_KILL:
                # time to stop
                return
            else:
                assert False, "unknown command {}".format(cmd)

    def enqueue(self, func, *args, **kwargs):
        job = (func, args, kwargs)
        self.jobs.append(job)

    def process(self, num_workers=None):
        if num_workers is None:
            num_workers = self.num_workers
        # spawn workers
        in_queue, out_queue = Queue(), Queue()
        workers = []
        for _ in range(num_workers):
            p = Process(target=self._worker, args=(in_queue, out_queue))
            workers.append(p)
            p.start()
        # put jobs into the queue, tagged with their index
        # so results can be reassembled in order
        for job_idx, job in enumerate(self.jobs):
            in_queue.put((Batcher.CMD_JOB, job_idx, job))
        # get results from workers and slot each into its original position
        results = [None] * len(self.jobs)
        for _ in range(len(self.jobs)):
            res_idx, res = out_queue.get()
            assert results[res_idx] is None
            results[res_idx] = res
        # stop workers
        for _ in range(num_workers):
            in_queue.put((Batcher.CMD_KILL, None, None))
        for worker in workers:
            worker.join()
        # reset the job list so the Batcher can be reused,
        # mirroring Multiprocessor.wait()
        self.jobs = []
        return results
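
# A sketch of the web-scraper use case mentioned at the top (not part of the
# original gist); the URLs are placeholders and `fetch_status` is a
# hypothetical helper. Commented out so it doesn't run alongside the tester:
#
#   from urllib.request import urlopen
#
#   def fetch_status(url):
#       # blocking network I/O is the kind of low-CPU work the pool hides well
#       return urlopen(url, timeout=10).status
#
#   batcher = Batcher(num_workers=8)
#   for url in ["https://example.com", "https://example.org"]:
#       batcher.enqueue(fetch_status, url)
#   print(batcher.process())  # statuses in the order the URLs were enqueued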
# tester/examples
if __name__ == "__main__":
    # Multiprocessor: one worker per job, results in arbitrary order
    for _ in range(4):
        mp = Multiprocessor()
        num_jobs = 64
        # we'll just sum 5 numbers a bunch of times
        for _ in range(num_jobs):
            mp.run(sum, [1, 2, 3, 4, 5])
        ret = mp.wait()
        print(ret)
        assert len(ret) == num_jobs and all(r == 15 for r in ret)

    # Batcher: fixed worker pool, results in submission order
    for _ in range(4):
        mp2 = Batcher(num_workers=4)
        num_jobs = 64
        # same, but this time we sum a different set of numbers each time
        # and care about the results' order
        for i in range(num_jobs):
            mp2.enqueue(sum, [1, 2, 3, 4, 5, i])
        ret = mp2.process()
        print(ret)
        assert len(ret) == num_jobs and all(r == 15 + i for i, r in enumerate(ret))