@hiway
Last active June 18, 2017 20:42

Multi-worker uvicorn on a single machine: LocalBroadcast middleware uses redislite to run an embedded redis server over a unix socket (no network server)

Installation / Dependencies

Get these files on your machine.

$ pip install uvicorn redislite asyncio_redis

Usage

$ uvicorn app:chat_server -w 3

What the what?

  1. If you know what uvicorn is, skip to 2. Else, open (https://github.com/tomchristie/uvicorn) in a background tab while I tell you how brilliant this new project is. It takes several well-established ideas and implements them fresh with absolutely no fluff.

    1.1 Uvicorn is a ridiculously fast HTTP and WebSocket server.

    1.2 Write asynchronous worker functions using Python's async/await.

    1.3 Uses ASGI, a protocol championed by django-channels.

    1.4 It can use a redis server to broadcast between worker processes, so you get async + parallel = ridiculous * hahahamadness! fast.

    1.5 Deploy it behind a hardened proxy/forwarding server.

  2. You want to run your brand new uvicorn project on a small VPS?

    2.1 You want to run redis as a unix service.

    2.2 You don't want to manage another dependency.

    2.3 You don't need redis for anything else.

  3. Redislite to the rescue! (https://github.com/yahoo/redislite)

    3.1 Redislite bundles redis with the pip-installable package.

    3.2 It runs the embedded redis with network access disabled.

    3.3 It can monkey-patch existing libraries that use redis.

  4. This gist implements the solution to the problem and available options explained above :)

    4.1 Yep, it builds on code from uvicorn's own examples.
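
Point 3.2 can be easier to picture with a tiny stdlib-only illustration of what "no network server" means: the two ends talk through a socket file on disk instead of a host and port. This is only a sketch of the idea, not redislite's code. The names `handle`, `round_trip` and `demo.sock` are made up for the example, and it assumes a Unix-like OS (no Windows).

```python
import asyncio
import os
import tempfile


async def handle(reader, writer):
    # Read one line, send it back upper-cased, then hang up.
    line = await reader.readline()
    writer.write(line.upper())
    await writer.drain()
    writer.close()


async def round_trip(payload):
    # The "address" is a filesystem path, not a host/port pair,
    # so nothing is reachable from the network.
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.sock")
        server = await asyncio.start_unix_server(handle, path=path)
        try:
            reader, writer = await asyncio.open_unix_connection(path)
            writer.write(payload)
            await writer.drain()
            reply = await reader.readline()
            writer.close()
            await writer.wait_closed()
            return reply
        finally:
            server.close()


if __name__ == "__main__":
    print(asyncio.run(round_trip(b"ping\n")))
```

The middleware in this gist does the same kind of thing, except the server end is the embedded redis and the path comes from `redislite.Redis(...).socket_file`.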

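# Absolute import: `uvicorn app:chat_server` loads this file as a top-level
# module, where a relative import (`from .local_broadcast ...`) would fail.
from local_broadcast import LocalBroadcastMiddleware

# Read the page once at import time; every HTTP request gets the same bytes.
with open('index.html', 'rb') as file:
    homepage = file.read()


async def chat_server(message, channels):
    """
    A WebSocket based chat server.
    """
    if message['channel'] == 'websocket.connect':
        # New client: add its reply channel to the 'chat' group.
        await channels['groups'].send({
            'group': 'chat',
            'add': channels['reply'].name
        })
    elif message['channel'] == 'websocket.receive':
        # Relay the incoming text to everyone in the group.
        await channels['groups'].send({
            'group': 'chat',
            'send': {'text': message['text']}
        })
    elif message['channel'] == 'websocket.disconnect':
        await channels['groups'].send({
            'group': 'chat',
            'discard': channels['reply'].name
        })
    elif message['channel'] == 'http.request':
        # Plain HTTP request: serve the chat page.
        await channels['reply'].send({
            'status': 200,
            'headers': [
                [b'content-type', b'text/html'],
            ],
            'content': homepage
        })


# Wrap the handler so channels['groups'] is backed by the embedded redis.
chat_server = LocalBroadcastMiddleware(chat_server, 'channels.redis.db')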
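
If you want to poke at a handler of this `(message, channels)` shape without starting uvicorn or redis, one option is to pass in fake channel objects that just record what was sent. A minimal sketch, assuming nothing beyond what the handler above uses (`.name` and an async `.send()`); `RecordingChannel`, `chat_handler` and `demo` are hypothetical names, and the handler is a trimmed copy so the snippet runs on its own.

```python
import asyncio


class RecordingChannel:
    """Hypothetical stand-in for a channel: it only records what is sent."""

    def __init__(self, name):
        self.name = name
        self.sent = []

    async def send(self, message):
        self.sent.append(message)


async def chat_handler(message, channels):
    # Same branching as the chat server above, trimmed to two cases.
    if message['channel'] == 'websocket.connect':
        await channels['groups'].send(
            {'group': 'chat', 'add': channels['reply'].name})
    elif message['channel'] == 'websocket.receive':
        await channels['groups'].send(
            {'group': 'chat', 'send': {'text': message['text']}})


async def demo():
    channels = {
        'reply': RecordingChannel('client-1'),
        'groups': RecordingChannel('groups'),
    }
    await chat_handler({'channel': 'websocket.connect'}, channels)
    await chat_handler({'channel': 'websocket.receive', 'text': 'hi'}, channels)
    # Everything the handler pushed to the groups channel, in order.
    return channels['groups'].sent


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The returned list shows the group commands in the order the handler issued them, which is usually enough to check the branching.
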

<!DOCTYPE html>
<html>
    <head>
        <title>WebSocket demo</title>
    </head>
    <body>
        <h1>WebSocket Chat</h1>
        <form action="" onsubmit="sendMessage(event)">
            <label for="messageText">Message:</label>
            <input type="text" id="messageText" autocomplete="off"/>
            <button>Send</button>
        </form>
        <ul id='messages'>
        </ul>
        <script>
            var ws = new WebSocket("ws://127.0.0.1:8000/");
            ws.onmessage = function (event) {
                var messages = document.getElementById('messages')
                var message = document.createElement('li')
                var content = document.createTextNode(event.data)
                message.appendChild(content)
                messages.appendChild(message)
            };
            function sendMessage(event) {
                var input = document.getElementById("messageText")
                ws.send(input.value)
                input.value = ''
                event.preventDefault()
            }
        </script>
    </body>
</html>

# coding=utf-8
import asyncio
import atexit
import json
import collections


class PubSubChannel(object):
    def __init__(self, pub, sub):
        self._pub = pub
        self._sub = sub
        # Keep a mapping from group -> set(channel names)
        self._subscribers = collections.defaultdict(set)

    async def send(self, message):
        group = message['group']
        if 'add' in message:
            # First member of a group: start listening on redis for it.
            if not self._subscribers[group]:
                await self._sub.subscribe([group])
            self._subscribers[group].add(message['add'])
        if 'discard' in message:
            self._subscribers[group].discard(message['discard'])
            # Last member left: stop listening for this group.
            if not self._subscribers[group]:
                await self._sub.unsubscribe([group])
        if 'send' in message:
            text = json.dumps(message['send'])
            await self._pub.publish(group, text)


async def listener(sub, subscribers, clients):
    # Fan each published message out to this worker's members of the group.
    while True:
        reply = await sub.next_published()
        message = json.loads(reply.value)
        # Iterate over a copy: the set can change while we await send().
        for channel_name in list(subscribers[reply.channel]):
            await clients[channel_name].send(message)


class LocalBroadcastMiddleware(object):
    def __init__(self, asgi, db_path):
        # Patch redis-using libraries, then start the embedded server.
        import redislite.patch
        redislite.patch.patch_redis(db_path)
        import redislite
        self.redis = redislite.Redis(db_path)
        atexit.register(self.redis.shutdown)
        self.asgi = asgi
        self.started = False
        self.clients = {}
        self.pubsub = None

    async def __call__(self, message, channels):
        import asyncio_redis
        if self.pubsub is None:
            # Lazily connect on the first message. Port 0 tells asyncio_redis
            # to treat the first argument as a unix socket path.
            pub = await asyncio_redis.Connection.create(self.redis.socket_file, 0)
            sub = await asyncio_redis.Connection.create(self.redis.socket_file, 0)
            sub = await sub.start_subscribe()
            self.pubsub = PubSubChannel(pub, sub)
            loop = asyncio.get_event_loop()
            loop.create_task(listener(sub, self.pubsub._subscribers, self.clients))
        # Keep track of all connected clients.
        if message['channel'] == 'websocket.connect':
            reply = channels['reply']
            self.clients[reply.name] = reply
        elif message['channel'] == 'websocket.disconnect':
            reply = channels['reply']
            # Default of None avoids a KeyError on a repeated disconnect.
            self.clients.pop(reply.name, None)
        # Inject the groups channel.
        channels['groups'] = self.pubsub
        return await self.asgi(message, channels)
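
The group bookkeeping in `PubSubChannel` is the part most worth checking by hand: it should subscribe on redis only when a group gets its first member and unsubscribe only when the last one leaves. Here is a rough sketch that exercises it with in-memory fakes instead of real asyncio_redis connections. `FakeSubscriber`, `FakePublisher` and `demo` are hypothetical names, and the class is a condensed copy of the one above so the snippet runs without redis installed.

```python
import asyncio
import collections
import json


class FakeSubscriber:
    """Hypothetical stand-in for the subscribing connection."""

    def __init__(self):
        self.calls = []

    async def subscribe(self, groups):
        self.calls.append(('subscribe', list(groups)))

    async def unsubscribe(self, groups):
        self.calls.append(('unsubscribe', list(groups)))


class FakePublisher:
    """Hypothetical stand-in for the publishing connection."""

    def __init__(self):
        self.published = []

    async def publish(self, group, text):
        self.published.append((group, text))


class PubSubChannel:
    # Condensed copy of the class above, so the sketch is self-contained.
    def __init__(self, pub, sub):
        self._pub = pub
        self._sub = sub
        self._subscribers = collections.defaultdict(set)

    async def send(self, message):
        group = message['group']
        if 'add' in message:
            if not self._subscribers[group]:
                await self._sub.subscribe([group])
            self._subscribers[group].add(message['add'])
        if 'discard' in message:
            self._subscribers[group].discard(message['discard'])
            if not self._subscribers[group]:
                await self._sub.unsubscribe([group])
        if 'send' in message:
            await self._pub.publish(group, json.dumps(message['send']))


async def demo():
    pub, sub = FakePublisher(), FakeSubscriber()
    groups = PubSubChannel(pub, sub)
    await groups.send({'group': 'chat', 'add': 'a'})      # first member
    await groups.send({'group': 'chat', 'add': 'b'})      # second member
    await groups.send({'group': 'chat', 'send': {'text': 'hi'}})
    await groups.send({'group': 'chat', 'discard': 'a'})
    await groups.send({'group': 'chat', 'discard': 'b'})  # last one leaves
    return sub.calls, pub.published


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With two joins and two leaves, the fake subscriber should see exactly one subscribe and one unsubscribe for `chat`, and the fake publisher one JSON-encoded message.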