Created
December 3, 2015 20:37
-
-
Save Perlence/c9b8b0f557e3af818d65 to your computer and use it in GitHub Desktop.
A subclass of asyncio.Queue with close() method
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """closingqueue.py | |
| Inspired by `pychan <https://github.com/stuglaser/pychan>`_. | |
| """ | |
| import asyncio | |
| class ClosingQueue(asyncio.Queue): | |
| """A subclass of Queue with close() method. | |
| If queue is closed the following will happen: | |
| - calling put() and put_nowait() will immediately raise QueueClosed. | |
| - calling get() and get_nowait() will pop items from the queue if | |
| not empty, otherwise will raise QueueClosed. | |
| """ | |
| def __init__(self, maxsize=0, *, loop=None): | |
| super().__init__(maxsize, loop=loop) | |
| self._closed = False | |
| def close(self): | |
| if self._closed: | |
| raise RuntimeError('closing already closed queue') | |
| while self._putters: | |
| _, putter = self._putters.popleft() | |
| if not putter.done(): | |
| putter.set_exception(QueueClosed(which=self)) | |
| while self._getters: | |
| getter = self._getters.popleft() | |
| if not getter.done(): | |
| getter.set_exception(QueueClosed(which=self)) | |
| self._closed = True | |
| def __enter__(self): | |
| return self | |
| def __exit__(self, *exc_info): | |
| self.close() | |
| @asyncio.coroutine | |
| def put(self, item): | |
| if self._closed: | |
| raise QueueClosed(which=self) | |
| yield from super().put(item) | |
| def put_nowait(self, item): | |
| if self._closed: | |
| raise QueueClosed(which=self) | |
| super().put_nowait(item) | |
| @asyncio.coroutine | |
| def get(self): | |
| if not self.qsize() and self._closed: | |
| raise QueueClosed(which=self) | |
| return (yield from super().get()) | |
| def get_nowait(self): | |
| if not self.qsize() and self._closed: | |
| raise QueueClosed(which=self) | |
| return super().get_nowait() | |
| class QueueClosed(Exception): | |
| def __init__(self, *args, **kwargs): | |
| self.which = kwargs.pop('which') | |
| super().__init__(*args, **kwargs) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment