Last active
January 2, 2016 00:28
-
-
Save ajdavis/8223183 to your computer and use it in GitHub Desktop.
Tornado test with web sockets.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
import tornado.ioloop | |
import tornado.web | |
import tornado.websocket | |
from tornado.options import define, options, parse_command_line | |
# Command-line option: TCP port the server listens on (``--port=NNNN``).
define("port", default=8888, help="run on the given port", type=int)

# Absolute path of the ``templates`` directory located next to this module.
_MODULE_DIR = os.path.dirname(os.path.abspath(__file__))
TEMPLATE_DIR = os.path.join(_MODULE_DIR, 'templates')
class IndexHandler(tornado.web.RequestHandler):
    """Serve the ``index.html`` template found in ``TEMPLATE_DIR``."""

    def get(self):
        """Render ``index.html`` as the response body."""
        # ``render`` finishes the request on its own, so the handler does not
        # need the ``@tornado.web.asynchronous`` decorator (deprecated in
        # Tornado 5.1 and removed in 6.0).
        self.render(os.path.join(TEMPLATE_DIR, "index.html"))
# Registry of live connections: access token -> {room id: handler}.
clients = {}

# Registry of room membership: room id -> set of access tokens.
rooms = {}
class WebSocketHandler(tornado.websocket.WebSocketHandler):
    """WebSocket endpoint that relays each message to every member of a room.

    Connections are tracked in two module-level registries:

    * ``clients`` maps an access token to ``{room_id: handler}``.
    * ``rooms`` maps a room id to the set of access tokens in that room.
    """

    def open(self, room_id=None, *args, **kwargs):
        """Record the room for this connection and join it.

        :param room_id: room identifier captured from the request path.
        """
        # Use the handler's public API rather than reaching into the
        # underlying stream object.
        self.set_nodelay(True)
        self.room_id = room_id
        self.join()

    def join(self):
        """Register this connection, notify listeners and announce the join.

        The access token is read from the ``access_token`` request argument
        (``None`` when absent).
        """
        self.access_token = access_token = self.get_argument('access_token', None)
        # Add to the token's existing connections instead of replacing the
        # whole mapping; replacing it would drop the token's connections to
        # other rooms and make broadcasts there fail with KeyError.
        clients.setdefault(access_token, {})[self.room_id] = self
        rooms.setdefault(self.room_id, set()).add(access_token)
        self.application.on_room_changed()
        self.broadcast("{id} just joined the room".format(id=access_token))

    def broadcast(self, message):
        """Send ``message`` to every connection registered in this room.

        :raises KeyError: if this handler's room is not in ``rooms``.
        """
        # Iterate over a snapshot so that membership changes triggered while
        # sending cannot invalidate the loop.
        for client_id in tuple(rooms[self.room_id]):
            clients[client_id][self.room_id].write_message(message)

    def on_close(self):
        """Remove this connection from the registries and notify listeners."""
        connections = clients.get(self.access_token)
        # Only deregister when the registry still points at *this* handler;
        # a newer connection using the same token and room must stay.
        registered = (
            connections is not None
            and connections.get(self.room_id) is self
        )
        if registered:
            del connections[self.room_id]
            if not connections:
                del clients[self.access_token]
        if self.room_id in rooms:
            if registered:
                # The (possibly empty) room set is kept on purpose: callers
                # look up ``rooms[room_id]`` after a connection has closed.
                rooms[self.room_id].discard(self.access_token)
            self.application.on_room_changed()
class MyApplication(tornado.web.Application):
    """Application that lets observers subscribe to room membership changes.

    Callables added to ``room_change_callbacks`` are scheduled on the current
    IOLoop each time ``on_room_changed`` is invoked.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to ``tornado.web.Application``."""
        super(MyApplication, self).__init__(*args, **kwargs)
        # Zero-argument callables to run after a room's membership changes.
        self.room_change_callbacks = set()

    def on_room_changed(self):
        """Schedule every registered callback on the current IOLoop."""
        schedule = tornado.ioloop.IOLoop.current().add_callback
        for listener in self.room_change_callbacks:
            schedule(listener)
# Routing table: "/" serves the index page, "/rt/<room_id>" is the WebSocket
# endpoint for the named room.
app = MyApplication([
    (r'/', IndexHandler),
    (r'/rt/(?P<room_id>[a-zA-Z0-9\.:,_]+)/?', WebSocketHandler),
])

if __name__ == '__main__':
    parse_command_line()
    app.listen(options.port)
    # ``IOLoop.current()`` matches the loop used by
    # ``MyApplication.on_room_changed``; ``IOLoop.instance()`` is a
    # deprecated alias for it.
    tornado.ioloop.IOLoop.current().start()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import unittest | |
from my_application import app, rooms | |
from tornado import testing, websocket, gen | |
class WebSocketServerTests(testing.AsyncHTTPTestCase):
    """End-to-end tests for the room registry kept by the WebSocket server."""

    def setUp(self):
        super(WebSocketServerTests, self).setUp()
        # Path template for the WebSocket endpoint of a room.
        self.base_url = '/rt/{room_id}/?access_token={token}'

    def get_app(self):
        """Return the application under test."""
        return app

    def get_protocol(self):
        """Make ``get_url`` build ``ws://`` URLs."""
        return 'ws'

    @testing.gen_test
    def test_room_registry(self):
        """A token is in its room while connected and removed after close."""
        # Imported locally: a plain Future replaces the ``gen.Callback`` /
        # ``gen.Wait`` pair, which was deprecated and removed in Tornado 6.
        from tornado.concurrent import Future

        room_id = '1'
        token = '1'
        url = self.base_url.format(room_id=room_id, token=token)
        # No ``io_loop`` argument: it was removed in Tornado 5, and the
        # connection uses the current IOLoop by default.
        conn1 = yield websocket.websocket_connect(self.get_url(url))
        # Wait for the "just joined" broadcast so the server-side
        # registration is known to have happened.
        yield conn1.read_message()
        self.assertIn(token, rooms[room_id])

        changed = Future()

        def on_change():
            # Guard against a second notification resolving the Future twice.
            if not changed.done():
                changed.set_result(None)

        self._app.room_change_callbacks.add(on_change)
        try:
            conn1.protocol.close()
            yield changed
        finally:
            # Always unregister so a failure here cannot leak the listener
            # into the shared application object.
            self._app.room_change_callbacks.discard(on_change)
        self.assertNotIn(token, rooms[room_id])
# Run the test suite when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Running `python test.py` is expected to output: