Skip to content

Instantly share code, notes, and snippets.

@ziirish
Created December 4, 2018 15:05
Show Gist options
  • Save ziirish/ab022e440a31a35e8847a1f4c1a3af1d to your computer and use it in GitHub Desktop.
Save ziirish/ab022e440a31a35e8847a1f4c1a3af1d to your computer and use it in GitHub Desktop.
trio Pool primitive
class Pool:
def __init__(self, pool_size):
self._size = pool_size
self.send_channel, self.receive_channel = trio.open_memory_channel(pool_size)
@property
def size(self):
return self._size
@property
def stats(self):
return self.send_channel.statistics()
async def put(self, data):
await self.send_channel.send(data)
async def get(self):
return await self.receive_channel.receive()
def empty(self):
stats = self.stats
if self.size != 0 and stats.current_buffer_used == 0:
return True
return False
def full(self):
stats = self.stats
max_buffer = self.size
if max_buffer > 0 and max_buffer != math.inf:
return stats.current_buffer_used == max_buffer
return False
async def __aenter__(self):
return self
async def __aexit__(self, *args):
await self.send_channel.aclose()
await self.receive_channel.aclose()
pool = Pool(5)
async with pool:
await pool.put('a')
await pool.get()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment