Created
August 31, 2012 09:09
-
-
Save bencord0/3550592 to your computer and use it in GitHub Desktop.
A multithreaded implementation of Python's map().
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def concurrent_map(func, *iterm, **kwargs):
    """
    Threaded version of map().

    Calls ``func`` once per position across the given iterables, running
    every call in its own thread, and returns the results in input order.
    Shorter iterables are padded with ``None`` so that all of them are as
    long as the longest one. The caller's iterables are never modified.

    With inspiration from twisted, exceptions are treated as return values:
    if a call raises, the exception instance is stored in that call's slot
    of the result list instead of propagating.

    Parameters:
        func:   callable invoked as ``func(*args)`` for each tuple of items.
        *iterm: zero or more iterables (lists, tuples, generators, ...).
        pause:  keyword only; seconds to sleep after starting each thread
                (default 0).
        daemon: keyword only; value assigned to each thread's ``daemon``
                flag (default False).

    Returns:
        A list with one entry per position: the return value of ``func``,
        or the exception it raised.

    Usage:
    >>> my_data = [1,2,3,4,5]
    >>> def my_func(x):
    ...     return x*x
    ...
    >>> concurrent_map(my_func, my_data)
    [1, 4, 9, 16, 25]
    >>> results = concurrent_map(lambda x,y: x+y, [1,3,5,7,9], [2,4,6,8])
    >>> results[:4]
    [3, 7, 11, 15]
    >>> type(results[4]).__name__
    'TypeError'
    """
    # Imports are here. If you are copy/pasting this into other code,
    # this won't fill your namespace. If already imported, no harm.
    # Refactor to your needs.
    import threading
    from time import sleep

    pause = kwargs.get('pause', 0)
    daemon = kwargs.get('daemon', False)

    # Snapshot every input into a private list: this accepts any iterable
    # (not just lists) and guarantees the padding below never touches the
    # caller's own objects.
    columns = [list(column) for column in iterm]
    # Compute the target length once instead of once per input.
    longest = max([len(column) for column in columns]) if columns else 0
    # Pad short lists with None
    padded = [column + [None] * (longest - len(column)) for column in columns]
    # Form a list of args
    args_list = list(zip(*padded))

    # One pre-sized slot per call; each worker writes only its own index,
    # so result order never depends on thread names or completion order.
    results = [None] * len(args_list)
    results_lock = threading.Lock()  # protect the results

    def func_wrapper(index, args):
        # BaseException (not just Exception) so that e.g. SystemExit raised
        # inside func is reported in its slot rather than silently killing
        # the worker thread and leaving the result missing.
        try:
            result = func(*args)
        except BaseException as e:
            result = e
        with results_lock:
            results[index] = result

    threads = []
    for index, args in enumerate(args_list):
        thread = threading.Thread(target=func_wrapper, args=(index, args))
        thread.daemon = daemon
        threads.append(thread)
        thread.start()
        sleep(pause)
    # Wait for every worker before handing the results back.
    for thread in threads:
        thread.join()
    return results
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment