Created
September 21, 2011 13:19
-
-
Save mjtko/1232015 to your computer and use it in GitHub Desktop.
Synchronous middleware for Faye
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'faye' | |
require 'json' | |
require 'rack' | |
require 'cgi' | |
module Faye | |
class SyncMiddleware | |
include ::Faye::Logging | |
DEFAULT_ENDPOINT = '/bayeux' | |
SCRIPT_PATH = File.join(::Faye::ROOT, 'faye-browser-min.js') | |
TYPE_JSON = {'Content-Type' => 'application/json'} | |
TYPE_SCRIPT = {'Content-Type' => 'text/javascript'} | |
TYPE_TEXT = {'Content-Type' => 'text/plain'} | |
def initialize(app = nil, options = nil) | |
@app = app if app.respond_to?(:call) | |
@options = [app, options].grep(Hash).first || {} | |
@endpoint = @options[:mount] || DEFAULT_ENDPOINT | |
@endpoint_re = Regexp.new('^' + @endpoint + '(/[^/]*)*(\\.js)?$') | |
@server = ::Faye::Server.new(@options) | |
return unless extensions = @options[:extensions] | |
[*extensions].each { |extension| add_extension(extension) } | |
end | |
def add_extension(extension) | |
@server.add_extension(extension) | |
end | |
def remove_extension(extension) | |
@server.remove_extension(extension) | |
end | |
def get_client | |
@client ||= ::Faye::Client.new(@server) | |
end | |
def call(env) | |
::Faye.ensure_reactor_running! | |
request = Rack::Request.new(env) | |
unless request.path_info =~ @endpoint_re | |
env['faye.client'] = get_client | |
return @app ? @app.call(env) : | |
[ | |
404, | |
TYPE_TEXT, | |
["Sure you're not looking for #{@endpoint} ?"] | |
# emacs syntax highlighting breaks without a spare quote fsr, so... >> " | |
] | |
end | |
return handle_options(request) if env['REQUEST_METHOD'] == 'OPTIONS' | |
raise "Synchronous Faye middleware does not support WebSockets" if env['HTTP_UPGRADE'] =~ /^WebSocket$/i | |
return [200, TYPE_SCRIPT, File.new(SCRIPT_PATH)] if request.path_info =~ /\.js$/ | |
handle_request(request) | |
end | |
private | |
def handle_request(request) | |
json_msg = message_from_request(request) | |
message = JSON.parse(json_msg) | |
jsonp = request.params['jsonp'] || ::Faye::JSONP_CALLBACK | |
head = request.get? ? TYPE_SCRIPT.dup : TYPE_JSON.dup | |
origin = request.env['HTTP_ORIGIN'] | |
debug 'Received ?: ?', request.env['REQUEST_METHOD'], json_msg | |
@server.flush_connection(message) if request.get? | |
head['Access-Control-Allow-Origin'] = origin if origin | |
body = DeferredBody.new | |
@server.process(message, false) do |replies| | |
response = JSON.unparse(replies) | |
response = "#{ jsonp }(#{ response });" if request.get? | |
debug 'Returning ?', response | |
body.succeed(response) | |
end | |
body.await_response | |
[200, head, body] | |
rescue | |
error $!.message | |
warn $!.backtrace.join("\n") | |
[400, TYPE_TEXT, ['Bad request']] | |
end | |
def message_from_request(request) | |
message = request.params['message'] | |
return message if message | |
# Some clients do not send a content-type, e.g. | |
# Internet Explorer when using cross-origin-long-polling | |
# Some use application/xml when using CORS | |
content_type = request.env['CONTENT_TYPE'] || '' | |
if content_type.split(';').first == 'application/json' | |
request.body.read | |
else | |
CGI.parse(request.body.read)['message'][0] | |
end | |
end | |
def handle_options(request) | |
headers = { | |
'Access-Control-Allow-Origin' => '*', | |
'Access-Control-Allow-Credentials' => 'false', | |
'Access-Control-Max-Age' => '86400', | |
'Access-Control-Allow-Methods' => 'POST, GET, PUT, DELETE, OPTIONS', | |
'Access-Control-Allow-Headers' => 'Accept, Content-Type, X-Requested-With' | |
} | |
[200, headers, ['']] | |
end | |
class DeferredBody | |
include EventMachine::Deferrable | |
include ::Faye::Logging | |
alias :each :callback | |
def initialize | |
@mutex = Mutex.new | |
callback &(method(:revive)) | |
end | |
def revive(*args) | |
@mutex.synchronize { | |
@revived = true | |
unless @waiter.nil? | |
debug "Reviving [#{@waiter.object_id}]." | |
@waiter.run | |
end | |
debug "Revived [#{@waiter && @waiter.object_id}]." | |
} | |
end | |
def await_response | |
@mutex.synchronize { | |
unless @revived || ( @deferred_status && @deferred_status != :unknown ) | |
@waiter = Thread.new { Thread.stop; debug "Waiter ends." } | |
debug "Spawned waiter [#{@waiter.object_id}]." | |
else | |
debug "Retaining." | |
end | |
} | |
unless @waiter.nil? | |
debug "Joining waiter [#{@waiter.object_id}]." | |
@waiter.join | |
end | |
end | |
end | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment