Created
April 2, 2012 04:41
-
-
Save ryanlecompte/2280799 to your computer and use it in GitHub Desktop.
Example of a preforking server that uses threads instead of Kernel#fork
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'socket' | |
require 'thread' | |
# Example of a "threaded preforking" server using multiple acceptor threads | |
# and a pool of worker threads to service the actual requests. Note | |
# that this is similar to a real preforking server that uses Kernel#fork | |
# to service requests concurrently with a single listener socket. The only | |
# difference is that instead of multiple forked processes, we use multiple | |
# acceptor threads. | |
class Server | |
TIMEOUT = 5 | |
def initialize(opts = {}) | |
@port = opts.fetch(:port, 3000) | |
@num_acceptors = opts.fetch(:num_acceptors, 10) | |
@pool = ThreadPool.new(opts.fetch(:num_workers, 10)) | |
@running = false | |
puts "Server created with #{@num_acceptors} acceptors and #{@pool.size} workers." | |
end | |
def run | |
install_signal_handlers | |
run_acceptors | |
end | |
private | |
def run_acceptors | |
@running = true | |
@socket = TCPServer.new(@port) | |
@acceptors = Array.new(@num_acceptors) do | |
Thread.new do | |
while @running | |
if IO.select([@socket], nil, nil, TIMEOUT) | |
begin | |
conn = @socket.accept_nonblock | |
@pool << [conn, proc { |client| handle_client(client) }] | |
rescue IO::WaitReadable, Errno::EINTR | |
rescue => ex | |
puts "Error while accepting client: #{ex.message}" | |
end | |
end | |
end | |
end | |
end | |
@acceptors.each(&:join) | |
end | |
def install_signal_handlers | |
[:INT, :TERM].each do |signal| | |
trap(signal) do | |
shutdown | |
end | |
end | |
end | |
def handle_client(conn) | |
conn.write 'See ya!' | |
rescue => ex | |
puts "Error while writing to client: #{ex.message}" | |
ensure | |
begin | |
conn.close | |
rescue | |
end | |
end | |
def shutdown | |
puts 'Shutting down ...' | |
@running = false | |
@pool.shutdown | |
end | |
end | |
class ThreadPool | |
def initialize(size) | |
@queue = Queue.new | |
@workers = Array.new(size) do | |
Thread.new do | |
while job = @queue.pop | |
client, handler = job | |
handler.call(client) | |
end | |
end | |
end | |
end | |
def <<(job) | |
@queue << job | |
end | |
def shutdown | |
size.times { @queue << nil } | |
@workers.each(&:join) | |
end | |
def size | |
@workers.size | |
end | |
end | |
if __FILE__ == $PROGRAM_NAME | |
Server.new(:port => 3000, :num_acceptors => 50, :num_workers => 200).run | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment