Skip to content

Instantly share code, notes, and snippets.

@jmfrank63
Created August 6, 2018 15:05
Show Gist options
  • Save jmfrank63/5fb9909a8e06c91dead9265cab2f33de to your computer and use it in GitHub Desktop.
Save jmfrank63/5fb9909a8e06c91dead9265cab2f33de to your computer and use it in GitHub Desktop.
Async deque in Python
# -*- coding: utf-8 -*-
from collections import deque


class AsyncDeque(deque):
    """A ``deque`` that can be drained with ``async for``.

    Iteration is destructive: every step removes and returns the leftmost
    element, and iteration ends as soon as the deque is empty.  The deque
    is its own async iterator, so there is only ever one iteration state.
    """

    def __init__(self, elements=(), maxlen=None):
        """Initialise the deque.

        Args:
            elements: Iterable of initial items (defaults to empty).
            maxlen: Optional maximum length, forwarded to ``deque``.
        """
        super().__init__(elements, maxlen)

    def __aiter__(self):
        """Return the deque itself as the async iterator."""
        return self

    async def __anext__(self):
        """Remove and return the leftmost element.

        Raises:
            StopAsyncIteration: If the deque is empty.
        """
        try:
            return self.popleft()
        except IndexError:
            # An empty deque means the iteration is finished, not an error.
            raise StopAsyncIteration from None
@agronick
Copy link

import asyncio
import collections


class AsyncDeque(collections.deque):
  """A ``deque`` whose async iterator waits for items before giving up.

  Iteration is destructive: each step pops the leftmost element.  When the
  deque is empty, ``__anext__`` polls it until an item shows up or the
  wait budget is exhausted, at which point the iteration ends.
  """

  def __init__(self, *args, **kwargs):
    """Forward all arguments unchanged to ``collections.deque``."""
    super().__init__(*args, **kwargs)

  def __aiter__(self):
    """Return the deque itself as the async iterator."""
    return self

  async def __anext__(self, max_wait=10, poll_interval=.1):
    """Pop and return the leftmost element, waiting for one if needed.

    ``async for`` calls this without arguments, so the defaults apply
    there; the parameters only matter when it is awaited directly.

    Args:
      max_wait: Maximum number of seconds to wait on an empty deque.
      poll_interval: Seconds to sleep between emptiness checks.

    Raises:
      StopAsyncIteration: If the deque is still empty after ``max_wait``.
    """
    loop = asyncio.get_running_loop()
    # Track a deadline on the loop clock instead of summing sleep lengths,
    # which drifts because each sleep can overrun.
    deadline = loop.time() + max_wait
    while not self:
      remaining = deadline - loop.time()
      if remaining <= 0:
        # StopIteration raised inside a coroutine is turned into a
        # RuntimeError; StopAsyncIteration is what ends ``async for``.
        raise StopAsyncIteration
      await asyncio.sleep(min(poll_interval, remaining))
    return self.popleft()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment