Last active
August 30, 2016 02:41
-
-
Save mumbleskates/534e42514ead3cf2357f1c2d7ac12cf0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding=utf-8 | |
from mumblecode.multithreading import CloseableQueue | |
from queue import Queue, Empty | |
from threading import Thread | |
import requests | |
_STOP = object() | |
class RequestPool(object): | |
def __init__(self, pool_size=20, queue_size=64, rate_limiter=lambda: None): | |
self.pool_size = pool_size | |
self.limiter = rate_limiter | |
self.running = 0 | |
self.request_queue = CloseableQueue(maxsize=queue_size) | |
self.result_queue = Queue(maxsize=queue_size) | |
self.threadpool = () | |
def _worker(self): | |
"""pool worker""" | |
while True: | |
try: | |
request = self.request_queue.get() | |
except StopIteration: # input queue has closed | |
# signal thread is shutting down | |
self.result_queue.put(_STOP) | |
return # shutdown thread | |
try: | |
self.limiter() | |
result = requests.get(request) | |
self.result_queue.put(result) | |
except Exception as ex: | |
self.result_queue.put((ex, request)) | |
def start_pool(self): | |
"""Start the pool""" | |
if self.threadpool is not (): | |
raise ValueError("threadpool was already started") | |
threadpool = [ | |
Thread(target=self._worker, daemon=True) | |
for _ in range(self.pool_size) | |
] | |
for thread in threadpool: | |
thread.start() | |
self.running += 1 | |
def stop_pool(self): | |
"""Stop the pool and prevent any new requests from being submitted""" | |
self.request_queue.close() | |
def put_request(self, request): | |
"""Put a single request into the queue""" | |
if not self.running: | |
raise RuntimeError("threadpool is stopped") | |
self.request_queue.put(request) | |
def put_all(self, request_iterable): | |
""" | |
Put every request in an iterable into the queue by spawning | |
a new thread to insert them | |
""" | |
if not self.running: | |
raise RuntimeError("threadpool is stopped") | |
def putter(): | |
# add an imaginary task to the request queue to signal that we | |
# are constantly trying to stuff more items in. This prevents | |
# is_idle() from encountering a race condition and returning | |
# a false positive when the putter thread has starved. | |
with self.request_queue.all_tasks_done: | |
self.request_queue.unfinished_tasks += 1 | |
try: | |
for request in request_iterable: | |
self.request_queue.put(request) | |
except ValueError: # input queue is closed | |
return | |
finally: | |
# we are done stuffing items into the queue, finish that | |
# imaginary task | |
self.request_queue.task_done() | |
Thread(target=putter, daemon=True).start() | |
def get_result(self, wait=True): | |
""" | |
Return a result from the request pool or a tuple containing a | |
resulting error and the original request that caused it. | |
Raises StopIteration if the threadpool is not running. | |
If Wait is set to False, returns immediately or raises queue.Empty | |
to signify no results are ready. | |
""" | |
while self.running: | |
try: | |
result = self.result_queue.get(wait) | |
self.request_queue.task_done() | |
except Empty: | |
raise | |
if result is _STOP: # a worker thread has stopped | |
self.running -= 1 | |
else: | |
return result | |
else: | |
raise StopIteration("threadpool is stopped") | |
def is_idle(self): | |
""" | |
Return True iff there are no in-flight requests. | |
False may be returned as the last request is just finishing. If the pool | |
is in use by multiple threads, True may not be accurate if another thread | |
just submitted a request. If the pool is only used by a single thread that | |
is no longer submitting requests, or the pool has been stopped, True will | |
always indicate that the pool is truly idle. | |
""" | |
return not self.request_queue.unfinished_tasks |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment