Skip to content

Instantly share code, notes, and snippets.

@collin
Created June 24, 2009 06:34
Show Gist options
  • Select an option

  • Save collin/135055 to your computer and use it in GitHub Desktop.

Select an option

Save collin/135055 to your computer and use it in GitHub Desktop.
module Orbited
module Transport
# Maps the transport name requested by a client to the NAME of the class that
# implements it. Class names are stored rather than the classes themselves
# because the transport classes are defined further down in this module and do
# not exist yet when this hash is evaluated; referencing them directly here
# raises NameError while the file is loading.
Map = {
  'xhrstream' => :XHRStreaming,
  'htmlfile'  => :HTMLFile,
  'sse'       => :SSE,
  'longpoll'  => :LongPolling,
  'poll'      => :Polling
}

# Builds the transport registered under +transport_name+.
#
# transport_name - String key into Map (for example 'xhrstream').
# connection     - passed unchanged to the transport's constructor.
#
# Returns a new transport instance, or nil when the name is not registered.
def self.create(transport_name, connection)
  class_name = Map[transport_name]
  return unless class_name
  # Resolved at call time, by which point the classes have been defined.
  const_get(class_name).new(connection)
end
# Read-only holder for the three values a packet is built from.
class Packet
  attr_reader :id, :name, :data

  # id   - stored as given and exposed through #id.
  # name - stored as given and exposed through #name.
  # data - stored as given and exposed through #data; defaults to an empty
  #        Hash (a fresh one per call).
  def initialize(id, name, data = {})
    # Assign the instance variables directly: only readers are declared, so
    # the previous `self.id = id` form raised NoMethodError.
    @id = id
    @name = name
    @data = data
  end
end
# Base class for the transports in this module. It tracks open/closed state,
# queues outgoing packets, and keeps a one-shot heartbeat timer re-armed while
# the transport is in use. Subclasses supply #opened, #write and
# #write_heartbeat.
#
# NOTE(review): `logger` is called below but is not defined in this class; it
# is assumed to be provided elsewhere (mixin or helper) - confirm.
class Abstract
  HeartbeatInterval = 5
  MaxBytes = 1048576
  CacheControl = 'no-cache, must-revalidate'

  attr_accessor :connection
  attr_accessor :open
  attr_accessor :closed
  attr_accessor :heartbeat_timer
  # These are assigned through `self.x = ...` below, so they need accessors.
  attr_accessor :packets
  attr_accessor :request
  attr_accessor :close_deferred

  alias closed? closed
  alias open? open

  # connection - stored and exposed through #connection.
  def initialize(connection)
    self.connection = connection
    self.open = false
    self.closed = false
    # Lets sendPacket be called before the first render.
    self.packets = []
  end

  # Marks the transport open for +request+, empties the packet queue, runs the
  # subclass #opened hook and arms the heartbeat timer.
  def render(request)
    self.open = true
    self.packets = []
    self.request = request
    opened
    reset_heartbeat
    # self.closeDeferred = defer.Deferred
    # self.conn.transportOpened
    # return server.NOT_DONE_YET
  end

  # Schedules the next heartbeat and remembers the timer signature.
  def reset_heartbeat
    # Looked up through self.class so a subclass that defines its own
    # HeartbeatInterval is honoured; a bare constant reference here always
    # resolves to Abstract's value.
    self.heartbeat_timer =
      EventMachine.add_timer(self.class::HeartbeatInterval) { do_heartbeat }
  end
  # Original camelCase name, kept for existing callers.
  alias resetHeartbeat reset_heartbeat

  # Timer callback: writes a heartbeat and re-arms the timer unless closed.
  def do_heartbeat
    return if closed?
    write_heartbeat
    reset_heartbeat
  end

  # Queues +packet+ until the next #flush.
  def sendPacket(packet)
    packets << packet
  end

  # Writes every queued packet, empties the queue and restarts the heartbeat
  # countdown.
  def flush
    write packets
    self.packets = []
    cancel_heartbeat
    reset_heartbeat
  end

  # Returns the close deferred (nil unless something has assigned one).
  def onClose
    logger.debug('onClose called')
    close_deferred
  end

  # Closes the transport once: cancels the heartbeat, finishes the request if
  # there is one, and drops the reference to it. Later calls only log.
  def close
    if closed?
      logger.debug('close called - already closed')
      return
    end
    self.closed = true
    cancel_heartbeat
    self.open = false
    if request
      logger.debug('calling finish')
      request.finish
    end
    self.request = nil
    # NOTE(review): nothing in this class assigns close_deferred (the line
    # that would create it in #render is commented out), so guard against nil.
    # Whether `callback` is the right way to fire the eventual Ruby object
    # needs confirming.
    if close_deferred
      close_deferred.callback
      self.close_deferred = nil
    end
  end

  # Serialises +packets+ into one string. Each packet is a sequence of
  # arguments; every argument is written as a flag ('0' for the last argument
  # of its packet, '1' otherwise) followed by "<length>,<argument>".
  #
  # Returns the concatenated String ('' for an empty list).
  def encode(packets)
    output = []
    packets.each do |packet|
      last_index = packet.size - 1
      # each_with_index yields (element, index), in that order.
      packet.each_with_index do |arg, index|
        output << (index == last_index ? '0' : '1')
        output << "#{arg.length},#{arg}"
      end
    end
    output.join
  end

  # Override these
  def write(packets)
    raise "Unimplemented"
  end

  def opened
    raise "Unimplemented"
  end

  # This is the name #do_heartbeat calls.
  def write_heartbeat
    raise "Unimplemented"
  end

  # Original camelCase name, kept for existing callers.
  def writeHeartbeat
    write_heartbeat
  end

  private

  # Cancels the pending heartbeat timer, if any. EventMachine.add_timer
  # returns a timer signature rather than an object with #cancel, so it has to
  # be cancelled through EventMachine.cancel_timer.
  def cancel_heartbeat
    return unless heartbeat_timer
    EventMachine.cancel_timer(heartbeat_timer)
    self.heartbeat_timer = nil
  end
end
# Streaming transport: writes encoded packets to the request as they are
# flushed, and closes itself once more than MaxBytes of payload have been
# written.
class XHRStreaming < Abstract
  ContentType = 'application/x-orbited-event-stream'

  # Resets the byte counter, sets the content type and sends 256 spaces of
  # padding.
  def opened
    @total_bytes = 0
    request.headers['content-type'] = ContentType
    # Safari/Tiger may need 256 bytes
    request.write(' ' * 256)
  end

  def trigger_close_timeout
    logger.debug('trigger_close_timeout called')
    close
  end

  # Encodes +packets+, writes them to the request and closes the transport
  # when the running total exceeds MaxBytes.
  def write(packets)
    # `'%r' % x` is a Python directive; Ruby's String#% raises ArgumentError
    # on it, so log the inspected value instead.
    logger.debug("write #{packets.inspect}")
    # TODO why join the packets here? why not do N request.write?
    payload = encode(packets)
    logger.debug('WRITE ' + payload)
    request.write(payload)
    @total_bytes += payload.size
    return unless @total_bytes > MaxBytes
    logger.debug('over maxbytes limit')
    close
  end

  def write_heartbeat
    logger.debug("writeHeartbeat #{inspect}")
    request.write('x')
  end
end
# JSON.generate is used by HTMLFile#write.
require 'json'

# Transport that streams an HTML document: an initial page header followed by
# one <script> element per flush.
class HTMLFile < Abstract
  initial_data = <<-HTML
<html>
<head>
<script src="../static/HTMLFileFrame.js"></script>
</head>
<body>
  HTML
  # The markup itself, padded with spaces up to 256 characters, then a
  # newline. (The previous expression multiplied an Integer by a String, which
  # raises TypeError, and left the markup out altogether.)
  InitialData = initial_data + (' ' * [0, 256 - initial_data.size].max) + "\n"
  Pragma = 'no-cache'
  Expires = -1

  # Resets the byte counter, sets the no-cache headers and sends InitialData.
  def opened
    logger.debug('opened!')
    # Force reconnect every 30 seconds
    @total_bytes = 0
    # self.closeTimer = reactor.callLater(5, self.triggerCloseTimeout)
    # See "How to prevent caching in Internet Explorer"
    # at http://support.microsoft.com/kb/234067
    request.headers['cache-control'] = CacheControl
    request.headers['pragma'] = Pragma
    # Header values are text; NOTE(review): confirm the request object does
    # not expect the raw Integer here.
    request.headers['expires'] = Expires.to_s
    logger.debug("send initialData: #{InitialData}")
    request.write(InitialData)
  end

  def trigger_close_timeout
    close
  end

  # Writes +packets+ as a JSON argument to e(...) inside a <script> element,
  # and closes the transport once more than MaxBytes have been written.
  def write(packets)
    # TODO make some JS code to remove the script elements from DOM
    # after they are executed.
    # "</" is escaped so a "</script>" inside the data cannot terminate the
    # element early; "\/" is a legal JSON escape for "/".
    json = JSON.generate(packets).gsub('</') { '<\/' }
    payload = "<script>e(#{json});</script>"
    logger.debug("write #{payload}")
    request.write(payload)
    @total_bytes += payload.size
    return unless @total_bytes > MaxBytes
    logger.debug('write: closing because session MAXBYTES was exceeded')
    close
  end

  # Named write_heartbeat because that is what Abstract#do_heartbeat calls.
  def write_heartbeat
    logger.debug('writeHeartbeat')
    request.write('<script>h;</script>')
  end
  # Original camelCase name, kept for existing callers.
  alias writeHeartbeat write_heartbeat
end
# TODO(port): a "????" placeholder stood here, and the class header was
# written in Python style as `class CloseResource(resource.Resource)`, which
# is not valid Ruby. No Ruby counterpart of resource.Resource is visible in
# this file, so the class currently has no superclass - pick one when the
# request-routing layer exists.
class CloseResource
  # Every child path is served by this same resource.
  def getChild(path, request)
    self
  end

  # Returns the static page that loads HTMLFileClose.js.
  #
  # NOTE(review): the previous code passed this markup through format_block,
  # which is not defined anywhere in view; presumably it only normalised
  # whitespace - confirm.
  def render(request)
    <<-HTML
<html>
<head>
<script src="../../static/HTMLFileClose.js"></script>
</head>
<body>
</body>
</html>
    HTML
  end
end
# JSON.generate is used by SSE#write.
require 'json'

# Transport that writes packets as "Event: payload" blocks with one "data:"
# line per line of JSON.
class SSE < Abstract
  # NOTE(review): this override only takes effect if the code that schedules
  # the heartbeat looks the constant up through self.class; a bare
  # HeartbeatInterval reference inside Abstract resolves to Abstract's value.
  HeartbeatInterval = 30
  ContentType = 'application/x-dom-event-stream'

  def opened
    request.headers['content-type'] = ContentType
    request.headers['cache-control'] = CacheControl
  end

  # Writes +packets+ as a single "payload" event.
  def write(packets)
    payload = JSON.generate(packets)
    data_lines = payload.split("\n").map { |line| "data: #{line}" }.join("\n")
    # Double-quoted on purpose: in single quotes Ruby keeps "\n" as a literal
    # backslash followed by "n", so no line breaks reached the client.
    request.write("Event: payload\n#{data_lines}\n\n")
  end

  # Named write_heartbeat because that is what Abstract#do_heartbeat calls.
  def write_heartbeat
    logger.debug('writeHeartbeat')
    request.write("Event: heartbeat\n\n")
  end
  # Original camelCase name, kept for existing callers.
  alias writeHeartbeat write_heartbeat
end
# Transport that answers each request with at most one batch of packets and
# then closes, leaving the client to reconnect.
class LongPolling < Abstract
  # Assigned in #opened via `self.close_timer =`, so it needs an accessor.
  attr_accessor :close_timer

  def opened
    # Force a reconnect after 30 seconds. This was a Twisted
    # `reactor.callLater` call; the rest of the file schedules timers through
    # EventMachine, so do the same here.
    self.close_timer = EventMachine.add_timer(30) { trigger_close_timeout }
    request.headers['cache-control'] = CacheControl
  end

  def trigger_close_timeout
    close
  end

  # Encodes and writes +packets+, then closes the transport.
  def write(packets)
    # TODO: we can optimize this. In the case where packets contains a
    # single packet, and its a ping, just don't send it. (instead,
    # close the connection. the re-open will prompt the ack)
    # `'%r' % x` is a Python directive that Ruby's String#% rejects.
    logger.debug("write #{packets.inspect}")
    payload = encode(packets)
    logger.debug('WRITE ' + payload)
    request.write(payload)
    close
  end

  # NOTE: no heartbeats...
  # Named write_heartbeat because that is what Abstract#do_heartbeat calls.
  def write_heartbeat
  end
  # Original camelCase name, kept for existing callers.
  alias writeHeartbeat write_heartbeat
end
# Transport that writes whatever is queued and closes immediately.
class Polling < Abstract
  def opened
    request.headers['cache-control'] = CacheControl
  end

  # NOTE: we override this so we can close as soon as we send out any waiting
  # packets. We can't put the close call inside of write
  # because sometimes there will be no packets to write.
  def flush
    logger.debug('flush')
    # Was `CometTransport.flush(self)`, a Python-style explicit parent call;
    # `super` is the Ruby equivalent.
    super
    close
  end

  def write(packets)
    # `'%r' % x` is a Python directive that Ruby's String#% rejects.
    logger.debug("write #{packets.inspect}")
    payload = encode(packets)
    logger.debug('WRITE ' + payload)
    request.write(payload)
  end

  # NOTE: no heartbeats...
  # Named write_heartbeat because that is what Abstract#do_heartbeat calls.
  def write_heartbeat
  end
  # Original camelCase name, kept for existing callers.
  alias writeHeartbeat write_heartbeat
end

# The class was originally spelled "Pollling"; keep that name resolvable for
# any code that already refers to it.
Pollling = Polling
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment