Created
November 16, 2022 20:42
-
-
Save earonesty/c51e1f7a39a4227a49b7b966d9286004 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class FutureWaitQueue: | |
"""Wait for futures without making an infinite list. | |
Any exceptions are ignored at put() time, except for timeout errors, which are raised. | |
The last exception raised is raised at wait() time | |
""" | |
def __init__(self, maxsize, timeout, err_on_timeout=concurrent.futures.TimeoutError): | |
"""Construct a future wait queue. | |
:param maxsize: maximum number of entries before waiting is forced | |
:param timeout: if a single entry takes this long, raise an exception | |
:param err_on_timeout: exception to raise on timeout | |
""" | |
self.maxsize = maxsize | |
self.timeout = timeout | |
self.last_ex = None | |
self.err_on_timeout = err_on_timeout | |
self.q: Set[Future] = set() | |
def __wait_one(self): | |
done, not_done = concurrent.futures.wait( | |
self.q, return_when=concurrent.futures.FIRST_COMPLETED, timeout=self.timeout | |
) | |
self.q = not_done | |
if not done: | |
raise self.err_on_timeout | |
for ent in done: | |
try: | |
ent.result() | |
except Exception as ex: | |
self.last_ex = ex | |
def put(self, fut: Optional[Future]): | |
"""Add a future to the wait queue, if the queue is full, wait for one to complete first.""" | |
if fut: | |
if len(self.q) >= self.maxsize: | |
self.__wait_one() | |
self.q.add(fut) | |
def wait(self): | |
"""Wait for everything in the queue.""" | |
for ent in self.q: | |
try: | |
ent.result(timeout=self.timeout) | |
except Exception as ex: | |
if isinstance(ex, concurrent.futures.TimeoutError): | |
raise self.err_on_timeout | |
self.last_ex = ex | |
if self.last_ex: | |
raise self.last_ex |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment