Skip to content

Instantly share code, notes, and snippets.

@mydreambei-ai
Created March 20, 2017 09:09
Show Gist options
  • Save mydreambei-ai/9824c8ccdf4f4a9c5d1ed2e272f30feb to your computer and use it in GitHub Desktop.
Save mydreambei-ai/9824c8ccdf4f4a9c5d1ed2e272f30feb to your computer and use it in GitHub Desktop.
python threading pool
import time
import threading
import pickle
import queue
class Worker(threading.Thread):
    """Long-lived thread that executes one submitted callable at a time.

    The thread sleeps on an event until ``do()`` hands it a callable, runs
    it, stores the return value, and goes back to sleep.  The run loop has
    no exit condition, so the thread only ends if ``do_error`` raises or
    the interpreter shuts down.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        self._start = threading.Event()   # set by do() to wake the run loop
        self._done = threading.Event()    # set whenever no task is in flight
        self._done.set()
        self._lock = threading.Lock()     # guards the task hand-off state
        self._to_do = False               # True while a task awaits pickup
        self._running = False             # True from do() until the task ends
        self._result = None               # return value of the last good task

    def run(self):
        """Wait for tasks forever and execute each one exactly once."""
        while True:
            self._start.wait()
            # Take the task under the lock, but run it outside the lock so
            # that do() on a busy worker returns False instead of blocking.
            with self._lock:
                self._start.clear()
                if not self._to_do:
                    continue
                self._to_do = False
                func = self._target
                args = self._target_args
                kwargs = self._target_kargs
            try:
                self._result = func(*args, **kwargs)
            except Exception as e:
                self.do_error(e)
            finally:
                # Publish "idle" and "done" together so a concurrent do()
                # cannot slip in between the two updates.
                with self._lock:
                    self._running = False
                    self._done.set()

    @property
    def is_run(self):
        """True while a submitted task has not finished yet."""
        return self._running

    @property
    def is_dead(self):
        """True when the underlying thread is not alive."""
        return not self.is_alive()

    @property
    def is_active(self):
        """True when the thread is alive and currently owns a task."""
        return self.is_run and self.is_alive()

    def getResult(self, timeout=None):
        """Return the result of the most recently submitted task.

        Blocks for up to ``timeout`` seconds (indefinitely when ``None``)
        while a task is in flight.  Returns ``None`` if no task has been
        submitted, if the task did not finish in time, or if it raised.
        """
        self._done.wait(timeout)
        return self._result

    def do(self, func, *args, **kargs):
        """Hand ``func(*args, **kargs)`` to this worker.

        Returns True if the task was accepted, or False if the worker is
        alive and still busy with an earlier task.
        """
        with self._lock:
            if self.is_active:
                return False
            # Store the complete task before waking the thread so run()
            # never observes a half-initialised hand-off.
            self._target = func
            self._target_args = args
            self._target_kargs = kargs
            self._result = None
            self._running = True
            self._to_do = True
            self._done.clear()
            self._start.set()
        return True

    def do_error(self, err):
        """Hook called with the exception raised by a task; no-op by default."""
        pass
class WorkerPoolException(Exception):
    """Error raised by WorkerPool when a task cannot be placed.

    The raise sites in this file pass ``(message, code)`` as arguments:
    ``("no free worker", 1)`` and ``("pending is full", 2)``; both values
    are available through ``args``.
    """
class WorkerPool(object):
    """Pool of Worker threads with an overflow queue for deferred tasks.

    ``size`` workers are started up front and the pool grows on demand up
    to ``limit`` workers.  Tasks that cannot be placed immediately go into
    a queue of capacity ``pending`` (0 means unbounded) and are handed out
    later by a dedicated dispatcher worker.
    """

    def __init__(self, size=5, limit=30, pending=0):
        self.size = size
        self.max = limit
        self.workers = []
        for _ in range(size):
            self.workers.append(self.new_worker())
        self._pending = queue.Queue(pending)
        self.set_pending_worker()

    def set_pending_worker(self):
        """Start the dispatcher worker that drains the pending queue."""
        # NOTE(review): unlike pool workers this thread is not marked as a
        # daemon, so it keeps the process alive; confirm that is intended.
        self.do_pending_worker = self.new_worker(False)
        self.do_pending_worker.start()
        self.do_pending_worker.do(self.pending_worker)

    def set_worker(self, worker):
        """Mark ``worker`` as a daemon thread and start it."""
        # The daemon attribute replaces the deprecated setDaemon() call.
        worker.daemon = True
        worker.start()

    def new_worker(self, set=True):
        """Create a Worker; when ``set`` is true also daemonize and start it."""
        worker = Worker()
        if set:
            self.set_worker(worker)
        return worker

    def add_task(self, func, *args, **kwargs):
        """Run ``func(*args, **kwargs)`` on a free worker, or defer it.

        Raises WorkerPoolException("pending is full", 2) when no worker
        can take the task and the pending queue is at capacity.
        """
        try:
            worker = self.pick()
        except WorkerPoolException:
            worker = None
        # do() returns False when the worker turned busy after pick(); in
        # that case the task is deferred instead of being silently lost.
        if worker is not None and worker.do(func, *args, **kwargs):
            return
        try:
            # The callable is queued as-is: the dispatcher passes queue
            # items straight to Worker.do(), so they must stay callable.
            self._pending.put_nowait((func, args, kwargs))
        except queue.Full as e:
            raise WorkerPoolException("pending is full", 2) from e

    def pending_worker(self):
        """Loop forever, handing deferred tasks to workers as they free up."""
        count = 1
        task = None
        while True:
            if task is None:
                task = self._pending.get()
            func, args, kwargs = task
            try:
                worker = self.pick()
            except WorkerPoolException:
                worker = None
            if worker is not None and worker.do(func, *args, **kwargs):
                count = 1
                task = None
            else:
                # No worker accepted the task: keep it and retry after a
                # linearly growing delay.
                count += 1
                time.sleep(0.1 * count)

    def pick(self):
        """Return an idle worker, growing the pool if it is below its limit.

        Raises WorkerPoolException("no free worker", 1) when every worker
        is busy and the pool already holds ``max`` workers.
        """
        for worker in self.workers:
            if not worker.is_run:
                return worker
        if len(self.workers) < self.max:
            worker = self.new_worker()
            self.workers.append(worker)
            return worker
        raise WorkerPoolException("no free worker", 1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment