Created
February 14, 2017 02:03
-
-
Save mherkazandjian/2a27a620330cf78912aa6e163642609d to your computer and use it in GitHub Desktop.
pmap example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import traceback
from multiprocessing import Process, cpu_count, Queue

import numpy
def _pmap_worker(worker_func, worker_index, queue, worker_args, worker_kwargs):
    """Run "worker_func" in a worker process and report the outcome in "queue".

    This is a module level function (instead of a closure inside pmap) so
    that it can be pickled when the multiprocessing start method is not
    "fork".

    The message put in the queue is the list [worker_index, retval, error],
    where "error" is None on success, or the formatted traceback (a string)
    if "worker_func" raised, in which case "retval" is None.
    """
    try:
        worker_retval = worker_func(*worker_args, **worker_kwargs)
    except Exception:
        # always report back, otherwise the parent would wait forever for
        # the result of this worker
        queue.put([worker_index, None, traceback.format_exc()])
    else:
        queue.put([worker_index, worker_retval, None])


def pmap(func, items, func_args=(), func_kwargs=None, n_workers=cpu_count()):
    """Maps the function "func" to chunks of "items" in parallel using
    processes.

    "items" is split into at most "n_workers" contiguous chunks. Each chunk
    is passed to "func" in a separate process and the returned values are
    put back together in the same order as "items".

    :param callable func: A callable that takes an array as a first
     positional argument and returns a sequence that has the same length as
     the input array.
    :param numpy.ndarray items: one dimensional array (or sequence that can
     be converted to one) to which func will be mapped in parallel.
    :param tuple func_args: a tuple of arguments to func after the 1st arg.
    :param dict func_kwargs: a dictionary of keywords to be passed to func.
     None (the default) means no keywords.
    :param int n_workers: The maximum number of worker processes to be used
     in processing "items". By default all cores are used. No more than
     one worker per item is started.
    :return: A numpy array of dtype object of all the values returned by
     "func" applied to all the entries in "items".
    :raises ValueError: if "n_workers" is less than 1.
    :raises RuntimeError: if "func" raised an exception in any of the
     workers. The message contains the tracebacks of the failed workers.

    .. code-block:: python

        def my_func(items):
            retval = []
            for item in items:
                # some expensive operation
                total = 0
                for i in range(10000000):
                    total += item
                retval.append(total)
            return retval

        retval = pmap(my_func, numpy.linspace(0, 1, 1000))
    """
    if n_workers < 1:
        raise ValueError('n_workers must be larger than 0.')

    func_kwargs = {} if func_kwargs is None else func_kwargs
    items = numpy.asarray(items)

    # the array that will hold the data computed and returned by the workers
    retval = numpy.zeros(items.size, dtype='object')
    if items.size == 0:
        return retval

    # do not start workers that would only get an empty chunk
    n_workers = min(n_workers, items.size)

    # get the work loads of the workers and the indices of the entries in the
    # sub-arrays to be dispatched to each worker.
    items_for_workers = numpy.array_split(items, n_workers)
    items_inds_workers = numpy.array_split(numpy.arange(items.size), n_workers)

    queue = Queue()
    processes = []
    for worker_index, worker_items in enumerate(items_for_workers):
        process = Process(
            target=_pmap_worker,
            args=(func,
                  worker_index,
                  queue,
                  (worker_items,) + tuple(func_args),
                  func_kwargs))
        processes.append(process)

    for process in processes:
        process.start()

    # exactly one message per worker is retrieved from the queue. The data
    # of each worker is inserted into the array that will be returned in the
    # same order of "items". The queue is drained before joining the
    # processes since a process that still has data buffered for the queue
    # does not terminate.
    errors = []
    for _ in range(n_workers):
        worker_index, worker_data, error = queue.get()
        if error is not None:
            errors.append((worker_index, error))
            continue
        item_inds = items_inds_workers[worker_index]
        for entry_index, entry in enumerate(worker_data):
            retval[item_inds[entry_index]] = entry

    for process in processes:
        process.join()

    if errors:
        errors.sort()
        details = '\n'.join(
            'worker %d:\n%s' % (worker_index, error)
            for worker_index, error in errors)
        raise RuntimeError(
            'pmap: %d worker(s) failed\n%s' % (len(errors), details))

    return retval
def my_func(items, n_iter=10000000):
    """Demo work load: add each item to a running total "n_iter" times.

    Defined at module level (not inside the __main__ guard) so that worker
    processes that import this module, instead of forking it, can find it.

    :param items: iterable of numbers to be processed.
    :param int n_iter: number of additions done per item.
    :return: list with one total (item added n_iter times) per item.
    """
    retval = []
    for item in items:
        # some expensive operation
        total = 0
        for _ in range(n_iter):
            total += item
        retval.append(total)
    return retval


if __name__ == '__main__':
    import time

    # time the parallel map of the demo work load over 8 items / 8 workers
    t0 = time.time()
    ret_data = pmap(my_func, numpy.arange(8), n_workers=8)
    print(time.time() - t0)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment