Last active
December 16, 2020 01:25
-
-
Save Wh1t3Fox/8775f134b55f6c7ad8ba91d86554e646 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# producer.py | |
# https://yeti.co/blog/establishing-a-websocket-pubsub-server-with-redis-and-asyncio-for-the-light-sensor/ | |
import asyncio | |
from aioredis import create_connection, Channel | |
import websockets | |
async def subscribe_to_redis(path): | |
conn = await create_connection(('localhost', 6379)) | |
# Set up a subscribe channel | |
channel = Channel('lightlevel{}'.format(path), is_pattern=False) | |
await conn.execute_pubsub('subscribe', channel) | |
return channel, conn | |
async def consumer_handler(websocket, path): | |
pass # process websocket data | |
async def producer_handler(websocket, path): | |
channel, conn = await subscribe_to_redis(path) | |
try: | |
while True: | |
# Wait until data is published to this channel | |
message = await channel.get() | |
# Send unicode decoded data over to the websocket client | |
await websocket.send(message.decode('utf-8')) | |
except websockets.exceptions.ConnectionClosed: | |
# Free up channel if websocket goes down | |
await conn.execute_pubsub('unsubscribe', channel) | |
conn.close() | |
async def handler(websocket, path): | |
consumer_task = asyncio.ensure_future( | |
consumer_handler(websocket, path)) | |
producer_task = asyncio.ensure_future( | |
producer_handler(websocket, path)) | |
done, pending = await asyncio.wait( | |
[consumer_task, producer_task], | |
return_when=asyncio.FIRST_COMPLETED, | |
) | |
for task in pending: | |
task.cancel() | |
if __name__ == '__main__': | |
# Runs a server process on 8767. Just do 'python producer.py' | |
loop = asyncio.get_event_loop() | |
loop.set_debug(True) | |
ws_server = websockets.serve(handler, 'localhost', 8767) | |
loop.run_until_complete(ws_server) | |
loop.run_forever() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment