Last active
September 8, 2016 03:58
-
-
Save nhumrich/dc9df57a24e9a75860a5b4853a4b7dc9 to your computer and use it in GitHub Desktop.
python asyncio drain feature patch
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff -r c2212d98ef13 Lib/asyncio/queues.py | |
--- a/Lib/asyncio/queues.py Wed Sep 07 14:56:15 2016 -0700 | |
+++ b/Lib/asyncio/queues.py Wed Sep 07 21:56:33 2016 -0600 | |
@@ -1,13 +1,13 @@ | |
"""Queues""" | |
-__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty'] | |
+__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', | |
+ 'QueueFull', 'QueueEmpty', 'QueueClosed'] | |
import collections | |
import heapq | |
from . import compat | |
from . import events | |
-from . import futures | |
from . import locks | |
from .coroutines import coroutine | |
@@ -26,6 +26,11 @@ | |
pass | |
+class QueueClosed(Exception): | |
+ """Raised by Queue.get(), Queue.put(), Queue.put_nowait() and Queue.drain() | |
+ after Queue.close(), Queue.drain() or Queue.drain_nowait() has been called.""" | |
+ | |
+ | |
class Queue: | |
"""A queue, useful for coordinating producer and consumer coroutines. | |
@@ -47,12 +52,15 @@ | |
# Futures. | |
self._getters = collections.deque() | |
- # Futures. | |
self._putters = collections.deque() | |
+ self._drainer = None | |
+ | |
self._unfinished_tasks = 0 | |
self._finished = locks.Event(loop=self._loop) | |
self._finished.set() | |
self._init(maxsize) | |
+ self.is_closed = False | |
+ self.is_draining = False | |
# These three are overridable in subclasses. | |
@@ -75,6 +83,12 @@ | |
waiter.set_result(None) | |
break | |
+ def _wakeup_all(self): | |
+ # Empty both waiter deques (putters first); relies on _wakeup_next() popping. | |
+ for waiters in (self._putters, self._getters): | |
+ while waiters: | |
+ self._wakeup_next(waiters) | |
+ | |
def __repr__(self): | |
return '<{} at {:#x} {}>'.format( | |
type(self).__name__, id(self), self._format()) | |
@@ -128,6 +142,8 @@ | |
This method is a coroutine. | |
""" | |
while self.full(): | |
+ if self.is_closed or self.is_draining: | |
+ raise QueueClosed | |
putter = self._loop.create_future() | |
self._putters.append(putter) | |
try: | |
@@ -146,6 +162,8 @@ | |
If no free slot is immediately available, raise QueueFull. | |
""" | |
+ if self.is_draining or self.is_closed: | |
+ raise QueueClosed | |
if self.full(): | |
raise QueueFull | |
self._put(item) | |
@@ -162,6 +180,16 @@ | |
This method is a coroutine. | |
""" | |
while self.empty(): | |
+ if self.is_closed: | |
+ raise QueueClosed | |
+ if self.is_draining: | |
+ if self._drainer: | |
+ self._drainer.set_result(None) | |
+ self._drainer = None | |
+ self.is_draining = False | |
+ self.is_closed = True | |
+ raise QueueClosed | |
+ | |
getter = self._loop.create_future() | |
self._getters.append(getter) | |
try: | |
@@ -218,6 +246,45 @@ | |
if self._unfinished_tasks > 0: | |
yield from self._finished.wait() | |
+ @coroutine | |
+ def drain(self): | |
+ """Close the queue to new items and let the queued ones drain. | |
+ Waits on join(), i.e. until no unfinished tasks remain, then returns. | |
+ | |
+ Any following calls to Queue.put() or Queue.put_nowait() will raise | |
+ a QueueClosed exception. Following calls to Queue.get() will succeed | |
+ until the queue is empty. Once the queue is empty, Queue.get() will | |
+ raise a QueueClosed exception. | |
+ | |
+ Raises QueueClosed if the queue is already being drained or is closed. | |
+ """ | |
+ if self.is_draining or self.is_closed: | |
+ raise QueueClosed | |
+ self.drain_nowait() | |
+ yield from self.join() | |
+ | |
+ def drain_nowait(self): | |
+ """Start draining without waiting: an empty queue is closed at once, | |
+ otherwise is_draining is set; pending putters/getters are then woken. | |
+ """ | |
+ if self.empty(): | |
+ self.is_draining = False | |
+ self.is_closed = True | |
+ else: | |
+ self.is_draining = True | |
+ | |
+ self._wakeup_all() | |
+ | |
+ def close(self): | |
+ """Close the queue at once: wake all waiters, reject puts and gets. | |
+ | |
+ put(), put_nowait() and get() are meant to raise QueueClosed. | |
+ NOTE(review): get() tests is_closed only while empty -- confirm. | |
+ """ | |
+ self.is_closed = True | |
+ self.is_draining = False | |
+ self._wakeup_all() | |
+ | |
class PriorityQueue(Queue): | |
"""A subclass of Queue; retrieves entries in priority order (lowest first). |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment