Created
May 12, 2017 11:35
-
-
Save pykong/7d677fbad75ddddbb5316a7e510a4f99 to your computer and use it in GitHub Desktop.
A very easy interface for multi-process execution of functions in Python.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import multiprocessing | |
class EasyMultiProcessing(object):
    """Run a function over the items of an iterable using worker processes.

    Work items are placed on a ``multiprocessing.JoinableQueue``. Each worker
    process repeatedly takes one item off the queue and calls
    ``func(item, *args, **kwargs)`` until it receives the sentinel value that
    ``join_all`` enqueues (one sentinel per worker).

    Typical usage::

        pool = EasyMultiProcessing(func, items, verbose=True)
        pool.start_all()
        pool.fill_queue(more_items)  # optional, before join_all()
        pool.join_all()
    """

    def __init__(self,
                 func,
                 data,
                 *args,  # put here to enforce keyword usage
                 pool_size=None,
                 daemon=True,
                 verbose=False,
                 **kwargs
                 ):
        """Fill the queue and create the (not yet started) worker processes.

        :param func: callable invoked as ``func(item, *args, **kwargs)``.
        :param data: iterable of work items; ``None`` entries are skipped.
        :param args: extra positional arguments forwarded to ``func``.
        :param pool_size: number of worker processes; defaults to
            ``multiprocessing.cpu_count()`` when ``None``.
        :param daemon: value assigned to the ``daemon`` flag of every worker.
        :param verbose: when true, workers print which item they handle.
        :param kwargs: extra keyword arguments forwarded to ``func``.
        """
        self.func = func
        self.data = data  # must be iterable!
        # one worker per CPU unless the caller asked for a specific size:
        if pool_size is None:
            pool_size = multiprocessing.cpu_count()
        self.verbose = verbose
        self.args = args
        self.kwargs = kwargs
        # stop sign: fill_queue() refuses new items once this is False
        self.on = True
        # a queue item equal to this value tells a worker to exit its loop
        self.sentinel = "SENTINEL"
        # serializes the verbose print() calls of the workers
        self.print_lock = multiprocessing.Lock()
        # setting up and filling queue:
        self.q = multiprocessing.JoinableQueue()
        self.fill_queue(self.data)
        # creating the worker processes (started later by start_all()):
        self.processes = [multiprocessing.Process(target=self._processor)
                          for _ in range(pool_size)]
        for proc in self.processes:
            proc.daemon = daemon

    def _worker_label(self):
        """Return a printable name such as ``func_name.process.001``."""
        suffix = multiprocessing.current_process().name.split('-')[-1]
        try:
            suffix = '{0:03d}'.format(int(suffix))
        except ValueError:
            # process name does not end in a plain number: use it unchanged
            pass
        return self.func.__name__ + '.process.' + suffix

    def _processor(self):
        """Worker loop: handle queue items until the sentinel is received."""
        # the label cannot change while the loop runs, so build it only once
        label = self._worker_label() if self.verbose else None
        while True:
            # gets task (=data item) from the queue; blocks until one arrives
            task = self.q.get()
            try:
                if task == self.sentinel:
                    break
                # print what this process is currently working on:
                if self.verbose:
                    with self.print_lock:
                        print('{} is working on: {}'.format(label, task))
                # extra arguments are unpacked so that func receives them as
                # ordinary positional / keyword arguments
                self.func(task, *self.args, **self.kwargs)
            finally:
                # every get() is matched by one task_done(), also for the
                # sentinel and when func raises, so q.join() can return
                self.q.task_done()
            if self.verbose:
                with self.print_lock:
                    print(('{} finished job.'.format(label)))

    def fill_queue(self, in_arg):
        """Put every item of ``in_arg`` that is not ``None`` on the queue.

        :param in_arg: iterable of work items.
        :raises RuntimeError: if ``join_all`` has already been called.
        """
        if not self.on:  # stop sign shown: no more input accepted
            raise RuntimeError('No active threads.')
        for item in in_arg:
            if item is not None:
                self.q.put(item)

    def start_all(self):
        """Start every worker process."""
        for proc in self.processes:
            proc.start()

    def join_all(self):
        """Stop accepting input, signal the workers to exit and wait for them."""
        self.on = False
        # one sentinel per worker, queued behind all pending work items
        for _ in self.processes:
            self.q.put(self.sentinel)
        for proc in self.processes:
            proc.join()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
# EXAMPLE: ------------------------------------------------ | |
def dummy_func(dum_var, arg_rec):
    """Example job: wait one second, then print the item and the extra values.

    :param dum_var: the work item to print.
    :param arg_rec: iterable of extra values to print. When a tuple is
        passed, its first element is used as the iterable instead.
    """
    if isinstance(arg_rec, tuple):  # important to accept various input
        arg_rec = arg_rec[0]
    time.sleep(1)
    print(('dum_var: %s' % dum_var))
    for arg in arg_rec:
        # note: this prints the two-element tuple itself
        print(('Another arg: ', arg))
# How to run: | |
if __name__ == '__main__':
    # first batch of work items: the integers 0..19
    initial_items = list(range(20))
    pool = EasyMultiProcessing(
        dummy_func,
        initial_items,
        verbose=True,
        arg_rec=['I am another arg!', 'Me too.'],
    )

    # launch the worker processes
    pool.start_all()

    time.sleep(5)

    # queue a second batch while the workers are running: 50, 52, ..., 88
    extra_items = list(range(50, 90, 2))
    pool.fill_queue(extra_items)

    # block until every worker process has exited
    pool.join_all()
    print("Everything finished")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment