Skip to content

Instantly share code, notes, and snippets.

@jhosteny
Created March 7, 2012 21:49
Show Gist options
  • Save jhosteny/1996462 to your computer and use it in GitHub Desktop.
Save jhosteny/1996462 to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
require 'celluloid/io'
module Celluloid
# Error raised when a received envelope cannot be decoded (unknown or
# unexpected mail type).
#
# Derives from StandardError rather than Exception so that a bare `rescue`
# (as used by the server when turning failures into error responses)
# catches it; code that rescues Exception keeps working unchanged.
class BrokenEnvelopeError < StandardError; end
# Error raised when the client tries to call a service that is not known.
#
# Derives from StandardError rather than Exception so that a bare `rescue`
# (as used by the server when turning failures into error responses)
# catches it; code that rescues Exception keeps working unchanged.
class UnknownServiceError < StandardError; end
# Base class of all mails that travel inside an envelope. Subclasses provide
# a TYPE constant and a #pickle method returning their constructor arguments.
class Mail
  # Looks up the mail class registered for a numeric type id.
  # @param [Integer] type the type id taken from an envelope
  # @return [Class, nil] the matching mail class, or nil if there is none
  def self.klass_for(type)
    table = {}
    [CallRequestMail, CallResponseMail, ExceptionMail].each do |mail_class|
      table[mail_class::TYPE] = mail_class
    end
    table[type]
  end

  # Rebuilds a mail object from an envelope.
  # NOTE(review): the content is passed to Marshal.load as-is; if envelopes
  # can come from untrusted peers this allows arbitrary object
  # deserialization — confirm the trust model.
  # @param [#type, #content] envelope the envelope holding the encoded mail
  # @return [Mail] an instance of the class matching the envelope type
  # @raise [BrokenEnvelopeError] if the envelope type maps to no mail class
  def self.unpack(envelope)
    klass = klass_for(envelope.type)
    unless klass
      raise BrokenEnvelopeError.new(
        "The passed envelope is broken, no (correct) type!"
      )
    end
    klass.new(*Marshal.load(envelope.content))
  end

  # Serializes the constructor arguments returned by #pickle.
  # @return [String] the marshalled mail
  def encode
    Marshal.dump(pickle)
  end
end
# Mail asking the remote side to invoke a method on a service.
class CallRequestMail < Mail
  # type id under which Mail.klass_for finds this class
  TYPE = 1

  # tag identifying the call, the addressed service, the method name and
  # the argument list
  attr_accessor :tag, :service, :method, :args

  # @param [Object] tag identifier of the call
  # @param [Object] service the service to address
  # @param [Symbol, String] method the method to invoke
  # @param [Array<Object>, Object] args the arguments; always stored as an
  #   array
  def initialize(tag, service, method, args)
    @tag, @service, @method = tag, service, method
    @args = [*args]
  end

  # @return [Array] the constructor arguments, in order
  def pickle
    [@tag, @service, @method, @args]
  end
end
# Mail carrying the result of a call back to the caller.
class CallResponseMail < Mail
  # type id under which Mail.klass_for finds this class
  TYPE = 2

  # tag of the call this mail answers, and the value it produced
  attr_accessor :tag, :result

  # @param [Object] tag identifier of the answered call
  # @param [Object] result the value returned by the call
  def initialize(tag, result)
    @tag, @result = tag, result
  end

  # @return [Array] the constructor arguments, in order
  def pickle
    [@tag, @result]
  end
end
# Mail carrying an exception back to the caller instead of a result.
class ExceptionMail < Mail
  # type id under which Mail.klass_for finds this class
  TYPE = 3

  # tag of the call this mail answers, and the exception it carries
  attr_accessor :tag, :exception

  # @param [Object] tag identifier of the answered call
  # @param [Exception] exception the exception to transport
  def initialize(tag, exception)
    @tag, @exception = tag, exception
  end

  # @return [Array] the constructor arguments, in order
  def pickle
    [@tag, @exception]
  end
end
# A length-prefixed frame: a 5 byte header (32-bit big-endian content length
# followed by an 8-bit type) and then the content itself.
#
# All lengths are counted in BYTES and the content is buffered as a binary
# string, so that the length written into the header always matches what is
# put on the wire, even when the content contains multibyte characters.
class Envelope
  # number of bytes occupied by the packed header
  HEADER_SIZE = 5
  # pack/unpack template of the header: 32-bit big-endian length, 8-bit type
  HEADER_ENCODING = "NC".freeze

  # size of the envelope content in bytes (nil while unknown)
  attr_reader :size
  # the type of mail that is in the envelope
  attr_accessor :type

  # create a new envelope instance
  # @param [String, nil] content the content of the new envelope
  # @param [Integer, nil] type the type of content
  def initialize(content = nil, type = nil)
    self.content = content
    @type = type
  end

  # resets the envelope object to contain no data, as if it was newly
  # created
  def reset!
    @buffer = binary("")
    @size = nil
    @type = nil
  end

  # feeds received data into the envelope, parsing the header as soon as
  # enough bytes have arrived
  # @param [String] data the next chunk of received bytes
  # @return [String, nil] the bytes that follow the end of this envelope
  #   (the overhang), or nil if there are none
  def parse!(data)
    @buffer << binary(data)

    # parse the length/type header once it is completely buffered
    if @size.nil? && @buffer.bytesize >= HEADER_SIZE
      parse_header!(@buffer.slice!(0, HEADER_SIZE))
    end

    # only a complete envelope can have an overhang
    return unless @size && @buffer.bytesize > @size

    @buffer.slice!(@size, @buffer.bytesize)
  end

  # parses the header without having much checking overhead. This is useful
  # in situations where we can assure the size beforehand
  # @param [String] header the header byte string of size {HEADER_SIZE}
  # @api private
  def parse_header!(header)
    @size, @type = header.unpack(HEADER_ENCODING)
  end

  # returns the content of the envelope
  # @note should only be requested when the message is {finished?}.
  # @return [String] the content of the envelope as a binary string
  def content
    @buffer
  end

  # sets the content of the envelope. If `nil` was passed an empty string
  # will be set and the size stays unknown.
  # @param [String, nil] content the new content
  def content=(content)
    @buffer = binary(content || "")
    @size = content.nil? ? nil : @buffer.bytesize
  end

  # encodes the envelope to be sent over the wire
  # @return [String] encoded envelope (header followed by content)
  def encode
    [@size, @type].pack(HEADER_ENCODING) + @buffer
  end

  # checks if the complete envelope was already parsed
  # @return [Boolean] `true` if exactly `size` content bytes are buffered
  def finished?
    @buffer.bytesize == @size
  end

  private

  # Returns a private, unfrozen, binary-encoded copy of +string+ so that
  # appending and slicing always operate on raw bytes.
  def binary(string)
    string.dup.force_encoding(Encoding::BINARY)
  end
end
module RPC
# Proxy bound to one service: every method invoked on it is forwarded to the
# client as a call on that service.
class NativeClientProxy < BasicObject
  # @param [Object] service the service the proxy stands for
  # @param [NativeClient] client the client to use for communication
  def initialize(service, client)
    @service = service
    @client = client
  end

  # Forwards the call to the client. A block given to the call is accepted
  # but not passed on.
  # @return [Object] whatever the client's execute returns
  def method_missing(method, *args, &block)
    @client.execute(@service, method, *args)
  end
end
# The client that will handle the socket to the remote. The native client
# is written in pure ruby.
class NativeClient
MAIL_KEY = :_mlynml
# Create a native client for the socket and start the background thread
# that reads response envelopes and hands each mail to the thread waiting
# for its tag.
#
# The reader thread ends quietly when the socket reaches EOF (or the
# stream ends in the middle of an envelope) instead of dying with a
# NoMethodError on nil, and it ignores responses whose tag has no waiting
# thread.
# @param [Socket] socket the socket to manage
def initialize(socket)
  @socket = socket
  @semaphore = Mutex.new
  @threads = {}
  @thread = Thread.new do
    envelope = Envelope.new
    loop do
      # read the header to learn the size; IO#read(n) returns nil at EOF
      # and a short string when the stream ends early
      header = @socket.read(Envelope::HEADER_SIZE)
      break if header.nil? || header.bytesize < Envelope::HEADER_SIZE
      envelope.parse_header!(header)

      # now that we know the size, read the rest of the envelope without
      # parsing
      expected = envelope.size
      body = @socket.read(expected)
      break if body.nil? || body.bytesize < expected
      envelope.content = body

      mail = Mail.unpack(envelope)
      waiting = @semaphore.synchronize { @threads.delete(mail.tag) }
      if waiting
        waiting[MAIL_KEY] = mail # save the mail for the waiting thread
        waiting.wakeup           # wake up the waiting thread
      end
      envelope.reset!
    end
  end
end
# Disconnects the client from the remote by closing the underlying socket.
# NOTE(review): the reader thread started in #initialize is not stopped
# explicitly here — confirm that closing the socket is meant to end it.
def disconnect
@socket.close
end
# Creates a new proxy object for the connection.
# @param [Object] service the service
# @return [NativeClientProxy] the proxy object that will serve all calls
def for(service)
NativeClientProxy.new(service, self)
end
# Connects to a TCP socket and wraps it in a client.
# @param [String] host the host to connect to (e.g. 'localhost')
# @param [Integer] port the port to connect to (e.g. 8000)
# @return [NativeClient] the connected client
def self.connect_tcp(host, port)
new(TCPSocket.open(host, port))
end
# Executes a client call and blocks the calling thread until the reader
# thread has stored a mail for it and woken it up. To issue calls
# concurrently one needs to use separate threads; each call carries its own
# tag so that responses can be handed to the right waiting thread.
# @api private
# @param [Object] service the service
# @param [Symbol, String] method the method name to call on the service
# @param [Array<Object>] args the arguments that are passed to the remote
# side
# @return [Object] the result of the call
# @raise [Exception] the exception carried by an ExceptionMail, with the
# remote backtrace, a "---" separator and the local backtrace combined
def execute(service, method, *args)
thread = Thread.current
# the tag identifies this call: current time plus the calling thread's id
tag = "#{Time.now.to_f}:#{thread.object_id}"
@semaphore.synchronize {
# encode the request and send it while holding the lock
mail = CallRequestMail.new(tag, service, method, args).encode
@socket.write(Envelope.new(mail, CallRequestMail::TYPE).encode)
# add ourselves to the list of waiting threads
@threads[tag] = thread
}
# stop the current thread, the thread will be started after the
# response arrived
# NOTE(review): the lock is released before Thread.stop, so it looks like
# the reader could call wakeup before this thread is actually stopped —
# confirm whether that race can happen in practice.
Thread.stop
# get mail from responses
mail = thread[MAIL_KEY]
if mail.is_a? CallResponseMail
mail.result
else
raise Exception.new # raise exception to capture client backtrace
end
rescue Exception => exception
# add local and remote trace together and reraise original exception
if mail and mail.is_a? ExceptionMail
backtrace = []
backtrace += mail.exception.backtrace
backtrace += ["---"]
backtrace += exception.backtrace
mail.exception.set_backtrace(backtrace)
raise mail.exception
else
# anything else (including the placeholder raised above when no usable
# mail was found) is re-raised unchanged
raise
end
end
end
module Service
class << self
  # service name => service class, shared by everything in this module
  @@__registry__ = {}
  # guards every access to the registry
  @@__registry_lock__ = Mutex.new

  # Stores a service class under the given name.
  # @param [Object] name the name the service is looked up by
  # @param [Class] klass the class implementing the service
  # @return [Class] the registered class
  def __register__(name, klass)
    @@__registry_lock__.synchronize { @@__registry__[name] = klass }
  end

  # Finds the class registered under the given name.
  # @param [Object] name the service name
  # @return [Class, nil] the registered class, or nil if there is none
  def __lookup__(name)
    @@__registry_lock__.synchronize { @@__registry__[name] }
  end

  # Lists the names of all registered services.
  # @return [Array] the registered names
  def __registered__
    @@__registry_lock__.synchronize { @@__registry__.keys }
  end
end
# Class-level helpers added to every class that includes Service.
module ClassMethods
  # Registers the extended class as the implementation of a service.
  # @param [Object] name the name clients use to address the service
  def register(name)
    Service.__register__(name, self)
  end
end
# Hook run when Service is included: mixes Celluloid into the including
# class and gives it the ClassMethods helpers.
# @param [Module] base the class or module including Service
def self.included(base)
  base.send(:include, Celluloid)
  base.extend(ClassMethods)
end
class Server
include Celluloid::IO
# Opens a TCP server on the given address and starts accepting connections.
# @param [String] host the address to listen on
# @param [Integer] port the port to listen on
def initialize(host, port)
@server = TCPServer.new(host, port)
# service instances created so far, keyed by service name (filled in #call)
@services = {}
# NOTE(review): presumably Celluloid's bang form, i.e. an asynchronous
# invocation of #run — confirm against the Celluloid version in use.
run!
end
# Closes the listening socket if one was opened.
# NOTE(review): presumably invoked by Celluloid when the actor shuts down —
# confirm.
def finalize
@server.close if @server
end
# Accepts connections forever and passes each accepted socket to
# handle_connection!.
# NOTE(review): the bang form presumably runs #handle_connection
# asynchronously so the accept loop is not blocked — confirm.
def run
loop { handle_connection! @server.accept }
end
# Unpacks a request envelope and performs the requested call on the
# addressed service, creating and caching the service instance on first
# use.
# NOTE(review): the method name comes from the request and is dispatched
# with __send__, which does not check method visibility — confirm this is
# acceptable for the peers that may connect.
# @param [Envelope] envelope the complete request envelope
# @return [Array] the call tag and either the result or the StandardError
#   that was rescued while handling the request
def call(envelope)
  msg = Mail.unpack(envelope)
  tag = msg.tag

  unless msg.is_a?(CallRequestMail)
    raise BrokenEnvelopeError.new("Unexpected envelope #{msg.class}")
  end

  service = @services[msg.service]
  unless service
    klass = Service.__lookup__(msg.service)
    unless klass
      raise UnknownServiceError.new("Unknown service #{msg.service}")
    end
    service = klass.new
    @services[msg.service] = service
  end

  [tag, service.__send__(msg.method.to_sym, *msg.args)]
rescue => exception
  [tag, exception]
end
# Writes the response for a call to the socket: an ExceptionMail when the
# value is an exception, a CallResponseMail otherwise.
# @param [Object] tag the tag of the call being answered
# @param [Socket] socket the connection to write the response to
# @param [Object] value the result of the call, or the exception it raised
def respond(tag, socket, value)
  # For debugging
  _, port, host = socket.peeraddr
  puts "Responding to call #{tag} from #{host}:#{port} with #{value}"

  mail_class = value.is_a?(Exception) ? ExceptionMail : CallResponseMail
  payload = mail_class.new(tag, value).encode
  socket.write(Envelope.new(payload, mail_class::TYPE).encode)
end
# Feeds received data into the envelope and answers every request that
# becomes complete, continuing with the overhang until no data is left.
#
# If a call returns a Proc it wants to respond asynchronously: the Proc is
# invoked with a lambda that closes over the actor, the socket and the call
# tag, and sends the response once it is given the value. Any other value
# is sent as the response right away.
# @param [Socket] socket the connection the data came from
# @param [String] data the received bytes
# @param [Envelope] envelope the envelope being filled for this connection
def receive_data(socket, data, envelope)
  loop do
    overhang = envelope.parse!(data)
    return unless envelope.finished?

    # tag and value are local to this iteration, so every responder
    # lambda keeps the tag of its own call
    tag, value = call(envelope)
    if value.is_a? Proc
      proxy = current_actor
      responder = lambda { |v| proxy.respond! tag, socket, v }
      value.call responder
      puts "Invoked asynchronous RPC"
    else
      # The response was ready synchronously, so schedule it now.
      puts "Invoked synchronous RPC"
      respond! tag, socket, value
    end

    envelope.reset!
    break unless overhang
    data = overhang
  end
end
# Serves one client connection: reads chunks of up to 4096 bytes and feeds
# them to receive_data until the peer closes the connection, then closes
# the socket.
# @param [Socket] socket the accepted connection
def handle_connection(socket)
  port, host = socket.peeraddr.values_at(1, 2)
  puts "*** Received connection from #{host}:#{port}"

  # one envelope is reused for all requests of this connection
  envelope = Envelope.new
  loop { receive_data(socket, socket.readpartial(4096), envelope) }
rescue EOFError
  puts "*** #{host}:#{port} disconnected"
  socket.close
end
end
end # Service
end # RPC
end # Celluloid
# Demo, only run when this file is executed directly: starts a supervised
# server on 127.0.0.1:1234, registers a sample service and calls it through
# a native client.
if __FILE__ == $0
# Simple test
class SimpleServer
include Celluloid::RPC::Service
# make this class reachable for clients under the service name :foo
register :foo
# synchronous call: the return value is the response
def baz(n)
n + 1
end
# asynchronous call: returns a lambda that receives the responder and
# answers with "Qux!" from an `after(10)` block
def qux
puts "qux entered"
lambda do |resp|
puts "sleeping"
after(10) {
puts "responding"
resp.call "Qux!"
}
end
end
end
supervisor = Celluloid::RPC::Service::Server.supervise("127.0.0.1", 1234)
# terminate the supervised server and exit on Ctrl-C
trap("INT") { supervisor.terminate; exit }
client = Celluloid::RPC::NativeClient.connect_tcp("127.0.0.1", 1234)
svc = client.for(:foo)
puts svc.baz 1
svc.qux
# NOTE(review): baz is declared with one required argument but is called
# without any here — presumably on purpose, to show a remote exception
# being raised on the client; confirm.
puts svc.baz
sleep
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment