Created
March 20, 2017 09:09
-
-
Save mydreambei-ai/9824c8ccdf4f4a9c5d1ed2e272f30feb to your computer and use it in GitHub Desktop.
python threading pool
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import threading | |
import pickle | |
import queue | |
class Worker(threading.Thread): | |
def __init__(self): | |
threading.Thread.__init__(self) | |
self._start = threading.Event() | |
self._lock = threading.Lock() | |
self._to_do= False | |
self._running = False | |
self._result = None | |
def run(self): | |
while 1: | |
self._start.wait() | |
if self._to_do: | |
with self._lock: | |
try: | |
self._result = self._target(*self._target_args, **self._target_kargs) | |
print(self._result) | |
self._to_do = True | |
self._start.clear() | |
except Exception as e: | |
self._running = False | |
self.do_error(e) | |
self._running = False | |
@property | |
def is_run(self): | |
return self._running | |
@property | |
def is_dead(self): | |
return not self.is_alive() | |
@property | |
def is_active(self): | |
return self.is_run and self.is_alive() | |
def getResult(self, timeout): | |
# TODO | |
return self._result | |
def do(self, func, *args,**kargs): | |
with self._lock: | |
if self.is_active: | |
return False | |
self._running = True | |
self._start.set() | |
self._to_do = True | |
self._target = func | |
self._target_args = args | |
self._target_kargs = kargs | |
return True | |
def do_error(self, err): | |
pass | |
class WorkerPoolException(Exception): | |
pass | |
class WorkerPool(object): | |
def __init__(self, size=5, limit=30, pending=0): | |
self.size = size | |
self.max = limit | |
self.workers = [] | |
for _ in range(size): | |
self.workers.append(self.new_worker()) | |
self._pending = queue.Queue(pending) | |
self.set_pending_worker() | |
def set_pending_worker(self): | |
self.do_pending_worker = self.new_worker(False) | |
self.do_pending_worker.start() | |
self.do_pending_worker.do(self.pending_worker) | |
def set_worker(self, worker): | |
worker.setDaemon(True) | |
worker.start() | |
def new_worker(self, set=True): | |
worker = Worker() | |
if set: | |
self.set_worker(worker) | |
return worker | |
def add_task(self, func, *args, **kwargs): | |
try: | |
worker = self.pick() | |
print(worker) | |
except WorkerPoolException as e: | |
try: | |
self._pending.put_nowait((pickle.dumps(func), args, kwargs)) | |
except queue.Full as e: | |
raise WorkerPoolException("pending is full",2) | |
else: | |
worker.do(func, *args, **kwargs) | |
def pending_worker(self): | |
count = 1 | |
func = args = kwargs = None | |
while 1: | |
if not func: | |
func, args, kwargs = self._pending.get() | |
try: | |
worker = self.pick() | |
except WorkerPoolException: | |
# no free worker | |
count += 1 | |
time.sleep(0.1 * count) | |
else: | |
count = 1 | |
worker.do(func, *args, **kwargs) | |
func = args = kwargs = None | |
def pick(self): | |
for worker in self.workers: | |
time.sleep(0.01) | |
if not worker.is_run: | |
return worker | |
else: | |
if len(self.workers) < self.max: | |
worker = self.new_worker() | |
self.workers.append(worker) | |
return worker | |
else: | |
raise WorkerPoolException("no free worker", 1) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment