Created
July 6, 2016 13:47
-
-
Save szastupov/de3e3f884415892b3bf22c906b490a11 to your computer and use it in GitHub Desktop.
websocket/long polling event bus
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
from tornado.websocket import WebSocketHandler | |
from tornado.web import asynchronous | |
from tornado.ioloop import PeriodicCallback | |
from helpers.event_bus import bus | |
from helpers.api import CommonHandlerMixin | |
from helpers.auth import http_token | |
from functools import partial | |
from . import routes | |
logger = logging.getLogger(__name__) | |
@routes.add("/api/v1/events")
class EventsHandler(WebSocketHandler, CommonHandlerMixin):
    """Per-user event stream served over Web Socket or HTTP long polling.

    Once a user is authorized, the handler subscribes to the bus channel
    ``user_<id>`` and forwards every payload published there to the client.
    """

    # Interval, in milliseconds, between keep-alive pings on a web socket.
    KEEPALIVE_INTERVAL_MS = 20000

    @asynchronous
    def get(self):
        """Receive event stream for authenticated user

        Description: |
            This endpoint can be used in Web Socket and Long Polling modes.
            To use it as web socket, connect with ws:// protocol and send
            "Token: ..." message to authorize. Then you'll start receiving
            events, one per web socket message. To use it in long polling
            mode, just GET this endpoint with typical Authorization header
            and you will receive events in HTTP chunks.
        """
        if self.request.headers.get("Upgrade"):
            # This is a Web Socket connection
            return super().get()

        # Fallback to long polling. Reset the subscription state first so
        # on_close() always finds these attributes, even if authorize() fails.
        self.channel = None
        self.periodic = None
        self.current_user = self.authorize()
        if not self.current_user:
            self.set_status(401)
            self.write_json({"code": 401, "message": "Not authorized"})
            self.finish()
            return

        self.subscribe(self.write_event)
        self.flush()  # flush headers

    def subscribe(self, callback):
        """Subscribe ``callback`` to the bus channel of ``self.current_user``.

        The value returned by ``bus.subscribe`` is kept in ``self.callback``
        so that ``on_close`` can pass it back to ``bus.unsubscribe``.
        """
        self.channel = "user_%d" % self.current_user
        self.callback = bus.subscribe(self.channel, callback)

    def write_event(self, payload):
        """Long polling sink: write one payload and flush it as an HTTP chunk."""
        self.write(payload)
        self.flush()

    def open(self):
        """Prepare per-socket state; the socket is not authorized yet."""
        self.channel = None
        # The ping timer is created here but only started after a successful
        # authorization in on_message().
        self.periodic = PeriodicCallback(
            partial(self.ping, b""), self.KEEPALIVE_INTERVAL_MS
        )

    def on_close(self):
        """Drop the bus subscription and stop the keep-alive pings."""
        # getattr: the connection may go away before open() or the long
        # polling branch of get() had a chance to set these attributes.
        channel = getattr(self, "channel", None)
        if channel:
            bus.unsubscribe(channel, self.callback)
            self.channel = None  # makes a repeated on_close() a no-op

        periodic = getattr(self, "periodic", None)
        if periodic:
            periodic.stop()

    def on_message(self, msg):
        """Handle a web socket message; only "Token ..." messages are accepted.

        A valid token on a not-yet-authorized socket subscribes the socket to
        the user's channel, confirms with ``{"authorized": True}`` and starts
        the keep-alive pings. Anything else is answered with an error message.
        """
        # Binary frames arrive as bytes; bytes.startswith(str) would raise
        # TypeError, so reject them the same way as any other non-token input.
        if not isinstance(msg, str) or not msg.startswith("Token"):
            self.write_error(400, "Expected Token")
            return

        if self.channel:
            self.write_error(400, "Socket is already authorized")
            return

        self.current_user = http_token(msg)
        if not self.current_user:
            self.write_error(403, "Invalid Token")
            return

        self.subscribe(self.write_message)
        self.write_message({"authorized": True})
        self.periodic.start()

    def write_error(self, code, message=None, **kwargs):
        """Report an error to the client.

        Called as ``write_error(code, message)`` by this handler to send an
        error over the web socket. Tornado's ``send_error`` also calls
        ``write_error(status_code, **kwargs)`` for failures on plain HTTP
        requests; that form carries no message and is delegated to the
        default implementation so the error handler itself does not fail.
        """
        if message is None:
            super().write_error(code, **kwargs)
            return

        self.write_message({
            "error": code,
            "message": message,
        })

    def check_origin(self, origin):
        """Accept web socket connections from any origin."""
        # NOTE(review): this disables Tornado's same-origin check. Presumably
        # acceptable because authorization uses an explicit token rather than
        # cookies - confirm against helpers.auth.
        return True
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment