Skip to content

Instantly share code, notes, and snippets.

@OzTamir
Last active March 28, 2016 07:55
Show Gist options
  • Select an option

  • Save OzTamir/db9e60e196b6366dc79f to your computer and use it in GitHub Desktop.

Select an option

Save OzTamir/db9e60e196b6366dc79f to your computer and use it in GitHub Desktop.
It's time to chimichanga! make the
####################################
#
# threadpool.py - A utility class to manage a task queue with threads
# Author: Oz Tamir
#
####################################
import logging
import random
from Queue import Empty, Queue
from threading import Lock, Thread
class Threadpool(object):
    '''
    A utility class to manage a task queue with threads.

    Tasks are submitted with add_task() and executed by daemon worker
    threads. The outcome of every task is stored under the task's name and
    can be inspected with get_result(), task_finished(), task_failed() and
    any_failed().

    Attributes:
        thread_count: number of worker threads started by this pool.
        NOT_DONE: marker stored as the result of a task that was queued but
            has not completed yet.
        FAILED: marker stored as the result of a task that raised an
            exception.
    '''

    def __init__(self, thread_count=2):
        '''
        Create the pool and start `thread_count` worker threads.
        '''
        # add_thread() increments the counter for every thread it starts, so
        # begin at zero; seeding it with `thread_count` as well would report
        # twice the real number of workers.
        self.thread_count = 0
        self.__queue__ = Queue()
        self.__results__ = dict()
        self.__results_lock = Lock()
        self.__threads__ = []
        # Unique marker objects: unlike numbers they can never be equal to
        # each other or to a value legitimately returned by a task.
        # A value used to indicate if a task is not done
        self.NOT_DONE = object()
        # A value used to indicate if a task had failed
        self.FAILED = object()
        # Add the initial number of threads
        for _ in range(thread_count):
            self.add_thread()

    def __worker_function(self):
        '''
        The function each worker thread runs: take tasks off the queue
        forever, execute them and record their results.
        '''
        while True:
            # Get the next task to perform (blocks until one is available)
            task_id, task_func, args = self.__queue__.get()
            try:
                try:
                    # `args` is always a tuple, so unpacking also covers the
                    # no-argument case
                    task_result = task_func(*args)
                except Exception as e:
                    # If there was an exception, store the FAILED marker
                    # instead of raising it, and log what went wrong
                    task_result = self.FAILED
                    error_msg = 'Task %s failed: %s' % (task_id, repr(e))
                    logging.getLogger('Threadpool').error(error_msg)
                # Wait until we can write result to the results dictionary
                with self.__results_lock:
                    self.__results__[task_id] = task_result
            finally:
                # Always inform the queue that this task is done, otherwise
                # join() would wait forever for it
                self.__queue__.task_done()

    def add_thread(self):
        '''
        Add a new worker thread to the pool and start it.
        '''
        # Build a name for the new thread
        thread_name = 'T.%d' % (len(self.__threads__) + 1)
        # Create the new thread; daemon threads do not keep the interpreter
        # alive when the main thread exits
        new_thread = Thread(name=thread_name, target=self.__worker_function)
        new_thread.daemon = True
        self.__threads__.append(new_thread)
        new_thread.start()
        self.thread_count += 1

    def add_task(self, task_name, task_func, *args):
        '''
        Add a new task to the queue.

        task_name is the key under which the result will be stored (adding
        a task with an existing name overwrites the stored result),
        task_func is the callable to run and args are the positional
        arguments passed to it.
        '''
        # Create the task tuple (this is mainly for readability reasons)
        task = (task_name, task_func, args)
        # Register the placeholder before queuing the task, so a fast worker
        # cannot have its result overwritten by NOT_DONE
        with self.__results_lock:
            self.__results__[task_name] = self.NOT_DONE
        # Add it to the queue
        self.__queue__.put(task)

    def get_result(self, task_name):
        '''
        Get the result of a task.

        Returns NOT_DONE while the task is pending, FAILED if it raised,
        the task's return value otherwise, and None when no task with this
        name is known.
        '''
        with self.__results_lock:
            return self.__results__.get(task_name, None)

    def task_finished(self, task_name, failed_is_finished=False):
        '''
        Check if a task has finished.
        If failed_is_finished is set, FAILED flag will count as finished.
        A task name that is not known to the pool is never finished.
        '''
        # Read the entry once under the lock so the answer is based on a
        # single consistent snapshot
        with self.__results_lock:
            if task_name not in self.__results__:
                return False
            result = self.__results__[task_name]
        if result is self.NOT_DONE:
            return False
        if result is self.FAILED:
            return bool(failed_is_finished)
        return True

    def task_failed(self, task_name):
        '''
        Check if a task has failed.
        '''
        return self.get_result(task_name) is self.FAILED

    def any_failed(self):
        '''
        Check if any of the tasks had failed.
        '''
        with self.__results_lock:
            return any(status is self.FAILED
                       for status in self.__results__.values())

    def clear(self):
        '''
        Clear the task queue and the stored results.

        Returns the list of (task_name, task_func, args) tuples that were
        still waiting in the queue. Tasks a worker has already picked up
        are not affected and will still store their result when they end.
        '''
        tasks = []
        with self.__results_lock:
            self.__results__ = dict()
            while True:
                try:
                    # Never block here: a worker may take the last item
                    # between two iterations, and blocking while holding
                    # the results lock would deadlock with that worker
                    tasks.append(self.__queue__.get_nowait())
                except Empty:
                    break
                # Balance the get() so a later join() does not wait for a
                # task that will never run
                self.__queue__.task_done()
        return tasks

    def join(self):
        '''
        Block until all the items in the queue are done.
        '''
        self.__queue__.join()

    def is_empty(self):
        '''
        Check if the queue is empty.
        '''
        return self.__queue__.empty()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment