Skip to content

Instantly share code, notes, and snippets.

@Wh1t3Fox
Last active December 16, 2020 01:25
Show Gist options
  • Save Wh1t3Fox/8775f134b55f6c7ad8ba91d86554e646 to your computer and use it in GitHub Desktop.
Save Wh1t3Fox/8775f134b55f6c7ad8ba91d86554e646 to your computer and use it in GitHub Desktop.
# producer.py
# https://yeti.co/blog/establishing-a-websocket-pubsub-server-with-redis-and-asyncio-for-the-light-sensor/
import asyncio
from aioredis import create_connection, Channel
import websockets
async def subscribe_to_redis(path):
    """Open a Redis connection and subscribe it to the channel for *path*.

    Connects to Redis at ``localhost:6379`` and subscribes to the channel
    named ``'lightlevel' + path``.

    Args:
        path: Suffix appended to ``'lightlevel'`` to form the channel name
            (callers in this file pass the websocket request path).

    Returns:
        A ``(channel, conn)`` tuple: the subscribed ``Channel`` and the
        connection it is bound to. The caller owns the connection and is
        responsible for unsubscribing and closing it.
    """
    conn = await create_connection(('localhost', 6379))
    # Set up a subscribe channel
    channel = Channel('lightlevel{}'.format(path), is_pattern=False)
    try:
        await conn.execute_pubsub('subscribe', channel)
    except BaseException:
        # BaseException on purpose: a cancellation arriving while we are
        # subscribing must not leak the freshly opened connection either.
        conn.close()
        raise
    return channel, conn
async def consumer_handler(websocket, path):
    """Consume messages sent by the client until the websocket closes.

    The coroutine must stay alive for as long as the connection is open:
    ``handler`` waits for the first of the consumer/producer tasks to finish
    and then cancels the other, so returning immediately would tear the
    producer down before it could forward anything.

    Args:
        websocket: The connection to read from; iterated asynchronously.
        path: Request path of the connection (currently unused).
    """
    try:
        async for _message in websocket:
            pass  # process websocket data
    except websockets.exceptions.ConnectionClosed:
        # The peer went away abnormally; for the consumer this just means
        # there is nothing more to read, so finish quietly.
        pass
async def producer_handler(websocket, path):
    """Forward messages published on the Redis channel to the websocket.

    Subscribes via ``subscribe_to_redis(path)``, then loops: wait for a
    message on the channel, decode it as UTF-8 and send it to the client.
    The Redis subscription and connection are released however the loop
    ends (client disconnect, task cancellation, or an unexpected error).

    Args:
        websocket: The client connection to send decoded messages to.
        path: Request path, used to pick the Redis channel.
    """
    channel, conn = await subscribe_to_redis(path)
    try:
        while True:
            # Wait until data is published to this channel
            message = await channel.get()
            if message is None:
                # NOTE(review): aioredis documents None as "channel was
                # unsubscribed, no more messages" - confirm for the
                # installed version. Either way None cannot be decoded.
                break
            # Send unicode decoded data over to the websocket client
            await websocket.send(message.decode('utf-8'))
    except websockets.exceptions.ConnectionClosed:
        # The client went away; nothing left to forward.
        pass
    finally:
        # Free up the channel and the connection on every exit path, not
        # only when the websocket goes down.
        try:
            await conn.execute_pubsub('unsubscribe', channel)
        finally:
            conn.close()
async def handler(websocket, path):
    """Run the consumer and producer for one websocket connection.

    Both coroutines are started as tasks. As soon as either finishes, the
    other is cancelled and awaited so that its cleanup code has completed
    before this handler returns.

    Args:
        websocket: The client connection passed on to both handlers.
        path: Request path passed on to both handlers.
    """
    consumer_task = asyncio.ensure_future(
        consumer_handler(websocket, path))
    producer_task = asyncio.ensure_future(
        producer_handler(websocket, path))
    tasks = [consumer_task, producer_task]
    try:
        await asyncio.wait(
            tasks,
            return_when=asyncio.FIRST_COMPLETED,
        )
    finally:
        # Runs on normal completion and also if this handler itself is
        # cancelled, so the child tasks are never left orphaned.
        pending = [task for task in tasks if not task.done()]
        for task in pending:
            task.cancel()
        if pending:
            # Wait for the cancellations to be processed; the resulting
            # CancelledError values are collected instead of raised.
            await asyncio.gather(*pending, return_exceptions=True)
if __name__ == '__main__':
    # Runs a server process on 8767. Just do 'python producer.py'
    # Create the loop explicitly: asyncio.get_event_loop() with no current
    # loop is deprecated and raises on the newest Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.set_debug(True)
    ws_server = loop.run_until_complete(
        websockets.serve(handler, 'localhost', 8767))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the server; exit quietly.
        pass
    finally:
        # Stop accepting connections and let open ones shut down before
        # the loop is closed.
        ws_server.close()
        loop.run_until_complete(ws_server.wait_closed())
        loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment