Last active
June 18, 2017 13:58
-
-
Save pykong/d9bf5d7bae29a7aba1c2309c7a24b58a to your computer and use it in GitHub Desktop.
Very simple interface for multi-threaded execution of functions in Python.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import threading | |
from queue import Queue | |
class EasyThreading(object):
    """Run one function over many inputs using a pool of worker threads.

    Every non-``None`` item of ``data`` is placed on an internal queue; each
    worker thread repeatedly takes one item and calls ``func`` with it.

    Calling convention for ``func``: extra positional arguments given to the
    constructor are forwarded as ONE tuple and extra keyword arguments as ONE
    dict (they are not unpacked), i.e. one of::

        func(item)
        func(item, args_tuple)
        func(item, kwargs_dict)
        func(item, args_tuple, kwargs_dict)

    Typical use: construct, ``start_all()``, optionally ``in_queue(more)``,
    then ``join_all()`` to drain the queue and stop the workers.
    """

    # Private marker put on the queue by join_all(); a worker that receives it
    # exits. A unique object can never collide with a user-supplied item.
    _STOP = object()

    def __init__(self,
                 func,
                 data,
                 *args,  # put here to enforce keyword usage
                 pool_size=None,
                 daemon=True,
                 verbose=False,
                 **kwargs
                 ):
        """Set up the queue and create (but do not start) the worker threads.

        :param func: callable invoked once per queued item (see class docs
            for how the extra arguments are passed).
        :param data: iterable of work items; ``None`` items are skipped.
        :param args: extra positional arguments, forwarded as one tuple.
        :param pool_size: number of worker threads; defaults to the number of
            items in ``data`` (at least one).
        :param daemon: daemon flag applied to every worker thread.
        :param verbose: when true, workers print what they start and finish.
        :param kwargs: extra keyword arguments, forwarded as one dict.
        """
        self.func = func
        self.data = data  # must be iterable!

        # Materialise once so that iterables without len() (e.g. generators)
        # can both size the pool and fill the queue.
        items = list(data)
        if pool_size is None:
            # Keep at least one worker so items added later via in_queue()
            # are still processed when the initial data is empty.
            pool_size = max(len(items), 1)

        self.verbose = verbose
        self.args = args
        self.kwargs = kwargs

        # stop sign: False once join_all() has been called
        self.on = True

        self.print_lock = threading.Lock()

        # setting up and filling queue:
        self.q = Queue()
        self.in_queue(items)

        # Name the workers explicitly ("<func name>.thread.001", ...) rather
        # than parsing the default thread name, whose format changed in
        # Python 3.10 ("Thread-1 (target)") and can no longer be int()-parsed.
        base_name = getattr(func, '__name__', type(func).__name__)
        self.threads = [
            threading.Thread(
                target=self._threader,
                name='{}.thread.{:03d}'.format(base_name, index),
                daemon=daemon,
            )
            for index in range(1, pool_size + 1)
        ]

    def _threader(self):
        """Worker loop: process queued items until a stop marker arrives."""
        thread_name = threading.current_thread().name
        while True:
            # Blocks until an item (or the stop marker) is available.
            in_put = self.q.get()
            if in_put is self._STOP:
                self.q.task_done()
                break

            # print what thread is currently working on:
            if self.verbose:
                with self.print_lock:
                    print('{} is working on: {}'.format(thread_name, in_put))

            try:
                # Extras are passed as a single tuple / dict, not unpacked.
                if self.args and self.kwargs:
                    self.func(in_put, self.args, self.kwargs)
                elif self.args:
                    self.func(in_put, self.args)
                elif self.kwargs:
                    self.func(in_put, self.kwargs)
                else:
                    self.func(in_put)
            finally:
                # Mark the item done even if func raised, so the queue's
                # unfinished-task count stays accurate.
                self.q.task_done()

            if self.verbose:
                with self.print_lock:
                    print('{} finished job.'.format(thread_name))

    def in_queue(self, in_arg):
        """Queue every non-``None`` item of ``in_arg`` for processing.

        :raises Exception: if ``join_all()`` has already been called.
        """
        if not self.on:
            raise Exception('No active threads.')
        for item in in_arg:
            if item is not None:
                self.q.put(item)

    def start_all(self):
        """Start every worker thread."""
        for worker in self.threads:
            worker.start()

    def join_all(self):
        """Stop accepting input, let queued items finish, and join all workers.

        One stop marker per worker is queued behind the pending items, so
        workers blocked on an empty queue wake up and exit instead of leaving
        this call waiting forever.
        """
        self.on = False
        for _ in self.threads:
            self.q.put(self._STOP)
        for worker in self.threads:
            worker.join()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
# EXAMPLE: ------------------------------------------------ | |
def dummy_func(dum_var, arg_rec, delay=1):
    """Example worker: sleep, then print the item and each extra argument.

    ``arg_rec`` may arrive in several shapes depending on how the caller
    forwards extras:

    * a tuple of positional extras -- its first element is used;
    * a dict of keyword extras -- the value stored under ``'arg_rec'`` is
      used when present (otherwise the dict itself is iterated);
    * any other iterable -- used as is.

    :param dum_var: the work item to report.
    :param arg_rec: extra argument(s), in one of the shapes above.
    :param delay: seconds to sleep before printing (simulated work).
    """
    if isinstance(arg_rec, tuple):  # important to accept various input
        arg_rec = arg_rec[0]
    elif isinstance(arg_rec, dict):
        # Keyword extras arrive as one dict; without this branch the loop
        # below would print the key name instead of the messages.
        arg_rec = arg_rec.get('arg_rec', arg_rec)
    time.sleep(delay)
    # Wrap in a 1-tuple so a tuple-valued dum_var cannot break %-formatting.
    print('dum_var: %s' % (dum_var,))
    for arg in arg_rec:
        # Deliberately prints the tuple itself, as the original did.
        print(('Another arg: ', arg))
# How to run: | |
if __name__ == '__main__':
    # Fifty integers serve as the initial workload.
    dum_list = list(range(50))

    # Ten verbose workers; the message list is forwarded as a keyword extra.
    test_threads = EasyThreading(
        dummy_func,
        dum_list,
        arg_rec=['I am another arg!', 'Me too.'],
        pool_size=10,
        verbose=True,
    )

    # Kick off the workers, then pause ten seconds before queueing more work.
    test_threads.start_all()
    time.sleep(10)

    # Demonstrate feeding additional items into the already-running pool.
    new_args = list(range(50, 90, 2))
    test_threads.in_queue(new_args)

    # Block until the workers have been joined.
    test_threads.join_all()
    print("Everything finished")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment