Created
January 8, 2015 03:15
-
-
Save Nexuapex/f7fbd6b4d67bc1c3da75 to your computer and use it in GitHub Desktop.
Simple Python MPMC queue and thread pool.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import absolute_import | |
import sys | |
import threading | |
import collections | |
class WorkerPool(object): | |
"""A thread pool that processes items from an unbounded work queue, for | |
simple multi-producer, multi-consumer processes. | |
The 'workers' argument controls the number of worker threads. The function | |
'func' is called on a worker thread and is passed a work item (as well as | |
any other parameters passed to the initializer). | |
The 'excepthook' property, which defaults to sys.excepthook, is called when | |
the worker function raises an exception. The worker thread itself is not | |
terminated. | |
Note that stop() must be called to terminate gracefully. | |
""" | |
def __init__(self, workers, func, *args, **kwargs): | |
self.excepthook = sys.excepthook | |
self.__items = collections.deque() | |
self.__mutex = threading.Lock() | |
self.__semaphore = threading.Semaphore(value=0) | |
self.__func = func | |
self.__args = args | |
self.__kwargs = kwargs | |
self.__stopped = False | |
self.__canceled = False | |
threads = [] | |
for _ in xrange(workers): | |
thread = threading.Thread(target=self.__worker) | |
thread.start() | |
threads.append(thread) | |
self.__threads = threads | |
def kick(self, item): | |
"Puts an item into the work queue." | |
with self.__mutex: | |
if self.__stopped: | |
raise RuntimeError("Cannot kick new work items after stop() has been called") | |
self.__items.appendleft(item) | |
# Increase the 'pending work' counter. | |
self.__semaphore.release() | |
def stop(self, cancel=False, wait=False): | |
"""Stops the worker pool from processing any additional items and allows | |
the worker threads to terminate. | |
If 'cancel' is true, then the worker threads will terminate as soon as | |
possible, without processing every work item. | |
If 'wait' is true, then stop() will block until every work item has been | |
processed and every worker thread has terminated. | |
""" | |
with self.__mutex: | |
self.__stopped = True | |
if cancel: | |
self.__canceled = True | |
# Wake up all sleeping worker threads. | |
for thread in self.__threads: | |
self.__semaphore.release() | |
if wait: | |
for thread in self.__threads: | |
thread.join() | |
def canceled(self): | |
"True if the pool has been asked to cancel processing." | |
return self.__canceled | |
def __worker(self): | |
"Worker thread main loop." | |
while not self.__canceled: | |
# Decrease the 'pending work' counter. | |
self.__semaphore.acquire() | |
if self.__canceled: | |
break | |
with self.__mutex: | |
# Get a single item from the work queue. If there are no items | |
# left in the queue, it must be time to stop. | |
if self.__items: | |
item = self.__items.pop() | |
else: | |
assert self.__stopped | |
break | |
try: | |
self.__func(item, *self.__args, **self.__kwargs) | |
except StandardError: | |
self.excepthook(*sys.exc_info()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment