Created
January 5, 2011 08:28
-
-
Save pbosetti/766056 to your computer and use it in GitHub Desktop.
Simple InterProcessCommunication library
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby | |
# test | |
# | |
# Created by Paolo Bosetti on 2011-01-04. | |
# Copyright (c) 2011 University of Trento. All rights reserved. | |
# | |
require "yaml" | |
require "socket" | |
require "timeout" | |
require "fileutils" | |
# Socket Abstraction class: single interface for both unix and UDP sockets. | |
# @author Paolo Bosetti | |
# @todo Add file-based interface | |
class SimpleSocket | |
LOCALHOST = '127.0.0.1' | |
# Initializer | |
# @param [Hash] args a hash of configurayion parameters | |
# @option args [Fixnum] :port udp port | |
# @option args [String] :host host address (name or udp, default to LOCALHOST constant) | |
# @option args [Symbol] :kind type of socket: :unix or :udp | |
# @option args [true,false] :force if true, reopens a Unix socket even if a stale socket file exists | |
def initialize(args = {}) | |
@cfg = {:port => 5000, :host => LOCALHOST, :kind => :unix, :force => true } | |
@cfg.merge! args | |
case @cfg[:kind] | |
when :unix | |
@socket_file = "/tmp/#{$0}.sock" | |
@socket = nil | |
when :udp | |
@socket = UDPSocket.new | |
else | |
raise ArgumentError, "Either :unix or :udp allowed" | |
end | |
@open = false | |
end | |
# Opens the connection. | |
# @return [true,false] false if already open, true otherwise | |
def connect | |
return false if @open | |
case @cfg[:kind] | |
when :unix | |
@socket = UNIXSocket.open(@socket_file) | |
when :udp | |
@socket.connect(@cfg[:host], @cfg[:port]) | |
end | |
@open = true | |
end | |
# Prints a string to the socket | |
# @return [nil] | |
def print(string) | |
@socket.print(string) | |
end | |
# Sets itself in sorver mode and start listening. If @cfg[:force] is true, | |
# removes any existing stale socket file, then retries to open the socket. | |
# @raise [Errno::EADDRINUSE] when the socet is busy | |
def listen | |
case @cfg[:kind] | |
when :unix | |
@socket = UNIXServer.open(@socket_file).accept | |
when :udp | |
@socket.bind(LOCALHOST, @cfg[:port]) | |
end | |
rescue Errno::EADDRINUSE | |
if @cfg[:force] then | |
FileUtils::rm(@socket_file) | |
retry | |
else | |
puts $! | |
end | |
end | |
# Reads from socket, blocking mode. | |
# @param [Fixnum] bytes the number of bytes to be read. | |
# @return [String] the read packet | |
def recvfrom(bytes) | |
@socket.recvfrom(bytes) | |
end | |
# Reads from socket, non-blocking mode. | |
# @param [Fixnum] bytes the number of bytes to be read. | |
# @raise [] | |
# @return [String] the read packet | |
def recv_nonblock(bytes) | |
@socket.recv_nonblock(bytes) | |
end | |
# Closes the socket, removing any possible socket file. | |
# @return [nil] | |
def close | |
@socket.close | |
@open = false | |
FileUtils::rm(@socket_file) if @socket_file | |
end | |
end | |
# Implements a simple inter-process communication. By default, it transfers | |
# objects serialized by YAML, although different serialization methods can | |
# be specified runtime thanks to blocks. | |
# @example Server: | |
# from_client = SimpleIPC.new :port => 5000, :timeout => 0, :kind => :udp | |
# from_client.listen | |
# from_client.get # YAML serialization | |
# from_client.get {|s| s.split(',').map {|e| e.to_f}} # Custom srlzn | |
# from_client.get {|s| s.unpack('N4')} # Binary pack srlzn | |
# from_client.close | |
# @example Client: | |
# to_server = SimpleIPC.new :port => 5000, :kind => :udp | |
# to_server.send([1,2,3,4, "test"]) # YAML serialization | |
# to_server.send([1,2,3,4]) {|o| o * ","} # Custom srlzn | |
# to_server.send([1,2,3,4]) {|o| o.pack("N4")} # Binary pack srlzn | |
# to_server.close | |
# @author Paolo Bosetti | |
class SimpleIPC | |
LENGTH_CODE = 'N' | |
LENGTH_SIZE = [0].pack(LENGTH_CODE).size | |
LOCALHOST = '127.0.0.1' | |
attr_accessor :cfg | |
attr_reader :stream_size | |
# Initializer | |
# @param [Hash] args a hash of configurayion parameters | |
# @option args [Fixnum] :port udp port | |
# @option args [String] :host host address (name or udp, default to LOCALHOST constant) | |
# @option args [Numeric] :timeout timeout to be used in blocking mode | |
# @option args [Symbol] :kind type of socket: :unix or :udp | |
def initialize(args = {}) | |
@cfg = {:port => 5000, :host => LOCALHOST, :timeout => 1.0, :kind => :udp } | |
@cfg.merge! args | |
@stream_size | |
@socket = SimpleSocket.new @cfg | |
end | |
# Sends an object. | |
# @param [Object] something an object to be serialized and sent | |
# @return [String] the serialized payload | |
# @yield [Object] If a block is given, passes the received object to the block as o to be serialized | |
# @yieldparam [Object] o the object to be sent (anything serializable) | |
# @yieldreturn [String] the object serialized as a String | |
def send(something) | |
if block_given? then | |
payload = yield(something) | |
else | |
payload = YAML.dump(something) | |
end | |
length = [payload.size].pack(LENGTH_CODE) | |
@socket.connect | |
@socket.print(length) | |
@socket.print(payload) | |
return payload | |
end | |
# Puts itself in server mode and starts listening | |
def listen | |
@socket.listen | |
end | |
# When in server mode, call this method to read a message. If no block is | |
# given, it assumes that the object was passed as serialized by YAML. | |
# Otherwise a deserialization block must be provided. | |
# @return [Object,nil] the deserialized Object, or nil in case of timeout. | |
# @yield [String] the block must deserialize the String and return an Object | |
def get | |
result = nil | |
begin | |
if @cfg[:timeout] > 0 and !@cfg[:nonblock] then | |
Timeout::timeout(@cfg[:timeout]) do |to| | |
result = get_ | |
end | |
else | |
result = get_ | |
end | |
rescue Timeout::Error | |
result = nil | |
end | |
if block_given? then | |
return yield(result) | |
else | |
return YAML.load(result) | |
end | |
rescue Errno::EAGAIN | |
return nil | |
end | |
# Closes the socket. | |
def close | |
@socket.close | |
end | |
private | |
def get_ | |
if @cfg[:nonblock] then | |
msg, sender = @socket.recv_nonblock(LENGTH_SIZE) | |
return nil if msg == "" | |
else | |
msg, sender = @socket.recvfrom(LENGTH_SIZE) | |
end | |
@stream_size = msg.unpack(LENGTH_CODE)[0] | |
msg, sender = @socket.recvfrom(@stream_size) | |
return msg | |
end | |
end | |
if __FILE__ == $0 then | |
class Array | |
def mean | |
self.inject {|sum,i| sum + i} / size | |
end | |
def sd | |
m = self.mean | |
v = self.inject(0.0) {|sum,i| sum + (i - m)**2} | |
Math::sqrt(v / (size - 1.0)) | |
end | |
end | |
if ARGV[0] == "server" then | |
require "pp" | |
from_client = SimpleIPC.new :port => 5000, :timeout => 0, :kind => :udp | |
from_client.listen | |
from_client.cfg[:nonblock] = false | |
now = 0.0 | |
data = {id:[], time: [], bytes: [], sent: [], cont:[]} | |
while true do | |
result = from_client.get | |
break if result == "stop" | |
data[:time] << Time.now | |
data[:sent] << result[1] | |
data[:id] << result[0] | |
data[:cont] << result[2] | |
data[:bytes] << from_client.stream_size | |
end | |
duration = Time.now - data[:sent][0] | |
transferred = data[:bytes].inject {|b,sum| sum + b} | |
latency = [] | |
data[:time].each_with_index do |t,i| | |
latency << (data[:time][i] - data[:sent][i]) * 1000 | |
puts "%4d: %10.5f -> %10.5f = %9.3fms (%d bytes)" % [ | |
data[:id][i], | |
data[:sent][i] - data[:sent][0], | |
data[:time][i] - data[:sent][0], | |
latency[i], | |
data[:bytes][i] | |
] | |
end | |
puts | |
puts "transferred %d bytes in %12.6f s @ %d kbps" % [ | |
transferred, | |
duration, | |
(transferred / duration * 8 / 1000).to_i | |
] | |
puts "average latency %9.3f ms, st.dev %9.3f ms" % [ | |
latency.mean, | |
latency.sd | |
] | |
puts "minimum latency %9.3f ms, maximum latency %9.3f ms" % [ | |
latency.min, | |
latency.max | |
] | |
else | |
to_server = SimpleIPC.new :port => 5000, :kind => :udp | |
n = ARGV[0].to_i | |
n = 10 if n <= 0 | |
now = 0.0 | |
n.times do |i| | |
to_server.send([i,Time.now,"test" * (rand(100)+1)]) | |
end | |
to_server.send("stop") | |
to_server.close | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment