Skip to content

Instantly share code, notes, and snippets.

@arturaz
Created April 18, 2012 20:09
Show Gist options
  • Save arturaz/2416210 to your computer and use it in GitHub Desktop.
Save arturaz/2416210 to your computer and use it in GitHub Desktop.
Author: Arturas Slajus <[email protected]> 2012-04-18 23:08:34
Committer: Arturas Slajus <[email protected]> 2012-04-18 23:08:34
Parent: f0cc63550a0bb0ca4f60a8a8fc1682d196d57087 (Merge remote-tracking branch 'origin/stable2' into stable2)
Branches: remotes/origin/stable2, stable2
Follows: many_scopes
Precedes:
Use EM instead of celluloid-io for now.
----------------------------------- Gemfile -----------------------------------
index 5290ebd..2e00ff9 100644
@@ -9,12 +9,12 @@ gem 'json', '>=1.4.6', :require => "json/ext"
gem 'activerecord-jdbcmysql-adapter', '~>1.1'
gem 'flag_shih_tzu', :git => "git://github.com/arturaz/flag_shih_tzu.git"
gem "celluloid",
:git => "git://github.com/celluloid/celluloid.git"
#gem "celluloid", '~>0.10.0'
-gem "celluloid-io", :require => "celluloid/io",
- :git => "git://github.com/celluloid/celluloid-io.git"
+#gem "celluloid-io", :require => "celluloid/io",
+# :git => "git://github.com/celluloid/celluloid-io.git"
#gem "celluloid-io", '~>0.10.0', :require => "celluloid/io"
# For natural date parsing, e.g. "in 5 minutes"
gem "chronic", ">=0.6.2"
# Gems that are needed but should never be activated.
@@ -22,10 +22,11 @@ group :installation do
gem 'rake', '~>0.9.0'
end
# Gems that are needed for running (not testing).
group :run_require do
+ gem "eventmachine"
end
# Only needed in production.
group :production_require do
gem 'mail', '>=2.2'
--------------------------------- Gemfile.lock ---------------------------------
index 891cc81..e5ef190 100644
@@ -11,18 +11,10 @@ GIT
specs:
net-ssh (2.3.1)
jruby-pageant (>= 1.0.2)
GIT
- remote: git://github.com/celluloid/celluloid-io.git
- revision: 22c3ca9cc040facf0b3ee8bbd4b2d25fff8e58d4
- specs:
- celluloid-io (0.10.0)
- celluloid (~> 0.10.0)
- nio4r (>= 0.3.1)
-
-GIT
remote: git://github.com/celluloid/celluloid.git
revision: 53b31cb209dd8cc4d3abc89ed98eef9fb0eba01f
specs:
celluloid (0.10.0)
@@ -58,10 +50,11 @@ GEM
bouncy-castle-java (1.5.0146.1)
builder (3.0.0)
chronic (0.6.7)
coderay (1.0.6)
diff-lcs (1.1.3)
+ eventmachine (0.12.10-java)
factory_girl (2.1.2)
activesupport
i18n (0.6.0)
jdbc-mysql (5.1.13)
jruby-openssl (0.7.6.1)
@@ -81,11 +74,10 @@ GEM
multi_json (1.3.2)
net-scp (1.0.4)
net-ssh (>= 1.99.1)
net-sftp (2.0.5)
net-ssh (>= 2.0.9)
- nio4r (0.3.3-java)
polyglot (0.3.3)
rake (0.9.2.2)
rest-client (1.6.7)
mime-types (>= 1.16)
rspec (2.9.0)
@@ -109,12 +101,12 @@ PLATFORMS
DEPENDENCIES
activerecord (~> 3.2.0)
activerecord-jdbcmysql-adapter (~> 1.1)
activesupport (~> 3.2.0)
celluloid!
- celluloid-io!
chronic (>= 0.6.2)
+ eventmachine
factory_girl (~> 2.1.2)
flag_shih_tzu!
jruby-openssl (>= 0.7.6.1)
json (>= 1.4.6)
mail (>= 2.2)
------------------------------ server/lib/main.rb ------------------------------
index c5241a6..99a1e65 100755
@@ -20,12 +20,12 @@ end
# Initialize space mule.
LOGGER.info "Initializing SpaceMule."
SpaceMule.instance
# Ensure server and callback manager are restarted if they crash.
-LOGGER.info "Starting server actor..."
-Celluloid::Actor[:server] = ServerActor.new(CONFIG['server']['port'])
+#LOGGER.info "Starting server actor..."
+#Celluloid::Actor[:server] = ServerActor.new(CONFIG['server']['port'])
LOGGER.info "Starting callback manager actor..."
Celluloid::Actor[:callback_manager] = CallbackManager.new
# Set up signals.
@@ -52,17 +52,35 @@ if App.in_development?
else
trap("INT", &stop_server)
end
trap("TERM", &stop_server)
-# Sleep forever while other threads do the dirty work.
-App.server_state = App::SERVER_STATE_RUNNING
-LOGGER.info "Server initialized."
-sleep 1 until (
- # Normal server shutdown.
- App.server_state == App::SERVER_STATE_SHUTDOWNING ||
- # Server crashed.
- ! Celluloid::Actor[:server].alive?
-)
+## Sleep forever while other threads do the dirty work.
+#App.server_state = App::SERVER_STATE_RUNNING
+#LOGGER.info "Server initialized."
+#sleep 1 until (
+# # Normal server shutdown.
+# App.server_state == App::SERVER_STATE_SHUTDOWNING ||
+# # Server crashed.
+# ! Celluloid::Actor[:server].alive?
+#)
+
+LOGGER.info "Starting EventMachine."
+EventMachine::run {
+ EventMachine::start_server "0.0.0.0", CONFIG['server']['port'], ServerMachine
+ EventMachine::add_periodic_timer(1) do
+ if \
+ # Normal server shutdown.
+ App.server_state == App::SERVER_STATE_SHUTDOWNING ||
+ # Dispatcher crashed.
+ ! Celluloid::Actor[:dispatcher].alive?
+ LOGGER.info "EventMachine shutting down."
+ EventMachine.stop_event_loop
+ end
+ end
+
+ App.server_state = App::SERVER_STATE_RUNNING
+ LOGGER.info "EventMachine started. Server ready."
+}
LOGGER.info "Server stopped."
sleep 1 # Allow last messages to be written to the logfile.
\ No newline at end of file
----------------------- server/lib/server/dispatcher.rb -----------------------
index 0ad2b48..238b034 100644
@@ -464,11 +464,14 @@ class Dispatcher
end
# Set message id and push it into outgoing messages stack for given IO.
def transmit_to_client(client, message_hash)
message_hash[MESSAGE_ID_KEY] = next_message_id
- Actor[:server].write!(client, message_hash)
+ EventMachine.next_tick do
+ client.em_connection.write(message_hash)
+ end
+ #Actor[:server].write!(client, message_hash)
end
end
# Preload some essentials.
require File.dirname(__FILE__) + '/dispatcher/scope'
\ No newline at end of file
---------------------- server/lib/server/server_actor.rb ----------------------
index 39731db..92e3000 100644
@@ -1,149 +1,149 @@
class ServerActor
- include NamedLogMessages
- include FlashPolicyHandler
- include Celluloid::IO
-
- IO_ERRORS = [EOFError, IOError, Errno::ECONNRESET, Errno::EBADF, Errno::EPIPE]
-
- def initialize(port)
- @server = Celluloid::IO::TCPServer.new("0.0.0.0", port)
-
- # client -> socket
- @sockets = {}
-
- # We depend on dispatcher, if it crashes, we crash too.
- current_actor.link Actor[:dispatcher]
-
- run!
- end
-
- def to_s(client=nil)
- client.nil? ? "server" : "server-#{client}"
- end
-
- # Runs main loop which is responsible for accepting connections.
- def run
- info "Starting main event loop."
- loop do
- socket = @server.accept
- _, port, host = socket.peeraddr
- client = Client.new(host, port)
-
- handle! socket, client
- end
- end
-
- # Clean up upon actor exit.
- def finalize
- @server.close unless @server.nil?
- end
-
- # Handles one client connection.
- def handle(socket, client)
- @sockets[client] = socket
- info "Connected.", to_s(client)
- Actor[:dispatcher].register!(client)
-
- buffer = StreamBuffer.new
-
- loop do
- # Read some data from the socket.
- data = socket.readpartial(4096)
-
- if flash_policy_request?(data)
- info "Policy request got, responding.", to_s(client)
- socket.write(policy_data)
- # Disconnect upon flash policy, otherwise flash client gets stuck.
- socket.close
- return
- else
- buffer.data(data)
- buffer.each_message do |message|
- if message == ""
- socket.write("ERROR: empty message\n")
- socket.close
- return
- end
-
- debug "Received message: \"#{message}\"", to_s(client)
-
- json = begin
- LOGGER.block(
- "Parsing message", :level => :info, :component => to_s(client)
- ) do
- JSON.parse(message)
- end
- rescue JSON::ParserError => e
- info "Cannot parse #{message.inspect} as JSON: #{e}", to_s(client)
- socket.write("ERROR: not JSON\n")
- socket.close
- return
- end
-
- Actor[:dispatcher].receive_message! client, json
- end
- end
- end
- rescue *IO_ERRORS
- # Our client has disconnected.
- ensure
- client_disconnected(client)
- end
-
- # Write _message_ serialized as JSON to socket associated with _client_.
- def write(client, message)
- socket = @sockets[client]
- if socket.nil?
- info "Message write aborted, socket not found: #{message}", to_s(client)
- return
- end
-
- json = begin
- LOGGER.block(
- "Serializing message", :level => :info, :component => to_s(client)
- ) do
- JSON.generate(message)
- end
- rescue Exception => e
- error "Failed while serializing message:\n\n#{
- message.inspect}\n\n#{
- e.to_log_str}",
- to_s(client)
- socket.close
- return
- end
-
- debug "Sending message: #{json}", to_s(client)
- socket.write "#{json}\n"
- rescue *IO_ERRORS
- # Our client has disconnected.
- client_disconnected(client)
- end
-
- def disconnect(client)
- tag = to_s(client)
-
- info "Disconnecting.", tag
-
- socket = @sockets[client]
- if socket
- socket.close unless socket.closed?
- @sockets.delete client
- info "Disconnected.", tag
- else
- info "Already disconnected.", tag
- end
- end
-
- private
- def connected
- @sockets.size
- end
-
- def client_disconnected(client)
- if @sockets.has_key?(client)
- Actor[:dispatcher].unregister!(client)
- disconnect(client)
- end
- end
+ #include NamedLogMessages
+ #include FlashPolicyHandler
+ #include Celluloid::IO
+ #
+ #IO_ERRORS = [EOFError, IOError, Errno::ECONNRESET, Errno::EBADF, Errno::EPIPE]
+ #
+ #def initialize(port)
+ # @server = Celluloid::IO::TCPServer.new("0.0.0.0", port)
+ #
+ # # client -> socket
+ # @sockets = {}
+ #
+ # # We depend on dispatcher, if it crashes, we crash too.
+ # current_actor.link Actor[:dispatcher]
+ #
+ # run!
+ #end
+ #
+ #def to_s(client=nil)
+ # client.nil? ? "server" : "server-#{client}"
+ #end
+ #
+ ## Runs main loop which is responsible for accepting connections.
+ #def run
+ # info "Starting main event loop."
+ # loop do
+ # socket = @server.accept
+ # _, port, host = socket.peeraddr
+ # client = Client.new(host, port)
+ #
+ # handle! socket, client
+ # end
+ #end
+ #
+ ## Clean up upon actor exit.
+ #def finalize
+ # @server.close unless @server.nil?
+ #end
+ #
+ ## Handles one client connection.
+ #def handle(socket, client)
+ # @sockets[client] = socket
+ # info "Connected.", to_s(client)
+ # Actor[:dispatcher].register!(client)
+ #
+ # buffer = StreamBuffer.new
+ #
+ # loop do
+ # # Read some data from the socket.
+ # data = socket.readpartial(4096)
+ #
+ # if flash_policy_request?(data)
+ # info "Policy request got, responding.", to_s(client)
+ # socket.write(policy_data)
+ # # Disconnect upon flash policy, otherwise flash client gets stuck.
+ # socket.close
+ # return
+ # else
+ # buffer.data(data)
+ # buffer.each_message do |message|
+ # if message == ""
+ # socket.write("ERROR: empty message\n")
+ # socket.close
+ # return
+ # end
+ #
+ # debug "Received message: \"#{message}\"", to_s(client)
+ #
+ # json = begin
+ # LOGGER.block(
+ # "Parsing message", :level => :debug, :component => to_s(client)
+ # ) do
+ # JSON.parse(message)
+ # end
+ # rescue JSON::ParserError => e
+ # info "Cannot parse #{message.inspect} as JSON: #{e}", to_s(client)
+ # socket.write("ERROR: not JSON\n")
+ # socket.close
+ # return
+ # end
+ #
+ # Actor[:dispatcher].receive_message! client, json
+ # end
+ # end
+ # end
+ #rescue *IO_ERRORS
+ # # Our client has disconnected.
+ #ensure
+ # client_disconnected(client)
+ #end
+ #
+ ## Write _message_ serialized as JSON to socket associated with _client_.
+ #def write(client, message)
+ # socket = @sockets[client]
+ # if socket.nil?
+ # info "Message write aborted, socket not found: #{message}", to_s(client)
+ # return
+ # end
+ #
+ # json = begin
+ # LOGGER.block(
+ # "Serializing message", :level => :debug, :component => to_s(client)
+ # ) do
+ # JSON.generate(message)
+ # end
+ # rescue Exception => e
+ # error "Failed while serializing message:\n\n#{
+ # message.inspect}\n\n#{
+ # e.to_log_str}",
+ # to_s(client)
+ # socket.close
+ # return
+ # end
+ #
+ # debug "Sending message: #{json}", to_s(client)
+ # socket.write "#{json}\n"
+ #rescue *IO_ERRORS
+ # # Our client has disconnected.
+ # client_disconnected(client)
+ #end
+ #
+ #def disconnect(client)
+ # tag = to_s(client)
+ #
+ # info "Disconnecting.", tag
+ #
+ # socket = @sockets[client]
+ # if socket
+ # socket.close unless socket.closed?
+ # @sockets.delete client
+ # info "Disconnected.", tag
+ # else
+ # info "Already disconnected.", tag
+ # end
+ #end
+ #
+ #private
+ #def connected
+ # @sockets.size
+ #end
+ #
+ #def client_disconnected(client)
+ # if @sockets.has_key?(client)
+ # Actor[:dispatcher].unregister!(client)
+ # disconnect(client)
+ # end
+ #end
end
------------------- server/lib/server/server_actor/client.rb -------------------
index 2b46283..01a9746 100644
@@ -1,12 +1,13 @@
# Represents a connected client.
class ServerActor::Client
- attr_reader :host, :port
+ attr_reader :host, :port, :em_connection
- def initialize(host, port)
+ def initialize(host, port, em_connection)
@host = host
@port = port
+ @em_connection = em_connection
end
def to_s
"#{@host}:#{@port}"
end
--------------------- server/lib/server/server_machine.rb ---------------------
new file mode 100644
index 0000000..82b290b
@@ -0,0 +1,86 @@
+module ServerMachine
+ include NamedLogMessages
+ include FlashPolicyHandler
+
+ def to_s
+ @client.nil? ? "server" : "server-#{@client}"
+ end
+
+ def post_init
+ port, ip = get_client_addr
+ @client = ServerActor::Client.new(ip, port, self)
+ info "Connected."
+ Celluloid::Actor[:dispatcher].register!(@client)
+
+ @buffer = StreamBuffer.new
+ end
+
+ def receive_data(data)
+ if flash_policy_request?(data)
+ info "Policy request got, responding."
+ send_data policy_data
+ # Disconnect upon flash policy, otherwise flash client gets stuck.
+ close_connection_after_writing
+ else
+ @buffer.data(data)
+ @buffer.each_message do |message|
+ if message == ""
+ send_data("ERROR: empty message\n")
+ close_connection_after_writing
+ return
+ end
+
+ debug "Received message: \"#{message}\""
+
+ json = begin
+ LOGGER.block(
+ "Parsing message", :level => :debug, :component => to_s
+ ) do
+ JSON.parse(message)
+ end
+ rescue JSON::ParserError => e
+ info "Cannot parse #{message.inspect} as JSON: #{e}"
+ send_data("ERROR: not JSON\n")
+ close_connection_after_writing
+ return
+ end
+
+ Celluloid::Actor[:dispatcher].receive_message! @client, json
+ end
+ end
+ end
+
+ def write(message)
+ json = begin
+ LOGGER.block(
+ "Serializing message", :level => :debug, :component => to_s
+ ) do
+ JSON.generate(message)
+ end
+ rescue Exception => e
+ error "Failed while serializing message:\n\n#{
+ message.inspect}\n\n#{e.to_log_str}", to_s
+ close_connection_after_writing
+ return
+ end
+
+ debug "Sending message: #{json}", to_s
+ send_data "#{json}\n"
+ end
+
+ def unbind
+ info "Disconnected."
+ Celluloid::Actor[:dispatcher].unregister!(@client)
+ end
+
+ private
+
+ def get_client_addr
+ peername = get_peername
+ if peername
+ Socket.unpack_sockaddr_in(peername)
+ else
+ ["Unknown", "Unknown"]
+ end
+ end
+end
\ No newline at end of file
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment