Last active
December 31, 2019 01:39
-
-
Save parity3/3c26e32fd3f475597d6e8b16587a2a9f to your computer and use it in GitHub Desktop.
publish-subscribe with sequential async delivery
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from contextlib import asynccontextmanager | |
import trio | |
class broadcaster:
    """Fan each published object out to every current subscriber, one by one.

    Every subscription owns a zero-capacity trio memory channel; the
    broadcaster keeps the send side in ``self.subscribers`` and hands the
    receive side to the subscriber. ``publish`` awaits one ``send`` per
    subscriber in registration order.

    The broadcaster is an async context manager: leaving the ``async with``
    block calls ``aclose``.
    """

    def __init__(self):
        super().__init__()
        # Opaque per-subscription key -> that subscription's send channel.
        self.subscribers = {}

    async def __aenter__(self):
        """Return the broadcaster itself; entering acquires nothing."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close all subscriber channels on exit; exceptions are not suppressed."""
        await self.aclose()

    async def aclose(self):
        """Close the send side of every currently registered channel.

        Entries are not removed here; each subscription removes its own
        entry when its ``subscribe`` context exits.
        """
        # Iterate a snapshot: the dict may change while we await below.
        for chan in list(self.subscribers.values()):
            try:
                await chan.aclose()
            except trio.ClosedResourceError:
                # Best effort: an already-closed channel needs no further work.
                pass

    @asynccontextmanager
    async def subscribe(self):
        """Register a new subscription and yield its receive channel.

        The subscription is removed from ``self.subscribers`` and both channel
        ends are closed when the context exits, whether normally or through
        an exception.
        """
        send_chan, recv_chan = trio.open_memory_channel(0)
        # A fresh object() is a unique, hashable handle for this subscription.
        key = object()
        async with send_chan:
            self.subscribers[key] = send_chan
            try:
                async with recv_chan:
                    yield recv_chan
            finally:
                self.subscribers.pop(key, None)

    async def publish(self, obj):
        """Send ``obj`` to every subscriber registered when the call starts.

        Subscribers that unsubscribe while the broadcast is in progress are
        skipped instead of aborting delivery to the remaining ones.

        Raises:
            trio.ClosedResourceError: if a still-registered channel was closed
                from the broadcaster side (i.e. by ``aclose``).
        """
        # Snapshot with keys so we can tell "subscriber left" from "we closed".
        for key, chan in list(self.subscribers.items()):
            try:
                await chan.send(obj)
            except trio.BrokenResourceError:
                # The receive side is gone: this subscriber is on its way out.
                self.subscribers.pop(key, None)
            except trio.ClosedResourceError:
                # subscribe() drops the key *before* closing the send side, so
                # a missing key means the subscriber left after our snapshot.
                # A key that is still present means aclose() closed the
                # channel; keep raising in that case, as before.
                if key in self.subscribers:
                    raise
async def subscriber(bcaster, ind, task_status=trio.TASK_STATUS_IGNORED):
    """Subscribe to ``bcaster`` and print every message received.

    ``task_status.started()`` is called only after the subscription context
    has been entered, so a caller that launches this via ``nursery.start``
    is resumed once the subscription exists.

    Args:
        bcaster: object whose ``subscribe()`` async context manager yields an
            async-iterable channel.
        ind: label printed alongside each message.
        task_status: trio start-up handshake object.
    """
    subscription = bcaster.subscribe()
    async with subscription as inbox:
        # Signal readiness before blocking on the first message.
        task_status.started()
        async for msg in inbox:
            print(f'{ind=} {msg=}')
async def publisher(bcaster):
    """Publish a single demo message, then let ``bcaster``'s context exit.

    Args:
        bcaster: async context manager that also provides ``publish``.
    """
    payload = 'this message repeated 10 times'
    async with bcaster:
        await bcaster.publish(payload)
async def main_async():
    """Run the demo: ten subscribers, then one publisher, in one nursery."""
    hub = broadcaster()
    async with trio.open_nursery() as tasks:
        # Start subscribers one at a time, awaiting each start-up handshake,
        # before the publisher is scheduled.
        for index in range(10):
            await tasks.start(subscriber, hub, index)
        tasks.start_soon(publisher, hub)
def main():
    """Synchronous entry point: run ``main_async`` under trio."""
    entry_point = main_async
    trio.run(entry_point)
# Run the demo only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment