Skip to content

Instantly share code, notes, and snippets.

@vortec
Created June 23, 2013 01:35
Show Gist options
  • Save vortec/5843378 to your computer and use it in GitHub Desktop.
Save vortec/5843378 to your computer and use it in GitHub Desktop.
Late night implementation of Tornado + Redis PubSub + Websockets. It's late, the code is messy, but it should demonstrate how the general construct works. Feel free to try it on your own after studying it.
import json
import os
import tornado.httpserver
import tornado.web
import tornado.websocket
import tornado.ioloop
import tornado.gen
import tornadoredis
class RepiServer(object):
def __init__(self, r, name='master', namespace='repi:', info_channel='cluster', port=8888):
self.redis = r
self.name = name
self.namespace = namespace
self.info_channel = info_channel
self.port = port
def subscribe(self, channel):
channel = self._prefixChannel(channel)
self.redis.subscribe(channel)
def unsubscribe(self, channel):
channel = self._prefixChannel(channel)
self.redis.unsubscribe(channel)
def publish(self, command, data=None, channel=None):
if channel:
channel = self._prefixChannel(channel)
else:
channel = self._prefixChannel(self.info_channel)
message = {
'client': self.name,
'command': command,
'data': data
}
json_message = json.dumps(message)
self.redis.publish(channel, json_message, lambda x: None)
def run(self):
application = tornado.web.Application([
(r'/', MainHandler, dict(repi=self)),
(r'/ws', RepiConnection, dict(repi=self))
])
http_server = tornado.httpserver.HTTPServer(application)
http_server.listen(self.port)
tornado.ioloop.IOLoop.instance().start()
def _prefixChannel(self, channel):
return '{}{}'.format(self.namespace, channel)
class MainHandler(tornado.web.RequestHandler):
def initialize(self, repi, *args, **kwargs):
self.repi = repi
super(MainHandler, self).initialize(*args, **kwargs)
def get(self):
template_kwargs = {
'title': 'RePi Server',
'info_channel': self.repi.info_channel
}
self.render(os.path.join('view', 'index.html'), **template_kwargs)
class RepiConnection(tornado.websocket.WebSocketHandler):
def __init__(self, *args, **kwargs):
super(RepiConnection, self).__init__(*args, **kwargs)
self.listen()
def initialize(self, repi, *args, **kwargs):
self.repi = repi
self.channel = self.repi._prefixChannel(self.repi.info_channel)
super(RepiConnection, self).initialize(*args, **kwargs)
@tornado.gen.engine
def listen(self):
self.redis = tornadoredis.Client()
self.redis.connect()
channel = self.repi._prefixChannel(self.repi.info_channel)
yield tornado.gen.Task(self.redis.subscribe, channel)
self.redis.listen(self.on_redis_message)
def on_websocket_message(self, json_message):
# Decode JSON
try:
message = json.loads(json_message)
except ValueError, err:
print 'Invalid JSON.'
return
# Sanity check
if not {'channel', 'command', 'data'}.issubset(message):
print 'Invalid protocol.'
return
self.repi.publish(message['command'])#, data=message['data'], channel=message['channel'])
on_message = on_websocket_message
def on_redis_message(self, message):
channel = message.channel
if message.kind == 'message':
# Decode JSON
json_message = str(message.body)
try:
message = json.loads(json_message)
except ValueError, err:
print 'Invalid JSON.'
return
# Sanity check
if not {'client', 'command', 'data'}.issubset(message):
print 'Invalid protocol.'
return
# Write to WebSocket
self.write_message(json_message)
elif message.kind == 'disconnect':
self.close()
def on_close(self):
if self.redis.subscribed:
self.redis.unsubscribe('test_channel')
self.redis.disconnect()
r = tornadoredis.Client()
repi_server = RepiServer(r)
repi_server.run()
@fabiomontefuscolo
Copy link

Super cool! After 5 years, is it still valid for nowadays?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment