Skip to content

Instantly share code, notes, and snippets.

@tanbro
Created March 28, 2012 06:55
Show Gist options
  • Save tanbro/2224379 to your computer and use it in GitHub Desktop.
Save tanbro/2224379 to your computer and use it in GitHub Desktop.
Server-Sent Events feature for tornado webserver
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
Created on 2011-8-29
@author: tanbro
'''
import os
import tornado.httpserver
import tornado.ioloop
import tornado.web
from serversentevent import EventSource
# Channel '1': the welcome event is queued until a client connects, and
# responses carry a "retry:1000" field.
es1 = EventSource()
es1.sendEvent('welcome, es1!')
es1.retry = 1000

# Channel '2': configured the same way as channel '1'.
es2 = EventSource()
es2.sendEvent('welcome, es2!')
es2.retry = 1000

# Channel name (the path segment after /event/ or /post/) -> EventSource.
sse_dict = {'1': es1, '2': es2}
class EventChannel(tornado.web.RequestHandler):
    '''
    GET /event/<channel>: park this request on the channel's EventSource.

    The request is left open (asynchronous); the EventSource finishes it when
    it has events to deliver, when its timeout expires, or when it is replaced
    by a newer request on the same channel.
    '''

    def _channel_name(self):
        '''Return the channel name, i.e. the path segment after "/event/".'''
        return self.request.path.split('/')[2]

    @tornado.web.asynchronous
    def get(self, *args, **kwargs):
        '''
        Register this handler as the channel's pending response.

        Raises tornado.web.HTTPError(404) when the channel does not exist.
        '''
        channel = self._channel_name()
        print('event: CHANNEL %s:' % channel)
        es = sse_dict.get(channel)
        if es is None:
            # An unknown channel is a client error, not a server fault.
            raise tornado.web.HTTPError(404)
        es.handler = self

    def on_connection_close(self):
        '''Detach this handler from its channel when the client disconnects.'''
        channel = self._channel_name()
        print('event closed: CHANNEL %s:' % channel)
        es = sse_dict.get(channel)
        # Only detach if this request is still the registered one; otherwise
        # a newer request on the same channel would be torn down instead.
        if es is not None and es.handler is self:
            es.handler = None
class PostDataHandle(tornado.web.RequestHandler):
    '''POST /post/<channel>: publish the request body as an event on the channel.'''

    def post(self, *args, **kwargs):
        '''
        Queue the request body on the channel named in the URL.

        A body sent as application/json is decoded and the resulting object is
        published; any other body is published as text, URL-escaped when it
        contains CR or LF so that it stays on a single "data:" line.

        Raises tornado.web.HTTPError(404) for an unknown channel and
        tornado.web.HTTPError(400) for a JSON body that cannot be decoded.
        '''
        channel = self.request.path.split('/')[2]
        print('post event to CHANNEL %s:' % channel)
        es = sse_dict.get(channel)
        if es is None:
            raise tornado.web.HTTPError(404)
        # Compare the media type only, so that parameters such as
        # "; charset=utf-8" do not defeat the match.
        content_type = self.request.headers.get('Content-Type', '')
        media_type = content_type.split(';')[0].strip().lower()
        if media_type == 'application/json':
            try:
                body_obj = tornado.escape.json_decode(self.request.body)
            except ValueError:
                raise tornado.web.HTTPError(400)
            es.sendEvent(body_obj)
        else:
            body = self.request.body
            use_url_escape = '\n' in body or '\r' in body
            if use_url_escape:
                print('use_url_escape: %s' % (body,))
            es.sendEvent(body, use_url_escape=use_url_escape)
def main():
    '''Build the demo application, listen on port 8080 and start the IOLoop.'''
    static_dir = os.path.join(os.path.dirname(__file__), "static")
    routes = [
        (r'^/post/([^/]+)($|/$)', PostDataHandle),
        (r'^/event/([^/]+)($|/$)', EventChannel),
    ]
    app = tornado.web.Application(routes, static_path=static_dir)
    server = tornado.httpserver.HTTPServer(app)
    server.listen(8080)
    tornado.ioloop.IOLoop.instance().start()


# Run the server only when this file is executed as a script.
if __name__ == '__main__':
    main()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
Created on 2011-8-29
@author: tanbro
'''
import os
import time
import types
import uuid
import tornado.web
import tornado.escape
import tornado.ioloop
class EventSource(object):
    '''
    Queue of Server-Sent Events delivered through one pending tornado request.

    Events are collected with sendEvent(). Whenever a request handler is
    registered (the ``handler`` property) and events are queued, all of them
    are written as "data:" lines of a single message and the request is
    finished; the handler is then forgotten, so a new request has to be
    registered to receive later events. A registered handler that receives
    nothing within ``timeout`` seconds is finished with a lone ":" comment.

    With ``enableLastEventId`` on, every response carries a fresh "id:" and
    its events are kept and sent again on the next request unless that
    request's Last-Event-ID header equals the id of the previous response.
    '''

    # types.StringTypes only exists on Python 2; fall back to the Python 3
    # text and byte string types.
    _STRING_TYPES = getattr(types, 'StringTypes', (str, bytes))

    def __init__(self, ioloop=None):
        '''
        ioloop -- IOLoop used for the keep-alive timeout; defaults to the
                  global tornado.ioloop.IOLoop.instance(), looked up when the
                  object is created rather than when the class is defined.
        '''
        if ioloop is None:
            ioloop = tornado.ioloop.IOLoop.instance()
        self.__handler = None
        self.__loop_timeout = None
        self.__timeout = 15
        self.__enableLastEventId = True
        self.__lastEventId = None
        self.__retry = None
        self.__events = []
        self.__sentEvents = []
        self.__ioloop = ioloop

    def __cancelTimeout(self):
        '''Remove the pending timeout from the IOLoop, if there is one.'''
        # The value is only ever None or the handle returned by add_timeout(),
        # so there is no need to test it against the private _Timeout class.
        if self.__loop_timeout is not None:
            self.__ioloop.remove_timeout(self.__loop_timeout)
            self.__loop_timeout = None

    def __finishHandler(self, chunk=None, status=None):
        '''Cancel the timeout, finish the current handler (if still open) and forget it.'''
        self.__cancelTimeout()
        handler = self.__handler
        if isinstance(handler, tornado.web.RequestHandler):
            if not handler.request.connection.stream.closed():
                if status is not None:
                    handler.set_status(status)
                handler.finish(chunk)
        self.__handler = None

    def __closeHandler(self):
        '''Release the current handler, answering it with 204 if it is still open.'''
        self.__finishHandler(status=204)

    def __handler_onTimeout(self):
        '''Timeout callback: answer the waiting handler with an empty ":" comment.'''
        self.__finishHandler(chunk=':')

    def __responseHandler(self):
        '''
        Flush queued events to the current handler.

        Returns True when a response was produced (the handler is released
        afterwards), False when there is no handler or nothing to send.
        '''
        if not self.__handler:
            return False
        # Events of the previous response that were not acknowledged go out
        # again, ahead of the newly queued ones.
        pending = self.__sentEvents + self.__events
        self.__sentEvents = []
        self.__events = pending
        if not pending:
            return False
        # All queued events share one message: one "data:" line per event,
        # followed by a single id / retry field and the blank terminator line.
        lines = ['data:%s' % event for event in pending]
        new_event_sent_id = None
        if self.__enableLastEventId:
            new_event_sent_id = uuid.uuid1().hex
            lines.append('id:%s' % new_event_sent_id)
        if self.__retry is not None:
            lines.append('retry:%d' % self.__retry)
        res_data = os.linesep.join(lines) + os.linesep + os.linesep
        self.__lastEventId = new_event_sent_id
        if new_event_sent_id:
            # Keep the batch until a later request acknowledges this id.
            self.__sentEvents = pending
        self.__events = []
        # The handler is about to be released, so its timeout is obsolete.
        self.__cancelTimeout()
        handler = self.__handler
        if not handler.request.connection.stream.closed():
            handler.finish(res_data)
        self.__handler = None
        return True

    def get_handler(self):
        '''Return the request handler currently waiting for events, or None.'''
        return self.__handler

    def set_handler(self, val):
        '''
        Register ``val`` as the waiting request handler, or release it.

        A tornado.web.RequestHandler replaces the current handler (which is
        finished with 204), gets the event-stream response headers and is
        answered at once if events are queued; otherwise the timeout is armed.
        None just releases the current handler.

        Raises TypeError for any other value.
        '''
        if val is None:
            self.__closeHandler()
            return
        if not isinstance(val, tornado.web.RequestHandler):
            raise TypeError(' handler attribute must be tornado.web.RequestHandler or None type')
        self.__closeHandler()
        self.__handler = val
        val.set_header('Content-Type', 'text/event-stream; charset=utf-8')
        val.set_header('Cache-Control', 'no-cache')
        val.set_header('Connection', 'keep-alive')
        headers = val.request.headers
        if headers:
            last_event_id = headers.get('Last-Event-ID', None)
            if last_event_id is not None and last_event_id == self.__lastEventId:
                # The client confirmed the previous response: do not resend it.
                self.__lastEventId = None
                self.__sentEvents = []
        if not self.__responseHandler():
            self.__loop_timeout = self.__ioloop.add_timeout(
                time.time() + self.__timeout, self.__handler_onTimeout)

    handler = property(get_handler, set_handler)

    def get_timeout(self):
        '''Return the number of seconds a handler may wait for events.'''
        return self.__timeout

    def set_timeout(self, val):
        '''Set the wait time in seconds; raises ValueError for a negative value.'''
        val = int(val)
        if val < 0:
            raise ValueError('timeout must be bigger than 0')
        self.__timeout = val

    timeout = property(get_timeout, set_timeout)

    def get_retry(self):
        '''Return the value sent in the "retry:" field, or None when it is omitted.'''
        return self.__retry

    def set_retry(self, val):
        '''
        Set the value sent in the "retry:" field; None omits the field.

        Raises ValueError for a negative value.
        '''
        if val is None:
            self.__retry = val
            return
        val = int(val)
        # Validate the converted value (the original compared the builtin
        # ``int`` type itself, so negative values were never rejected).
        if val < 0:
            raise ValueError('retry must be bigger than 0')
        self.__retry = val

    retry = property(get_retry, set_retry)

    def get_enableLastEventId(self):
        '''Return whether responses carry an "id:" field.'''
        return self.__enableLastEventId

    def set_enableLastEventId(self, val):
        '''Turn the "id:" field (and resending of unacknowledged events) on or off.'''
        self.__enableLastEventId = bool(val)

    enableLastEventId = property(get_enableLastEventId, set_enableLastEventId)

    def get_ioloop(self):
        '''Return the IOLoop this object schedules its timeout on.'''
        return self.__ioloop

    ioloop = property(get_ioloop)

    def sendEvent(self, data, **kwargs):
        '''
        Queue ``data`` as an event and deliver it at once if a handler waits.

        data -- dict, list, tuple, int, float and bool values are JSON
                encoded; None becomes an empty string; strings are converted
                to unicode.

        Keyword options (all default to False):
        use_json_encode  -- JSON-encode ``data`` whatever its type.
        use_squeeze      -- apply tornado.escape.squeeze (string data only).
        use_xhtml_escape -- apply tornado.escape.xhtml_escape (string data only).
        use_url_escape   -- apply tornado.escape.url_escape (string data only).

        Raises TypeError when ``data`` has an unsupported type.
        '''
        use_json_encode = kwargs.get('use_json_encode', False)
        use_url_escape = kwargs.get('use_url_escape', False)
        use_xhtml_escape = kwargs.get('use_xhtml_escape', False)
        use_squeeze = kwargs.get('use_squeeze', False)
        if use_json_encode or isinstance(data, (dict, list, tuple, int, float, bool)):
            data = tornado.escape.json_encode(data)
        elif data is None:
            data = ''
        elif isinstance(data, self._STRING_TYPES):
            data = tornado.escape.to_unicode(data)
            if use_squeeze:
                data = tornado.escape.squeeze(data)
            if use_xhtml_escape:
                data = tornado.escape.xhtml_escape(data)
            if use_url_escape:
                data = tornado.escape.url_escape(data)
        else:
            raise TypeError("data's type should be in (dict, list, tuple, int, float, bool, str, unicode, None)")
        self.__events.append(data)
        self.__responseHandler()
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8" />
<!-- Always force latest IE rendering engine (even in intranet) & Chrome Frame
Remove this if you use the .htaccess -->
<meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1" />
<title>events</title>
<meta name="description" content="" />
<meta name="generator" content="Studio 3 http://aptana.com/" />
<meta name="author" content="tanbro" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<!-- Replace favicon.ico & apple-touch-icon.png in the root of your domain and delete these references -->
<link rel="shortcut icon" href="/favicon.ico" />
<link rel="apple-touch-icon" href="/apple-touch-icon.png" />
</head>
<body>
<!-- include jquery -->
<script src="http://code.jquery.com/jquery.min.js" type="text/javascript"></script>
<script src="scripts/EventSource.js" type="text/javascript"></script>
<script src="scripts/json2.js" type="text/javascript"></script>
<div>
<div>
Event Channel:
<input type="text" id="txtEventSourceChannel"/>
<input type="button" id="btnStartEvent" value="start event"/>
<input type="button" id="btnStopEvent" value="stop event"/>
</div>
<div>
<input type="button" id="btnPostData" value="post event data"/>
Event Channel:
<input type="text" id="txtPostDataEventChannel"/>
Event Data: <textarea id="txtData"></textarea>
</div>
</div>
<!-- my functions -->
<script type="text/javascript">
//<![CDATA[
// var EventSource;
// if(!EventSource) {
// console.warn("EventSource not supported by the browser. Load eventsource.js");
// $.ajax({
// url : "scripts/eventsource.js",
// dataType : "script",
// async : false,
// success : function(data, textStatus) {
// console.info('eventsource.js loaded');
// },
// error : function(jqXHR, textStatus, errorThrown) {
// console.error('failed to load eventsource.js.', textStatus, errorThrown);
// }
// });
// }
// The currently open EventSource, or a falsy value when none is open.
var evsrc;

/**
 * Open an EventSource on the channel named in #txtEventSourceChannel and log
 * every line of each received message. If a source is already open, only an
 * error is logged.
 */
function startEvent() {
    if (evsrc) {
        console.error('evsrc already exists.');
        return;
    }
    evsrc = new EventSource('/event/' + encodeURIComponent($('#txtEventSourceChannel')[0].value));
    evsrc.onmessage = function(event) {
        // Drop a leading BOM and normalise CR / CRLF, then handle each line.
        var messages = event.data.replace(/^\uFEFF/, '').replace(/\r\n?/g, '\n').split('\n');
        // Indexed loop with a local counter: for-in on an array also visits
        // inherited enumerable properties and the bare "i" leaked a global.
        for (var i = 0; i < messages.length; i++) {
            //eventData = JSON.parse(messages[i]);
            //console.log('event object:', eventData);
            console.log('event text:', messages[i]);
        }
    };
    // Event types have no "on" prefix ("onerror" / "onopen" are the property
    // names); listeners registered under the prefixed names never fire.
    evsrc.addEventListener('error', function() {
        console.debug("evsrc.onerror");
    }, false);
    evsrc.addEventListener('open', function() {
        console.debug("evsrc.onopen");
    }, false);
}
/**
 * Close the open EventSource, if any, and clear the reference so that
 * startEvent() can open a new one.
 */
function stopEvent() {
    console.log("stop event....");
    if (!evsrc) {
        return;
    }
    evsrc.close();
    evsrc = null;
}
// Wire the buttons to the event-source controls.
$('#btnStartEvent').bind('click', function() {
    startEvent();
});
$('#btnStopEvent').bind('click', function() {
    stopEvent();
});
// POST the content of #txtData to the channel named in #txtPostDataEventChannel.
$('#btnPostData').bind('click', function() {
    $.ajax({
        url : '/post/' + encodeURIComponent($('#txtPostDataEventChannel')[0].value),
        type : 'POST',
        data : $('#txtData')[0].value,
        success : function(data, textStatus) {
        },
        error : function(xhr, textStatus, errorThrown) {
            // Report the failure instead of dropping it silently.
            console.error('post event data failed:', textStatus, errorThrown);
        }
        // No trailing comma after the last property: legacy IE rejects it.
    });
});
//]]>
</script>
</body>
</html>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment