Skip to content

Instantly share code, notes, and snippets.

@parity3
Last active December 31, 2019 01:39
Show Gist options
  • Save parity3/3c26e32fd3f475597d6e8b16587a2a9f to your computer and use it in GitHub Desktop.
Save parity3/3c26e32fd3f475597d6e8b16587a2a9f to your computer and use it in GitHub Desktop.
publish-subscribe with sequential async delivery
from contextlib import asynccontextmanager
import trio
class broadcaster:
    """Publish-subscribe hub that delivers each object to subscribers in turn.

    Every subscription owns an unbuffered trio memory channel (created in
    ``subscribe``). ``publish`` sends to the registered subscribers one after
    another, awaiting each ``send`` before moving on to the next.

    The instance is an async context manager: leaving the ``async with``
    block calls ``aclose``, which closes the send side of every subscription.
    """

    def __init__(self):
        super().__init__()
        # Maps an opaque per-subscription key to that subscription's send
        # channel; entries are added and removed by ``subscribe``.
        self.subscribers = {}

    async def __aenter__(self):
        """Return the broadcaster itself; no setup is required."""
        return self

    async def aclose(self):
        """Close the send channel of every currently registered subscriber."""
        # Iterate over a snapshot: awaiting lets other tasks run, and they
        # may unregister themselves (mutating the dict) in the meantime.
        for chan in list(self.subscribers.values()):
            try:
                await chan.aclose()
            except trio.ClosedResourceError:
                # Best effort: an already-closed channel needs no work.
                pass

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close all subscriptions; exceptions from the body are not suppressed."""
        await self.aclose()

    @asynccontextmanager
    async def subscribe(self):
        """Register a subscription and yield its receive channel.

        Use as ``async with bcaster.subscribe() as chan:``. The subscription
        is unregistered and both channel ends are closed when the block exits.
        """
        # Buffer size 0: each send waits until the subscriber receives.
        snd_chan, rcv_chan = trio.open_memory_channel(0)
        # A fresh object() is a unique, hashable key for this subscription.
        key = object()
        async with snd_chan:
            self.subscribers[key] = snd_chan
            try:
                async with rcv_chan:
                    yield rcv_chan
            finally:
                # Always unregister, even if the subscriber body raised.
                self.subscribers.pop(key, None)

    async def publish(self, obj):
        """Send ``obj`` to every subscriber registered when the call starts.

        Delivery is sequential. A subscriber whose channel turns out to be
        broken or closed (it left, or the broadcaster was closed, while this
        call was in progress) is dropped and skipped so that the remaining
        subscribers still receive ``obj``. Any other error propagates.
        """
        # Snapshot the registrations: each send is an await point during
        # which subscribers may come and go.
        for key, chan in list(self.subscribers.items()):
            try:
                await chan.send(obj)
            except (trio.BrokenResourceError, trio.ClosedResourceError):
                # BrokenResourceError: the receive side was closed.
                # ClosedResourceError: the send side was closed.
                # Either way nobody is listening on this channel any more.
                self.subscribers.pop(key, None)
async def subscriber(bcaster, ind, task_status=trio.TASK_STATUS_IGNORED):
    """Subscribe to ``bcaster`` and print every message received.

    ``ind`` is only used to label the printed output. ``task_status.started()``
    is called once the subscription is open, so a caller using
    ``nursery.start`` resumes only after this task has subscribed.
    """
    async with bcaster.subscribe() as inbox:
        # Report readiness only after the subscription exists.
        task_status.started()
        async for msg in inbox:
            print(f'{ind=} {msg=}')
async def publisher(bcaster):
    """Publish one fixed message through ``bcaster``.

    The publish happens inside ``async with bcaster``, so the broadcaster's
    ``__aexit__`` runs as soon as the message has been published.
    """
    payload = 'this message repeated 10 times'
    async with bcaster:
        await bcaster.publish(payload)
async def main_async():
    """Run the demo: start ten subscribers, then one publisher, in one nursery.

    Subscribers are started with ``nursery.start`` one at a time; each calls
    ``task_status.started()`` after subscribing, so all ten are registered
    before the publisher task is scheduled.
    """
    hub = broadcaster()
    async with trio.open_nursery() as tasks:
        for index in range(10):
            # Wait for this subscriber to report that it has subscribed.
            await tasks.start(subscriber, hub, index)
        tasks.start_soon(publisher, hub)
def main():
    """Synchronous entry point: run ``main_async`` under the trio event loop."""
    trio.run(main_async)
# Run the demo only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment