Skip to content

Instantly share code, notes, and snippets.

@Perlence
Created December 3, 2015 20:37
Show Gist options
  • Select an option

  • Save Perlence/c9b8b0f557e3af818d65 to your computer and use it in GitHub Desktop.

Select an option

Save Perlence/c9b8b0f557e3af818d65 to your computer and use it in GitHub Desktop.
A subclass of asyncio.Queue with close() method
"""closingqueue.py
Inspired by `pychan <https://github.com/stuglaser/pychan>`_.
"""
import asyncio
class ClosingQueue(asyncio.Queue):
    """An ``asyncio.Queue`` that can be closed.

    Once the queue is closed:

    - ``put()`` and ``put_nowait()`` immediately raise ``QueueClosed``.
    - ``get()`` and ``get_nowait()`` keep returning the items that are
      still queued and raise ``QueueClosed`` once the queue is empty.

    Coroutines that are blocked in ``put()`` or ``get()`` at the moment
    ``close()`` is called are woken up with ``QueueClosed``.

    The queue can be used as a context manager; it is closed on exit.
    """

    def __init__(self, maxsize=0, *, loop=None):
        """Create an open queue holding at most *maxsize* items.

        *loop* is kept for backward compatibility.  It is forwarded to
        ``asyncio.Queue`` only when given explicitly, because newer
        Python versions reject the argument altogether.
        """
        if loop is None:
            super().__init__(maxsize)
        else:
            super().__init__(maxsize, loop=loop)
        self._closed = False

    def close(self):
        """Close the queue and fail every coroutine waiting on it.

        Raises:
            RuntimeError: if the queue has already been closed.
        """
        if self._closed:
            raise RuntimeError('closing already closed queue')
        self._closed = True
        # NOTE: this relies on the private waiter deques of asyncio.Queue.
        self._fail_waiters(self._putters)
        self._fail_waiters(self._getters)

    def _fail_waiters(self, waiters):
        """Drain *waiters*, failing each pending future with QueueClosed."""
        while waiters:
            waiter = waiters.popleft()
            if isinstance(waiter, tuple):
                # Current asyncio queues bare futures; the original code
                # expected (item, future) pairs, presumably the layout of
                # older asyncio releases, so both shapes are accepted.
                waiter = waiter[-1]
            if not waiter.done():
                waiter.set_exception(QueueClosed(which=self))

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        # Skip the close when the body already closed the queue: raising
        # RuntimeError here would mask any exception leaving the block.
        if not self._closed:
            self.close()

    async def put(self, item):
        """Put *item* into the queue, waiting for a free slot if needed.

        Raises:
            QueueClosed: if the queue is closed, or gets closed while
                waiting for a free slot.
        """
        if self._closed:
            raise QueueClosed(which=self)
        await super().put(item)

    def put_nowait(self, item):
        """Put *item* into the queue without waiting.

        Raises:
            QueueClosed: if the queue is closed.
        """
        if self._closed:
            raise QueueClosed(which=self)
        super().put_nowait(item)

    async def get(self):
        """Remove and return an item, waiting until one is available.

        Raises:
            QueueClosed: if the queue is closed and empty, or gets
                closed while waiting for an item.
        """
        if self._closed and not self.qsize():
            raise QueueClosed(which=self)
        return await super().get()

    def get_nowait(self):
        """Remove and return an item without waiting.

        Raises:
            QueueClosed: if the queue is closed and empty.
        """
        if self._closed and not self.qsize():
            raise QueueClosed(which=self)
        return super().get_nowait()
class QueueClosed(Exception):
    """Raised when an operation cannot proceed because a queue is closed.

    Attributes:
        which: the queue that was closed, or ``None`` when not given.
    """

    def __init__(self, *args, which=None):
        # ``which`` is optional so the exception can be rebuilt from its
        # positional ``args`` alone, which is how ``copy`` and ``pickle``
        # re-create exception instances.
        super().__init__(*args)
        self.which = which
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment