Created
March 7, 2012 21:49
-
-
Save jhosteny/1996462 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*-
require 'thread'
require 'celluloid/io'
module Celluloid
# Error raised when a received envelope is broken, i.e. it carries an
# unknown or unexpected mail type.
#
# Derives from StandardError (not Exception) so that a bare `rescue`
# catches it and the failure can be reported instead of escaping.
class BrokenEnvelopeError < StandardError; end
# Error raised when the client tries to call an unknown service.
#
# Derives from StandardError (not Exception) so that a bare `rescue`
# catches it and the failure can be reported instead of escaping.
class UnknownServiceError < StandardError; end
# Base class of all mails exchanged between client and server.
#
# Subclasses provide a numeric TYPE constant and a #pickle method that
# returns their constructor arguments as an array.
class Mail
  class << self
    # Looks up the mail class that belongs to a wire type.
    # @param [Integer] type the type read from an envelope
    # @return [Class, nil] the matching mail class, nil if the type is unknown
    def klass_for(type)
      {
        CallRequestMail::TYPE => CallRequestMail,
        CallResponseMail::TYPE => CallResponseMail,
        ExceptionMail::TYPE => ExceptionMail
      }[type]
    end

    # Rebuilds the mail stored in an envelope.
    # @param [Envelope] envelope an object responding to #type and #content
    # @return [Mail] an instance of the mail class matching the envelope type
    # @raise [BrokenEnvelopeError] if the envelope type is unknown
    def unpack(envelope)
      klass = klass_for(envelope.type)
      unless klass
        raise BrokenEnvelopeError,
              "The passed envelope is broken, no (correct) type!"
      end
      # SECURITY: Marshal.load instantiates whatever objects the payload
      # describes. Only use this protocol with trusted peers.
      klass.new(*Marshal.load(envelope.content))
    end
  end

  # Serializes the mail for transport inside an envelope.
  # @return [String] the marshalled result of the subclass' #pickle
  def encode
    Marshal.dump(pickle)
  end
end
# Mail asking the remote side to invoke a method on a service.
class CallRequestMail < Mail
  # wire type identifying a call request
  TYPE = 1

  # tag that correlates the request with its reply
  attr_accessor :tag
  # name of the service to call
  attr_accessor :service
  # name of the method to call
  # NOTE: this accessor shadows Object#method on instances of this class.
  attr_accessor :method
  # arguments of the call
  attr_accessor :args

  # @param [Object] tag tag that correlates the request with its reply
  # @param [Object] service name of the service to call
  # @param [Symbol, String] method name of the method to call
  # @param [Array<Object>] args arguments of the call
  def initialize(tag, service, method, args)
    @tag = tag
    @service = service
    @method = method
    @args = *args # the splat turns nil or a single value into an array
  end

  # @return [Array] the constructor arguments, in order
  def pickle
    [@tag, @service, @method, @args]
  end
end
# Mail carrying the result of a call back to the caller.
class CallResponseMail < Mail
  # wire type identifying a call response
  TYPE = 2

  # tag of the request this mail answers
  attr_accessor :tag
  # value returned by the call
  attr_accessor :result

  # @param [Object] tag tag of the request this mail answers
  # @param [Object] result value returned by the call
  def initialize(tag, result)
    @tag = tag
    @result = result
  end

  # @return [Array] the constructor arguments, in order
  def pickle
    [@tag, @result]
  end
end
# Mail carrying an exception back to the caller.
class ExceptionMail < Mail
  # wire type identifying an exception reply
  TYPE = 3

  # tag of the request this mail answers
  attr_accessor :tag
  # the exception object being transported
  attr_accessor :exception

  # @param [Object] tag tag of the request this mail answers
  # @param [Exception] exception the exception object to transport
  def initialize(tag, exception)
    @tag = tag
    @exception = exception
  end

  # @return [Array] the constructor arguments, in order
  def pickle
    [@tag, @exception]
  end
end
# Length-prefixed frame around an encoded mail.
#
# Wire format: a HEADER_SIZE byte header (content length as 32-bit
# big-endian unsigned integer, then the mail type as one unsigned byte)
# followed by the content bytes. All lengths are counted in bytes.
class Envelope
  HEADER_SIZE = 5
  HEADER_ENCODING = "NC".freeze

  # size of the envelope content in bytes (nil while unknown)
  attr_reader :size

  # the type of mail that is in the envelope
  attr_accessor :type

  # create a new envelope instance
  # @param [String, nil] content the content of the new envelope
  # @param [Integer, nil] type the type of content (packed as one byte)
  def initialize(content = nil, type = nil)
    self.content = content
    @type = type
  end

  # resets the envelope object to contain no data, as if it was newly
  # created
  def reset!
    @buffer, @size, @type = "", nil, nil
  end

  # feeds received data into the envelope
  # @param [String] data the next chunk of received bytes
  # @return [String, nil] the bytes that follow this envelope (in case the
  #   parser got more data than the envelope holds), nil if there are none
  def parse!(data)
    # Work on binary copies: lengths are byte counts and the callers'
    # strings must not be modified.
    @buffer = binary(@buffer) << binary(data)

    # parse the header as soon as it is complete
    if @size.nil? && @buffer.bytesize >= HEADER_SIZE
      parse_header!(@buffer.slice!(0...HEADER_SIZE))
    end

    # envelope is complete and contains overhang
    return nil unless @size && @buffer.bytesize > @size
    @buffer.slice!(@size..-1)
  end

  # parses the header without having much checking overhead. This is useful
  # in situations where we can assure the size beforehand
  # @param [String] header the header byte string of size {HEADER_SIZE}
  # @api private
  def parse_header!(header)
    @size, @type = header.unpack(HEADER_ENCODING)
  end

  # returns the content of the envelope
  # @note should only be requested when the message is {finished?}.
  # @return [String] the content of the envelope
  def content
    @buffer
  end

  # sets the content of the envelope. If `nil` was passed an empty string
  # will be set and the size becomes unknown (nil).
  # @param [String, nil] content the new content
  def content=(content)
    @buffer = content || ""
    # bytesize, not size: the header announces bytes, not characters
    @size = content.nil? ? nil : content.bytesize
  end

  # encodes the envelope to be sent over the wire
  # @return [String] encoded envelope as a binary string
  def encode
    [@size, @type].pack(HEADER_ENCODING) << binary(@buffer)
  end

  # checks if the complete envelope was already parsed
  # @return [Boolean] `true` if the message was parsed
  def finished?
    @buffer.bytesize == @size
  end

  private

  # @param [String] string any string
  # @return [String] an unfrozen copy of string tagged as binary
  def binary(string)
    string.dup.force_encoding(Encoding::BINARY)
  end
end
module RPC
# Proxy that forwards every method call to a service on the remote side.
# It derives from BasicObject, so nearly every message ends up in
# #method_missing and is sent to the remote.
class NativeClientProxy < BasicObject
  # Creates a new native client proxy, where the calls get sent to the
  # remote side.
  # @param [Object] service the service
  # @param [NativeClient] client the client to use for communication
  def initialize(service, client)
    @service = service
    @client = client
  end

  # Handler for calls to the remote system.
  # @param [Symbol] method the name of the called method
  # @param [Array<Object>] args the arguments of the call
  # @return [Object] whatever the client's #execute returns
  # @note a block given to the call is not forwarded
  def method_missing(method, *args, &block)
    @client.execute(@service, method, *args)
  end
end
# The client that will handle the socket to the remote. The native client
# is written in pure ruby.
#
# A background thread reads every reply from the socket and hands it to
# the caller waiting for the matching tag, so several threads can issue
# calls over one connection. Each pending call waits on its own Queue;
# a reply that arrives before the caller starts waiting is therefore kept
# instead of being lost.
class NativeClient
  # Thread-local key that was formerly used to hand replies to waiting
  # threads. No longer used internally; kept so that existing references
  # to the constant still resolve.
  MAIL_KEY = :_mlynml

  # Create a native client for the socket and start the reader thread.
  # @param [Socket] socket the socket to manage
  def initialize(socket)
    @socket = socket
    @semaphore = Mutex.new
    @pending = {}  # tag => Queue of the caller waiting for that reply
    @failure = nil # message describing why the connection is unusable
    @thread = Thread.new { read_replies }
  end

  # Disconnect the client from the remote.
  def disconnect
    @socket.close
  end

  # Creates a new Proxy Object for the connection.
  # @param [Object] service the service
  # @return [NativeClientProxy] the proxy object that will serve all calls
  def for(service)
    NativeClientProxy.new(service, self)
  end

  # Connect to a tcp socket.
  # @param [String] host the host to connect to (e.g. 'localhost')
  # @param [Integer] port the port to connect to (e.g. 8000)
  # @return [NativeClient] the connected client
  def self.connect_tcp(host, port)
    new(TCPSocket.open(host, port))
  end

  # Executes a client call blocking. To issue an async call one needs to
  # start separate threads; the client multiplexes their calls over the
  # one socket.
  # @api private
  # @param [Object] service the service
  # @param [Symbol, String] method the method name to call on the service
  # @param [Array<Object>] args the arguments that are passed to the remote
  #   side
  # @return [Object] the result of the call
  # @raise [Exception] the exception transported back from the remote
  #   side, with the local backtrace appended to the remote one
  # @raise [IOError] if the connection was lost or writing failed
  # @raise [BrokenEnvelopeError] if the reply is neither a response nor an
  #   exception mail
  def execute(service, method, *args)
    tag = "#{Time.now.to_f}:#{Thread.current.object_id}"
    inbox = Queue.new

    @semaphore.synchronize do
      raise IOError, @failure if @failure
      request = CallRequestMail.new(tag, service, method, args).encode
      # register before writing so the reader always finds the waiter
      @pending[tag] = inbox
      begin
        @socket.write(Envelope.new(request, CallRequestMail::TYPE).encode)
      rescue StandardError
        @pending.delete(tag)
        raise
      end
    end

    # blocks until the reader thread delivers the reply (or a failure)
    reply = inbox.pop
    case reply
    when CallResponseMail
      reply.result
    when ExceptionMail
      # add remote and local trace together and raise the original exception
      remote = reply.exception
      remote.set_backtrace(Array(remote.backtrace) + ["---"] + caller(0))
      raise remote
    when IOError
      raise IOError, reply.message
    else
      raise BrokenEnvelopeError, "Unexpected reply #{reply.class}"
    end
  end

  private

  # Reader loop: decodes replies from the socket and hands each one to
  # the caller waiting for its tag. Any error ends the loop and fails all
  # pending calls.
  def read_replies
    envelope = Envelope.new
    loop do
      # read the header to learn the size and type
      envelope.parse_header! read_exactly(Envelope::HEADER_SIZE)
      # now that we know the size, read the rest of the envelope;
      # assigning the content leaves the type from the header in place
      envelope.content = read_exactly(envelope.size)
      mail = Mail.unpack(envelope)
      inbox = @semaphore.synchronize { @pending.delete(mail.tag) }
      inbox << mail if inbox # replies nobody waits for are dropped
      envelope.reset!
    end
  rescue StandardError, BrokenEnvelopeError => error
    fail_pending(error)
  end

  # Reads exactly count bytes from the socket.
  # @param [Integer] count number of bytes to read
  # @return [String] the bytes read
  # @raise [EOFError] if the stream ends before count bytes arrived
  def read_exactly(count)
    data = @socket.read(count)
    unless data && data.bytesize == count
      raise EOFError, "connection closed by remote"
    end
    data
  end

  # Marks the connection as unusable and wakes every waiting caller with
  # an IOError describing the cause.
  # @param [Exception] error the error that ended the reader loop
  def fail_pending(error)
    message = "connection lost: #{error.message}"
    waiting = @semaphore.synchronize do
      @failure = message
      inboxes = @pending.values
      @pending.clear
      inboxes
    end
    waiting.each { |inbox| inbox << IOError.new(message) }
  end
end
module Service
class << self
  # Service classes by registered name, and the lock guarding them.
  # Held in constants of the singleton class rather than in `@@` class
  # variables, because class variables of a mixin module are shared with
  # every class that includes the module.
  REGISTRY = {}
  REGISTRY_LOCK = Mutex.new

  # Registers a service class under a name, replacing any earlier entry.
  # @param [Object] name the name clients use to address the service
  # @param [Class] klass the class implementing the service
  # @return [Class] klass
  def __register__(name, klass)
    REGISTRY_LOCK.synchronize { REGISTRY[name] = klass }
  end

  # @param [Object] name a service name
  # @return [Class, nil] the class registered under name, nil if none
  def __lookup__(name)
    REGISTRY_LOCK.synchronize { REGISTRY[name] }
  end

  # @return [Array<Object>] the names of all registered services
  def __registered__
    REGISTRY_LOCK.synchronize { REGISTRY.keys }
  end
end
# Class-level macros added to every class that includes Service.
module ClassMethods
  # Registers the class as the implementation of the named service.
  # @param [Object] name the name clients use to address the service
  def register(name)
    Service.__register__(name, self)
  end
end
# Hook run by Ruby when Service is included into a class: mixes Celluloid
# into that class and adds the ClassMethods macros to it.
# @param [Class] base the including class
def self.included(base)
  # `send` because Module#include was private before Ruby 2.1
  base.send :include, Celluloid
  base.extend ClassMethods
end
# TCP server that accepts RPC connections, dispatches call requests to
# registered services and writes the replies back.
class Server
  include Celluloid::IO

  # Opens the listening socket and starts accepting asynchronously.
  # @param [String] host the address to listen on
  # @param [Integer] port the port to listen on
  def initialize(host, port)
    @server = TCPServer.new(host, port)
    @services = {} # service name => service instance, created on first use
    run!
  end

  # Closes the listening socket, if there is one.
  def finalize
    @server.close if @server
  end

  # Accept loop: every accepted connection is handled asynchronously.
  def run
    loop { handle_connection! @server.accept }
  end

  # Performs the call described by an envelope.
  # @param [Envelope] envelope a completely received envelope
  # @return [Array] the pair [tag, value]; value is the result of the
  #   call, or the exception object if anything went wrong. tag is nil if
  #   the envelope could not be unpacked.
  def call(envelope)
    tag = nil
    msg = Mail.unpack(envelope)
    tag = msg.tag

    unless msg.is_a?(CallRequestMail)
      raise BrokenEnvelopeError, "Unexpected envelope #{msg.class}"
    end

    # reuse the service instance, or create it on first use
    service = @services[msg.service] ||= begin
      klass = Service.__lookup__(msg.service)
      raise UnknownServiceError, "Unknown service #{msg.service}" unless klass
      klass.new
    end

    # SECURITY: the method name comes from the network and is dispatched
    # with __send__; only expose this server to trusted clients.
    [tag, service.__send__(msg.method.to_sym, *msg.args)]
  rescue StandardError, BrokenEnvelopeError, UnknownServiceError => exception
    # The RPC error classes are listed explicitly so they are turned into
    # exception replies whatever their superclass is.
    [tag, exception]
  end

  # Writes the reply for a call to the socket: an exception mail if value
  # is an Exception, a call response mail otherwise.
  # @param [Object] tag the tag of the request being answered
  # @param [Socket] socket the connection to answer on
  # @param [Object] value the result or exception to send
  def respond(tag, socket, value)
    # For debugging
    _, port, host = socket.peeraddr
    puts "Responding to call #{tag} from #{host}:#{port} with #{value}"

    mail_class = value.is_a?(Exception) ? ExceptionMail : CallResponseMail
    payload = mail_class.new(tag, value).encode
    socket.write Envelope.new(payload, mail_class::TYPE).encode
  end

  # Feeds received data into the envelope and performs the call once the
  # envelope is complete. Data following the envelope is processed the
  # same way.
  # @param [Socket] socket the connection the data came from
  # @param [String] data the received bytes
  # @param [Envelope] envelope the envelope being assembled
  def receive_data(socket, data, envelope)
    rest = envelope.parse!(data)
    return unless envelope.finished?

    # If our call returns a lambda, then it wants to respond
    # asynchronously. The lambda returned will perform the
    # work when invoked, and is passed another lambda which
    # is a closure over the actor, socket and call id needed
    # to actually perform the response when the work is done.
    tag, value = call(envelope)
    if value.is_a? Proc
      proxy = current_actor
      value.call(lambda { |v| proxy.respond! tag, socket, v })
      puts "Invoked asynchronous RPC"
    else
      # The response was ready synchronously, so schedule it now.
      puts "Invoked synchronous RPC"
      respond! tag, socket, value
    end

    envelope.reset!
    receive_data(socket, rest, envelope) if rest
  end

  # Reads from a connection until the peer disconnects.
  # @param [Socket] socket the accepted connection
  def handle_connection(socket)
    _, port, host = socket.peeraddr
    puts "*** Received connection from #{host}:#{port}"
    envelope = Envelope.new
    loop do
      receive_data(socket, socket.readpartial(4096), envelope)
    end
  rescue EOFError, Errno::ECONNRESET
    # a reset by the peer is treated like a normal disconnect
    puts "*** #{host}:#{port} disconnected"
    socket.close
  end
end
end # Service
end # RPC
end # Celluloid
if __FILE__ == $0
  # Simple test: starts a supervised server on 127.0.0.1:1234 and calls
  # the service registered as :foo through a native client.
  class SimpleServer
    include Celluloid::RPC::Service
    register :foo

    # Synchronous call: the return value is the reply.
    def baz(n)
      n + 1
    end

    # Asynchronous call: returns a lambda, which the server invokes with a
    # responder; the reply is sent when the responder is called.
    def qux
      puts "qux entered"
      lambda do |resp|
        puts "sleeping"
        after(10) {
          puts "responding"
          resp.call "Qux!"
        }
      end
    end
  end

  supervisor = Celluloid::RPC::Service::Server.supervise("127.0.0.1", 1234)
  trap("INT") { supervisor.terminate; exit }

  client = Celluloid::RPC::NativeClient.connect_tcp("127.0.0.1", 1234)
  svc = client.for(:foo)
  puts svc.baz 1
  svc.qux
  # NOTE(review): baz takes one argument but is called without any here;
  # presumably this exercises remote exception propagation - confirm.
  puts svc.baz
  sleep
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment