It's time to make the chimichanga!
####################################
#
# threadpool.py - A utility class to manage a task queue with threads
# Author: Oz Tamir
#
####################################
import logging
from queue import Queue
from threading import Thread, Lock
class Threadpool(object):
    '''
    A utility class to manage a task queue with threads
    '''
    def __init__(self, thread_count=2):
        self.thread_count = 0
        self.__queue__ = Queue()
        self.__results__ = dict()
        self.__results_lock = Lock()
        self.__threads__ = []
        # A unique sentinel object used to indicate that a task is not done yet
        self.NOT_DONE = object()
        # A unique sentinel object used to indicate that a task has failed
        self.FAILED = object()
        # Start the initial number of worker threads
        # (add_thread increments thread_count, so the counter starts at zero)
        for _ in range(thread_count):
            self.add_thread()
    def __worker_function(self):
        '''
        The function each worker thread runs
        '''
        while True:
            # Get the next task to perform
            task_id, task_func, args = self.__queue__.get()
            try:
                # Run the function with the arguments provided, get the result
                if args:
                    task_result = task_func(*args)
                else:
                    task_result = task_func()
            except Exception as e:
                # If there was an exception, store the FAILED sentinel as the
                # result instead of raising it
                task_result = self.FAILED
                # Log the exception
                error_msg = 'Task %s failed: %s' % (task_id, repr(e))
                logging.getLogger('Threadpool').error(error_msg)
            # Wait until we can write the result to the results dictionary
            with self.__results_lock:
                self.__results__[task_id] = task_result
            # Inform the queue that this task is done
            self.__queue__.task_done()
    def add_thread(self):
        '''
        Add a new thread to the pool
        '''
        # Build a name for the new thread
        thread_name = 'T.%d' % (len(self.__threads__) + 1)
        # Create the new thread as a daemon so it does not block interpreter exit
        new_thread = Thread(name=thread_name, target=self.__worker_function)
        new_thread.daemon = True
        self.__threads__.append(new_thread)
        new_thread.start()
        self.thread_count += 1
    def add_task(self, task_name, task_func, *args):
        '''
        Add a new task to the queue
        '''
        # Create the task tuple (this is mainly for readability reasons)
        task = (task_name, task_func, args)
        # Wait until we can add the task entry to the results dict
        with self.__results_lock:
            self.__results__[task_name] = self.NOT_DONE
        # Add it to the queue
        self.__queue__.put(task)
    def get_result(self, task_name):
        '''
        Get the result of a task
        '''
        # Acquire the lock to access the dict
        with self.__results_lock:
            return self.__results__.get(task_name, None)
    def task_finished(self, task_name, failed_is_finished=False):
        '''
        Check if a task has finished
        If failed_is_finished is set, the FAILED flag counts as finished
        '''
        result = self.get_result(task_name)
        is_done = result != self.NOT_DONE
        not_failed = result != self.FAILED or failed_is_finished
        return is_done and not_failed
    def task_failed(self, task_name):
        '''
        Check if a task has failed
        '''
        return self.get_result(task_name) == self.FAILED
    def any_failed(self):
        '''
        Check if any of the tasks has failed
        '''
        with self.__results_lock:
            return any(status == self.FAILED
                       for status in self.__results__.values())
    def clear(self):
        '''
        Clear the task queue and return all the pending tasks
        '''
        tasks = []
        with self.__results_lock:
            self.__results__ = dict()
            while not self.__queue__.empty():
                tasks.append(self.__queue__.get())
                # Mark the drained task as done so a later join() does not block
                self.__queue__.task_done()
        return tasks
    def join(self):
        '''
        Block until all the items in the queue are done
        '''
        self.__queue__.join()
    def is_empty(self):
        '''
        Check if the queue is empty
        '''
        return self.__queue__.empty()
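A minimal usage sketch, assuming the file above is saved as threadpool.py and importable; the square and explode helpers are hypothetical, added only to illustrate the add_task / join / result-inspection flow.

# usage_example.py - a sketch, assuming threadpool.py (above) is on the path
import logging
from threadpool import Threadpool
logging.basicConfig(level=logging.INFO)
def square(x):
    # Hypothetical task used only for this example
    return x * x
def explode():
    # Hypothetical task that always raises, to show FAILED handling
    raise ValueError('boom')
pool = Threadpool(thread_count=4)
# Extra positional arguments to add_task are passed on to the task function
for i in range(5):
    pool.add_task('square-%d' % i, square, i)
pool.add_task('explode', explode)
# Block until every queued task has been processed
pool.join()
print(pool.get_result('square-3'))    # 9
print(pool.task_failed('explode'))    # True
print(pool.any_failed())              # True
print(pool.task_finished('explode'))  # False (FAILED does not count as finished)
print(pool.task_finished('explode', failed_is_finished=True))  # True

Note that results are looked up by the name passed to add_task, so task names should be unique within a pool; reusing a name overwrites the earlier result.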