@AndresMWeber
Created June 25, 2019 18:17
Testing out a Notifier to connect a loop to a separate function
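import asyncio


async def shout(msg):
    # Example listener: print the message in upper case.
    print(msg.upper())


class Notifier(object):
    def __init__(self):
        self._listeners = []

    async def add_listener(self, listener):
        # Register a coroutine function to be awaited on every emit.
        self._listeners.append(listener)

    async def emit(self, msg):
        # Await each listener in registration order.
        for listener in self._listeners:
            await listener(msg)


async def main():
    notify = Notifier()
    await notify.add_listener(shout)
    while True:
        await notify.emit("New Data")
        # Suspend between emits so the event loop can run other tasks.
        # The one-second interval is an arbitrary choice, not part of the design.
        await asyncio.sleep(1)


asyncio.run(main())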
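
The Notifier above awaits its listeners one at a time. As a rough sketch of an alternative, the variant below dispatches to all listeners concurrently with `asyncio.gather` and uses a bounded loop so it terminates. The names `ConcurrentNotifier`, `record`, and `demo` are hypothetical and are not part of the gist; registration is a plain method here on the assumption that it does no I/O.

```python
import asyncio


class ConcurrentNotifier:
    """Hypothetical variant of Notifier: listeners run concurrently."""

    def __init__(self):
        self._listeners = []

    def add_listener(self, listener):
        # Assumption: registration does no I/O, so it need not be a coroutine.
        self._listeners.append(listener)

    async def emit(self, msg):
        # Start every listener and wait until all of them have finished.
        await asyncio.gather(*(listener(msg) for listener in self._listeners))


async def demo(count=3):
    received = []

    async def record(msg):
        # Illustrative listener that stores messages instead of printing them.
        received.append(msg.upper())

    notifier = ConcurrentNotifier()
    notifier.add_listener(record)
    for _ in range(count):
        await notifier.emit("New Data")
        await asyncio.sleep(0)  # hand control back to the event loop
    return received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With a single listener the behaviour matches sequential dispatch; the difference only shows up when several listeners each wait on something slow, since `gather` lets those waits overlap.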