Created
June 4, 2009 12:30
-
-
Save andreisavu/123591 to your computer and use it in GitHub Desktop.
Python Thread Pool
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Original code at http://code.activestate.com/recipes/203871/
# I have added a limit for the number of tasks waiting.
# The method queueTask will block if the current taskList size
# exceeds the specified limit.
import threading
import traceback
from time import sleep
# Ensure booleans exist (not needed for Python 2.2.1 or higher).
# The names are injected through globals() rather than assigned directly:
# a literal `False = 0` is a SyntaxError on Python 3, where True and False
# are keywords, and would stop the whole module from compiling.
try:
    True
except NameError:
    # On an interpreter without booleans, `not 0` evaluates to 1.
    globals().update({"False": 0, "True": 1})
class ThreadPool:
    """Flexible thread pool class.  Creates a pool of threads, then
    accepts tasks that will be dispatched to the next available
    thread."""

    def __init__(self, numThreads, numTasks=0):
        """Initialize the thread pool with numThreads workers.  Limit
        the number of queued tasks to numTasks; a value of 0 (or less)
        leaves the queue unbounded."""
        self.__threads = []
        self.__resizeLock = threading.Condition(threading.Lock())
        # Guards __tasks.  Producers blocked on a full queue wait on this
        # condition and are notified each time a task is popped.
        self.__taskLock = threading.Condition(threading.Lock())
        self.__tasks = []
        self.__isJoining = False
        self.__numTasks = numTasks
        self.setThreadCount(numThreads)

    def setThreadCount(self, newNumThreads):
        """External method to set the current pool size.  Acquires
        the resizing lock, then calls the internal version to do real
        work.  Returns False (and does nothing) while the pool is
        shutting down, True otherwise."""
        # Can't change the thread count if we're shutting down the pool!
        if self.__isJoining:
            return False
        with self.__resizeLock:
            self.__setThreadCountNolock(newNumThreads)
        return True

    def __setThreadCountNolock(self, newNumThreads):
        """Set the current pool size, spawning or terminating threads
        if necessary.  Internal use only; assumes the resizing lock is
        held."""
        # A negative request can never be met; treat it as "no threads"
        # instead of indexing into an empty list in the shrink loop.
        newNumThreads = max(0, newNumThreads)
        # If we need to grow the pool, do so
        while newNumThreads > len(self.__threads):
            newThread = ThreadPoolThread(self)
            self.__threads.append(newThread)
            newThread.start()
        # If we need to shrink the pool, do so
        while newNumThreads < len(self.__threads):
            self.__threads.pop(0).goAway()

    def getThreadCount(self):
        """Return the number of threads in the pool."""
        with self.__resizeLock:
            return len(self.__threads)

    def queueTask(self, task, args=None, taskCallback=None):
        """Insert a task into the queue.  task must be callable;
        args and taskCallback can be None.

        If a task limit was configured and the queue already holds that
        many tasks, block until a worker removes one.  Returns True when
        the task was queued, False when task is not callable or the pool
        is shutting down."""
        if self.__isJoining or not callable(task):
            return False
        with self.__taskLock:
            # ">=" keeps the queue at no more than numTasks entries.
            # Condition.wait() releases the lock while sleeping and
            # re-acquires it before returning, so no wakeup is lost
            # between the size check and the wait.
            while (not self.__isJoining and self.__numTasks > 0
                   and len(self.__tasks) >= self.__numTasks):
                self.__taskLock.wait()
            if self.__isJoining:
                return False
            self.__tasks.append((task, args, taskCallback))
            return True

    def getNextTask(self):
        """Retrieve the next task from the task queue.  For use
        only by ThreadPoolThread objects contained in the pool.

        Returns a (task, args, taskCallback) tuple, or
        (None, None, None) when the queue is empty."""
        with self.__taskLock:
            if not self.__tasks:
                return (None, None, None)
            nextTask = self.__tasks.pop(0)
            # Exactly one slot was freed, so wake one blocked producer.
            self.__taskLock.notify()
            return nextTask

    def joinAll(self, waitForTasks=True, waitForThreads=True):
        """Terminate all pooled threads, optionally waiting first for
        the task queue to drain (waitForTasks) and then for the worker
        threads to exit (waitForThreads).  Afterwards the pool is reset
        so it can be reused via setThreadCount()."""
        # Mark the pool as joining to prevent any more task queueing
        self.__isJoining = True
        # Wake producers blocked on a full queue so they can notice the
        # shutdown instead of waiting for a slot.
        with self.__taskLock:
            self.__taskLock.notify_all()
        # Wait for queued tasks to be picked up.  Without any worker
        # nothing can drain the queue, so don't spin forever in that case.
        if waitForTasks:
            while self.__tasks and self.getThreadCount() > 0:
                sleep(.1)
        # Tell all the threads to quit.  Snapshot them first: shrinking
        # the pool removes every thread from the list, which would leave
        # nothing to join afterwards.
        with self.__resizeLock:
            exiting = list(self.__threads)
            self.__setThreadCountNolock(0)
        # Wait until all threads have exited.  This happens outside the
        # resize lock so a still-running task may call getThreadCount()
        # without deadlocking against the join.
        if waitForThreads:
            for worker in exiting:
                worker.join()
        # Reset the pool for potential reuse
        self.__isJoining = False
class ThreadPoolThread(threading.Thread):
    """Pooled thread class.  Repeatedly pulls tasks from its owning
    pool and runs them until told to go away."""

    # Seconds to sleep between polls when the pool has no task queued.
    threadSleepTime = 0.1

    def __init__(self, pool):
        """Initialize the thread and remember the pool.  pool must
        provide getNextTask() returning a (task, args, callback) tuple."""
        threading.Thread.__init__(self)
        self.__pool = pool
        self.__isDying = False

    def run(self):
        """Until told to quit, retrieve the next task and execute
        it, calling the callback (if any) with the task's result.

        An exception raised by a task or its callback is printed with
        its traceback and the loop carries on; letting it propagate
        would end this thread and silently shrink the pool."""
        while not self.__isDying:
            cmd, args, callback = self.__pool.getNextTask()
            # If there's nothing to do, just sleep a bit
            if cmd is None:
                sleep(ThreadPoolThread.threadSleepTime)
                continue
            try:
                result = cmd(args)
                if callback is not None:
                    callback(result)
            except Exception:
                # Worker-loop boundary: report the failure, keep serving.
                traceback.print_exc()

    def goAway(self):
        """Exit the run loop next time through."""
        self.__isDying = True
# Usage example
if __name__ == "__main__":
    from random import shuffle

    # Sample task 1: given a start and end value, shuffle integers,
    # then sort them
    def sortTask(data):
        """Shuffle and re-sort the integers in [data[0], data[1])."""
        # print() with one pre-formatted string emits the same text under
        # the Python 2 print statement and the Python 3 print function.
        print("SortTask starting for  %s" % (data,))
        numbers = list(range(data[0], data[1]))
        # random.shuffle is a real permutation; the hand-rolled swap it
        # replaces overwrote elements, never picked the last index and
        # raised on ranges with fewer than two values.
        shuffle(numbers)
        print("SortTask sorting for  %s" % (data,))
        numbers.sort()
        print("SortTask done for  %s" % (data,))
        return "Sorter ", data

    # Sample task 2: just sleep for a number of seconds.
    def waitTask(data):
        """Sleep for data seconds."""
        print("WaitTask starting for  %s" % (data,))
        print("WaitTask sleeping for %d seconds" % data)
        sleep(data)
        return "Waiter", data

    # Both tasks use the same callback
    def taskCallback(data):
        """Report the value returned by a finished task."""
        print("Callback called for %s" % (data,))

    # Create a pool with three worker threads
    pool = ThreadPool(3)

    # Insert tasks into the queue and let them run
    pool.queueTask(sortTask, (1000, 100000), taskCallback)
    pool.queueTask(waitTask, 5, taskCallback)
    pool.queueTask(sortTask, (200, 200000), taskCallback)
    pool.queueTask(waitTask, 2, taskCallback)
    pool.queueTask(sortTask, (3, 30000), taskCallback)
    pool.queueTask(waitTask, 7, taskCallback)

    # When all tasks are finished, allow the threads to terminate
    pool.joinAll()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment