Last active
April 24, 2018 15:56
-
-
Save rask/1ec45c0dca341cfdeafc21d95409cf67 to your computer and use it in GitHub Desktop.
An asyncio-compatible KeyedPool for Python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import asyncio | |
from keyed_pool import KeyedPool | |
async def produce(pool):
    """
    Demo producer: fill the pool, pause, then re-add the 'foo' key.
    """
    # Initial batch, inserted back to back.
    for key, value in (('foo', 'bar'), ('hello', 'world')):
        await pool.put(key, value)

    # Pause before offering 'foo' again under a new value.
    await asyncio.sleep(4)
    await pool.put('foo', 'baz')
async def consume(pool):
    """
    Demo consumer: take 'foo' and 'hello', pause, take 'foo' again, print all.
    """
    await asyncio.sleep(2)

    # Awaited strictly in order: 'foo' first, then 'hello'.
    received = [await pool.get('foo'), await pool.get('hello')]

    await asyncio.sleep(3)
    received.append(await pool.get('foo'))

    print('{} {} {}'.format(*received))
if __name__ == "__main__":
    # Create the loop explicitly: asyncio.get_event_loop() is deprecated when
    # no loop is running and fails on recent Python versions if no current
    # loop has been set.
    loop = asyncio.new_event_loop()
    try:
        pool = KeyedPool(loop)
        # Bind both coroutines to this loop up front so gather() never has to
        # look up an implicit "current" loop.
        workers = [
            loop.create_task(produce(pool)),
            loop.create_task(consume(pool)),
        ]
        loop.run_until_complete(asyncio.gather(*workers))
    finally:
        # Release the loop's resources even if the demo fails.
        loop.close()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import collections | |
class KeyCollision(KeyError):
    """
    Raised when an item is put under a key that the pool already holds.
    """
class KeyedPool():
    """
    A bounded pool from which items can be awaited by key.

    The waiting logic is modelled on asyncio.Queue: ``put`` waits while the
    pool is full and ``get`` waits until the requested key is present. A key
    holds at most one item, and ``get`` removes the item it returns.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop, max_size: int = 1024):
        """
        Create an empty pool.

        :param loop: event loop used to create the Futures waiters block on.
        :param max_size: maximum number of keys held at once (at least 1).
        :raises ValueError: if ``max_size`` is smaller than 1.
        """
        if max_size < 1:
            raise ValueError('max_size must be at least 1')

        self._max_size = max_size
        self._loop = loop
        self._pool = {}
        # key -> deque of Futures, one per get() call waiting on that key.
        # Keeping waiters per key guarantees a put() wakes a getter that can
        # actually use the new item.
        self._getters = {}
        # Futures of put() calls waiting for a free slot, oldest first.
        self._putters = collections.deque()

    def _do_next(self, waiters):
        """
        Wake the first waiter in ``waiters`` that is not already done.

        Done (e.g. cancelled) waiters found on the way are discarded.
        """
        while waiters:
            waiter = waiters.popleft()
            if not waiter.done():
                waiter.set_result(None)
                break

    def _wake_getter(self, key):
        """
        Wake the oldest pending getter waiting on ``key``, if there is one.
        """
        waiters = self._getters.get(key)
        if waiters is None:
            return
        self._do_next(waiters)
        if not waiters:
            # Drop empty queues so unused keys do not accumulate.
            del self._getters[key]

    def _forget_getter(self, key, getter):
        """
        Remove ``getter`` from the wait queue of ``key`` if still registered.
        """
        waiters = self._getters.get(key)
        if waiters is None:
            return
        try:
            waiters.remove(getter)
        except ValueError:
            # Already popped by _wake_getter().
            pass
        if not waiters:
            del self._getters[key]

    def has_key(self, key: str) -> bool:
        """
        Check if a key is defined for this pool.
        """
        return key in self._pool

    def has_items(self) -> bool:
        """
        Check if the pool has any items.
        """
        return bool(self._pool)

    def is_empty(self) -> bool:
        """
        Check if the pool is empty.
        """
        return not self.has_items()

    def is_full(self) -> bool:
        """
        Check if the pool holds as many items as it is allowed to.
        """
        return self._max_size <= len(self._pool)

    async def put(self, key: str, item):
        """
        Put a new item into the pool, waiting for a free slot if it is full.

        :raises KeyCollision: if ``key`` is already present once a slot is
            available.
        """
        while self.is_full():
            putter = self._loop.create_future()
            self._putters.append(putter)
            try:
                await putter
            except BaseException:
                # Deliberately broad so cancellation is cleaned up as well;
                # the exception is always re-raised.
                putter.cancel()  # Just in case putter is not done yet.
                try:
                    # Clean self._putters from canceled putters.
                    self._putters.remove(putter)
                except ValueError:
                    # The putter was already popped by a previous _get().
                    pass
                if not self.is_full() and not putter.cancelled():
                    # We were woken up by _get(), but can't take the slot.
                    # Wake up the next in line.
                    self._do_next(self._putters)
                raise

        if self.has_key(key):
            # The free slot stays unused; hand it to the next waiting putter
            # so it is not stranded.
            self._do_next(self._putters)
            raise KeyCollision(key)

        self._put(key, item)

    def _put(self, key: str, item):
        """
        Store the item and wake one getter waiting on its key.
        """
        self._pool[key] = item
        self._wake_getter(key)

    async def get(self, key: str):
        """
        Remove and return the item stored under ``key``, waiting until it
        has been put into the pool.
        """
        while not self.has_key(key):
            getter = self._loop.create_future()
            self._getters.setdefault(key, collections.deque()).append(getter)
            try:
                await getter
            except BaseException:
                # Deliberately broad so cancellation is cleaned up as well;
                # the exception is always re-raised.
                getter.cancel()  # Just in case getter is not done yet.
                self._forget_getter(key, getter)
                if self.has_key(key) and not getter.cancelled():
                    # We were woken up by _put(), but can't take the item.
                    # Wake up the next getter waiting on the same key.
                    self._wake_getter(key)
                raise

        return self._get(key)

    def _get(self, key: str):
        """
        Remove and return the item, then wake one putter waiting for space.

        :raises KeyError: if ``key`` is not in the pool.
        """
        item = self._pool.pop(key)
        self._do_next(self._putters)
        return item
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment