This is an outdated version
Last active
November 15, 2021 12:03
-
-
Save ahopkins/52bcd7d15de1e0356ee22f82b6cbf9c8 to your computer and use it in GitHub Desktop.
Sanic Websocket Feed
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from .objects import Feed | |
# from sanic_jwt.decorators import protected | |
feeds = {} | |
def get_feed(feed_name, app): | |
if feed_name in feeds: | |
return feeds.get(feed_name) | |
else: | |
feed = Feed(app=app, feed_name=feed_name) | |
feeds[feed_name] = feed | |
return feed | |
def startup(app): | |
@app.websocket('/feed/<feed_name>') | |
# @protected() | |
async def feed(request, ws, feed_name): | |
# TODO: | |
# - In order to protect the view to authenticated users: | |
# - V1 | |
# - Upon subscription, store the client with a token | |
# - Send the token to the browser through the client | |
# - Client receives the token and submits AJAX call with token | |
# - Endpoint receives the token and using existing authentication to update client as authenticated | |
# - V2 | |
# - Before opening WebSocket, send HTTP request | |
# - Generate ticket with the access credentials | |
# - Return to be stored in browser | |
# - Open WebSocket, send the ticket | |
# - Attach the client connection to the ticket and access credentials | |
channel_name = feed_name.split('__')[-1] | |
print(f'The feed_name: {feed_name}') | |
print(f'The channel_name: {channel_name}') | |
print(ws.__class__.__name__) | |
print(hash(ws)) | |
print(dir(request)) | |
print(request.token) | |
print(request.headers) | |
feed = get_feed(channel_name, request.app) | |
await feed.run(ws) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import asyncio | |
from websockets.exceptions import ConnectionClosed | |
class Feed(object): | |
def __init__(self, app, feed_name): | |
self.name = feed_name | |
self.app = app | |
self.clients = set() | |
def __len__(self): | |
return len(self.clients) | |
async def run(self, client): | |
await self._subscribe(client) | |
tasks = self.get_tasks(client) | |
await asyncio.wait(tasks) | |
def get_tasks(self, client): | |
consumer_task = asyncio.ensure_future(self._consumer_handler(client)) | |
producer_task = asyncio.ensure_future(self._producer_handler()) | |
return [consumer_task, producer_task] | |
async def persist(self, payload): | |
print('Message to persist: ', payload) | |
output = json.dumps(payload) | |
await self.app.redis.publish(self.name, output) | |
async def _consumer_handler(self, client): | |
print('') | |
print('consumer_handler') | |
while True: | |
try: | |
message = await client.recv() | |
print('') | |
print('message arrived', message) | |
await self._ingest(message) | |
# await self.app.redis.publish(self.name, json.dumps(message.get('data'))) | |
except ConnectionClosed: | |
print('closing connection') | |
self._leave(client) | |
break | |
async def _producer_handler(self): | |
print('') | |
print('producer_handler') | |
while True: | |
message = await self.app.pubsub.get_message() | |
if message: | |
await self._broadcast(message) | |
await asyncio.sleep(1) | |
async def _ingest(self, message): | |
print('ingesting', message) | |
message = json.loads(message) | |
if 'action' in message and hasattr(self, message.get('action')): | |
action = getattr(self, message.get('action')) | |
await action(payload=message.get('payload', None)) | |
else: | |
print(f'Could not locate the action:') | |
print(message) | |
async def _broadcast(self, message): | |
print('broadcasting to {} clients:'.format(len(self.clients)), message) | |
data = self._publishable(message.get('data', None)) | |
for client in self.clients: | |
try: | |
await client.send(data) | |
except ConnectionClosed: | |
print('closing connection') | |
self._leave(client) | |
def _leave(self, client): | |
try: | |
self.clients.remove(client) | |
except ValueError: | |
pass | |
async def _subscribe(self, client): | |
await self.app.pubsub.subscribe(self.name) | |
self.clients.add(client) | |
def _publishable(self, raw): | |
if raw is None: | |
return '' | |
if isinstance(raw, str): | |
return raw | |
else: | |
return json.dumps(raw) |
Yes. I have something that is based off this with a few more features built in. I use both aredis and aioredis. I prefer the latter for handling pubsub, but the former for its api in general Redis usage.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thanks for this. Are you still using this gist ^^. Do you use
aioredis
to build theapp.pubsub
?