Skip to content

Instantly share code, notes, and snippets.

@mjtko
Created September 21, 2011 13:19
Show Gist options
  • Save mjtko/1232015 to your computer and use it in GitHub Desktop.
Save mjtko/1232015 to your computer and use it in GitHub Desktop.
Synchronous middleware for Faye
require 'faye'
require 'json'
require 'rack'
require 'cgi'
module Faye
class SyncMiddleware
include ::Faye::Logging
DEFAULT_ENDPOINT = '/bayeux'
SCRIPT_PATH = File.join(::Faye::ROOT, 'faye-browser-min.js')
TYPE_JSON = {'Content-Type' => 'application/json'}
TYPE_SCRIPT = {'Content-Type' => 'text/javascript'}
TYPE_TEXT = {'Content-Type' => 'text/plain'}
def initialize(app = nil, options = nil)
@app = app if app.respond_to?(:call)
@options = [app, options].grep(Hash).first || {}
@endpoint = @options[:mount] || DEFAULT_ENDPOINT
@endpoint_re = Regexp.new('^' + @endpoint + '(/[^/]*)*(\\.js)?$')
@server = ::Faye::Server.new(@options)
return unless extensions = @options[:extensions]
[*extensions].each { |extension| add_extension(extension) }
end
def add_extension(extension)
@server.add_extension(extension)
end
def remove_extension(extension)
@server.remove_extension(extension)
end
def get_client
@client ||= ::Faye::Client.new(@server)
end
def call(env)
::Faye.ensure_reactor_running!
request = Rack::Request.new(env)
unless request.path_info =~ @endpoint_re
env['faye.client'] = get_client
return @app ? @app.call(env) :
[
404,
TYPE_TEXT,
["Sure you're not looking for #{@endpoint} ?"]
# emacs syntax highlighting breaks without a spare quote fsr, so... >> "
]
end
return handle_options(request) if env['REQUEST_METHOD'] == 'OPTIONS'
raise "Synchronous Faye middleware does not support WebSockets" if env['HTTP_UPGRADE'] =~ /^WebSocket$/i
return [200, TYPE_SCRIPT, File.new(SCRIPT_PATH)] if request.path_info =~ /\.js$/
handle_request(request)
end
private
def handle_request(request)
json_msg = message_from_request(request)
message = JSON.parse(json_msg)
jsonp = request.params['jsonp'] || ::Faye::JSONP_CALLBACK
head = request.get? ? TYPE_SCRIPT.dup : TYPE_JSON.dup
origin = request.env['HTTP_ORIGIN']
debug 'Received ?: ?', request.env['REQUEST_METHOD'], json_msg
@server.flush_connection(message) if request.get?
head['Access-Control-Allow-Origin'] = origin if origin
body = DeferredBody.new
@server.process(message, false) do |replies|
response = JSON.unparse(replies)
response = "#{ jsonp }(#{ response });" if request.get?
debug 'Returning ?', response
body.succeed(response)
end
body.await_response
[200, head, body]
rescue
error $!.message
warn $!.backtrace.join("\n")
[400, TYPE_TEXT, ['Bad request']]
end
def message_from_request(request)
message = request.params['message']
return message if message
# Some clients do not send a content-type, e.g.
# Internet Explorer when using cross-origin-long-polling
# Some use application/xml when using CORS
content_type = request.env['CONTENT_TYPE'] || ''
if content_type.split(';').first == 'application/json'
request.body.read
else
CGI.parse(request.body.read)['message'][0]
end
end
def handle_options(request)
headers = {
'Access-Control-Allow-Origin' => '*',
'Access-Control-Allow-Credentials' => 'false',
'Access-Control-Max-Age' => '86400',
'Access-Control-Allow-Methods' => 'POST, GET, PUT, DELETE, OPTIONS',
'Access-Control-Allow-Headers' => 'Accept, Content-Type, X-Requested-With'
}
[200, headers, ['']]
end
class DeferredBody
include EventMachine::Deferrable
include ::Faye::Logging
alias :each :callback
def initialize
@mutex = Mutex.new
callback &(method(:revive))
end
def revive(*args)
@mutex.synchronize {
@revived = true
unless @waiter.nil?
debug "Reviving [#{@waiter.object_id}]."
@waiter.run
end
debug "Revived [#{@waiter && @waiter.object_id}]."
}
end
def await_response
@mutex.synchronize {
unless @revived || ( @deferred_status && @deferred_status != :unknown )
@waiter = Thread.new { Thread.stop; debug "Waiter ends." }
debug "Spawned waiter [#{@waiter.object_id}]."
else
debug "Retaining."
end
}
unless @waiter.nil?
debug "Joining waiter [#{@waiter.object_id}]."
@waiter.join
end
end
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment