Skip to content

Instantly share code, notes, and snippets.

@niwinz
Created April 25, 2012 21:30
Show Gist options
  • Save niwinz/2493604 to your computer and use it in GitHub Desktop.
Save niwinz/2493604 to your computer and use it in GitHub Desktop.
Django websockets with tornado, gevent and zeromq
# -*- coding: utf-8 -*-
from gevent import monkey; monkey.patch_all()
import gevent
from gevent_zeromq import zmq
class WebSocketHandler(object):
def __init__(self, _id, in_queue, socket):
self.socket = socket
self.in_queue = in_queue
self._id = _id
def __call__(self):
while True:
message = self.in_queue.get(True)
self.on_message(message)
def on_message(self, message):
pass
def send(self, message):
_msg = {'message':message, 'id': self._id}
self.socket.send_pyobj(_msg)
from gevent.queue import Queue
class WebSocketServer(object):
def __init__(self, handler=WebSocketHandler):
self.ctx = zmq.Context()
self.pub_socket = self.ctx.socket(zmq.PUB)
self.pull_socket = self.ctx.socket(zmq.PULL)
self.handler = handler
def start(self):
self.pub_socket.bind("ipc:///tmp/ws_sub")
self.pull_socket.bind("ipc:///tmp/ws_push")
self._loop()
def _loop(self):
self.connections = {}
while True:
message = self.pull_socket.recv_pyobj()
_id, _msg, _act = message['id'], message['message'], message['action']
if _act == 'connect':
_in_queue = Queue()
_in_queue.put(_msg, block=False)
_handler = self.handler(_id, _in_queue, self.pub_socket)
_greenlet = gevent.spawn(_handler)
self.connections[_id] = (_handler, _in_queue, _greenlet)
elif _act == 'message':
if _id not in self.connections:
print "Ignoring connection from", _id
continue
_handler, _queue, _greenlet = self.connection[_id]
_queue.put(_msg, block=False)
elif _act == 'close':
if _id not in self.connections:
print "Ignoring connection from", _id
continue
_handler, _queue, _greenlet = self.connections[_id]
_greenlet.kill()
del self.connections[_id]
else:
print "Unknown command"
class EchoWSHandler(WebSocketHandler):
def on_message(self, message):
print "Handler", message
self.send(message)
for x in xrange(120):
self.send(str(x))
gevent.sleep(0.1)
if __name__ == '__main__':
server = WebSocketServer(EchoWSHandler)
print "Server started"
server.start()
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
<title>{{ title }}</title>
<script src="http://ajax.googleapis.com/ajax/libs/jquery/1.7.2/jquery.min.js" type="text/javascript"></script>
<style type="text/css">
body { font-size: 12px; font-family: monospace; }
</style>
</head>
<body>
<script type="text/javascript">
$(document).ready(function() {
var ws = new WebSocket("ws://localhost:8888/socket");
ws.onopen = function () {
ws.send("First msg");
}
ws.onmessage = function(event) {
$('#main').append('<div>' + event.data + '</div>');
}
$('#main').append('<div> Start! </div>');
});
</script>
<div id="main"></div>
</body>
</div>
# -*- coding: utf-8 -*-
from zmq.eventloop import ioloop; ioloop.install()
from zmq.eventloop.zmqstream import ZMQStream
import zmq
from tornado import websocket
import tornado
import cPickle as pickle
ctx = zmq.Context()
class WebHandler(tornado.web.RequestHandler):
def get(self):
self.render("template.html", title="My title")
class MainHandler(websocket.WebSocketHandler):
_first = True
@property
def ref(self):
return id(self)
def initialize(self):
self.push_socket = ctx.socket(zmq.PUSH)
self.sub_socket = ctx.socket(zmq.SUB)
self.push_socket.connect("ipc:///tmp/ws_push")
self.sub_socket.connect("ipc:///tmp/ws_sub")
self.sub_socket.setsockopt(zmq.SUBSCRIBE, "")
self.zmq_stream = ZMQStream(self.sub_socket)
self.zmq_stream.on_recv(self.zmq_msg_recv)
def open(self, *args, **kwargs):
print "WebSocket opened", args, kwargs
def on_message(self, message):
if self._first:
msg = {'message': message, 'id':self.ref, 'action':'connect'}
self._first = False
else:
msg = {'message': message, 'id':self.ref, 'action':'message'}
self.push_socket.send_pyobj(msg)
def on_close(self):
print "WebSocket closed"
msg = {'message': '', 'id': id(self), 'action': 'close'}
self.push_socket.send_pyobj(msg)
self.zmq_stream.close()
self.sub_socket.close()
self.push_socket.close()
def zmq_msg_recv(self, data):
for message in data:
message = pickle.loads(message)
_id, _msg = message['id'], message['message']
if _id != self.ref:
continue
self.write_message(_msg)
application = tornado.web.Application([
(r"/", WebHandler),
(r"/socket", MainHandler),
])
if __name__ == "__main__":
application.listen(8888)
tornado.ioloop.IOLoop.instance().start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment