Created
August 18, 2014 12:40
-
-
Save Asmod4n/81109b231fed68a9add7 to your computer and use it in GitHub Desktop.
First stab at a new high level czmq wrapper in ruby
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'bundler/setup' | |
require 'msgpack' | |
require 'concurrent' | |
Concurrent::Actor.i_know_it_is_experimental! | |
logger = Logger.new($stderr) | |
Concurrent.configuration.logger = lambda do |level, progname, message = nil, &block| | |
logger.add level, message, progname, &block | |
end | |
require_relative 'ffi-czmq' | |
CZMQ.init | |
ClientResponse = Struct.new(:uuid, :answer) | |
ClientRequest = Struct.new(:uuid, :question) do | |
def send_over(socket, type) | |
request = CZMQ::Zmsg.new | |
request << '' | |
request << type | |
request << uuid | |
request << MessagePack.dump(question) | |
request.send_zmsg(socket) | |
end | |
def get_answer_from(socket) | |
MessagePack.load(CZMQ::Zstr.new(socket).recv) | |
end | |
end | |
ServerRequest = Struct.new(:identities, :uuid, :question) | |
ServerResponse = Struct.new(:identities, :uuid, :answer) do | |
def send_over(socket, type) | |
response = CZMQ::Zmsg.new | |
identities.each do |identity| | |
response << identity | |
end | |
response << '' | |
response << type | |
response << uuid | |
response << MessagePack.dump(answer) | |
response.send_zmsg(socket) | |
end | |
end | |
class ServerWorker < CZMQ::Zthread | |
attr_reader :parent_pipe, :sender | |
def initialize(parent) | |
@parent = parent | |
@parent_pipe = fork_callback(nil, &method(:handle_spawn)) | |
end | |
private | |
def handle_spawn(*args, zctx, child_pipe) | |
@sender = Thread.current.to_s | |
@zloop = CZMQ::Zloop.new | |
#@zloop.set_verbose(true) | |
@worker = CZMQ::Zsocket.new(zctx, CZMQ::Zsocket::DEALER) | |
cpi = child_pipe.to_pollitem | |
cpi_err = child_pipe.to_pollitem(CZMQ::Zpoller::POLLERR) | |
@zloop.set_tolerant(cpi) | |
@zloop.add_poller(cpi, nil) do |zloop, pollitem, *args| | |
CZMQ::Zmsg.recv_nowait(pollitem[:socket]).send_zmsg(@worker) | |
0 | |
end | |
@zloop.add_poller(cpi_err, nil) do |zloop, pollitem, *args| | |
zloop.destructor | |
0 | |
end | |
wpi = @worker.to_pollitem | |
wpi_err = @worker.to_pollitem(CZMQ::Zpoller::POLLERR) | |
@zloop.set_tolerant(wpi) | |
@zloop.add_poller(wpi, nil) do |zloop, pollitem, *args| | |
msg = CZMQ::Zmsg.recv_nowait(pollitem[:socket]).to_a | |
delimiter = msg.index('') | |
identities = msg[0, delimiter] | |
payload = msg[delimiter +1..-1] | |
type, uuid, question = payload | |
request = ::ServerRequest.new(identities, uuid, MessagePack.load(question)) | |
case type | |
when 'ask' | |
answer = @parent.ask(request).value | |
response = ::ServerResponse.new(identities, uuid, answer) | |
response.send_over(pollitem[:socket], type) | |
when 'tell' | |
@parent << request | |
end | |
0 | |
end | |
@zloop.add_poller(wpi_err, nil) do |zloop, pollitem, *args| | |
@zloop.destructor | |
0 | |
end | |
@worker.connect('inproc://backend') | |
child_pipe.signal | |
@zloop.start | |
@parent << :goodbye | |
Thread.current.exit | |
end | |
end | |
class ClientWorker < CZMQ::Zthread | |
attr_reader :parent_pipe, :sender | |
def initialize(parent, endpoint) | |
@parent = parent | |
@endpoint = endpoint | |
@parent_pipe = fork_callback(nil, &method(:handle_spawn)) | |
end | |
private | |
def handle_spawn(*args, zctx, child_pipe) | |
@sender = Thread.current.to_s | |
@zloop = CZMQ::Zloop.new | |
#@zloop.set_verbose(true) | |
@worker = CZMQ::Zsocket.new(zctx, CZMQ::Zsocket::DEALER) | |
cpi = child_pipe.to_pollitem | |
cpi_err = child_pipe.to_pollitem(CZMQ::Zpoller::POLLERR) | |
@zloop.set_tolerant(cpi) | |
@zloop.add_poller(cpi, nil) do |zloop, pollitem, *args| | |
msg = CZMQ::Zmsg.recv_nowait(pollitem[:socket]) | |
type, uuid, question = msg.to_a[1..-1] | |
if type == 'ask' | |
@uuid = uuid | |
end | |
msg.send_zmsg(@worker) | |
0 | |
end | |
@zloop.add_poller(cpi_err, nil) do |zloop, pollitem, *args| | |
@zloop.destructor | |
0 | |
end | |
wpi = @worker.to_pollitem | |
wpi_err = @worker.to_pollitem(CZMQ::Zpoller::POLLERR) | |
@zloop.set_tolerant(wpi) | |
@zloop.add_poller(wpi, nil) do |zloop, pollitem, *args| | |
msg = CZMQ::Zmsg.recv_nowait(pollitem[:socket]).to_a | |
delimiter = msg.index('') | |
identities = msg[0, delimiter] | |
payload = msg[delimiter + 1..-1] | |
type, uuid, answer = payload | |
case type | |
when 'ask' | |
if uuid == @uuid | |
@uuid = nil | |
CZMQ::Zstr.new(child_pipe).send_zstr(answer) | |
end | |
when 'tell' | |
@parent << ::ClientResponse.new(uuid, MessagePack.load(answer)) | |
end | |
0 | |
end | |
@zloop.add_poller(wpi_err, nil) do |zloop, pollitem, *args| | |
@zloop.destructor | |
0 | |
end | |
@worker.connect(@endpoint) | |
child_pipe.signal | |
@zloop.start | |
@parent << :goodbye | |
Thread.current.exit | |
end | |
end | |
class Server < Concurrent::Actor::Context | |
include Concurrent::Logging | |
def initialize | |
@pipes = {} | |
if ((cpu_counter = (Concurrent.processor_count - 1))) > 0 | |
cpu_counter.times do | |
spawn_ioactor | |
end | |
else | |
spawn_ioactor | |
end | |
end | |
def spawn_ioactor | |
worker = ServerWorker.new(ref) | |
timeout(5) { | |
worker.parent_pipe.wait | |
} | |
@pipes[worker.sender] = worker.parent_pipe | |
end | |
def on_message(msg) | |
case msg | |
when :goodbye | |
@pipes.delete(envelope.sender_path) | |
when ::ServerRequest | |
case msg[:question] | |
when 'hello' | |
if envelope.ivar | |
'welcome' | |
else | |
response = ::ServerResponse.new(msg[:identities], msg[:uuid], 'welcome') | |
response.send_over(@pipes[envelope.sender_path], 'tell') | |
end | |
end | |
end | |
end | |
end | |
class Client < Concurrent::Actor::Context | |
include Concurrent::Logging | |
def initialize(endpoint) | |
@endpoint = endpoint | |
spawn_ioactor | |
end | |
def on_message(msg) | |
case msg | |
when :goodbye | |
@worker_pipe = nil | |
when :endpoint | |
@endpoint | |
when ::ClientRequest | |
if envelope.ivar | |
msg.send_over(@worker_pipe, 'ask') | |
msg.get_answer_from(@worker_pipe) | |
else | |
msg.send_over(@worker_pipe, 'tell') | |
end | |
when ::ClientResponse | |
puts msg[:answer] | |
end | |
end | |
private | |
def spawn_ioactor | |
worker = ClientWorker.new(ref, @endpoint) | |
timeout(5) { | |
worker.parent_pipe.wait | |
} | |
@worker_pipe = worker.parent_pipe | |
end | |
end | |
backend = CZMQ::Zsocket.new(CZMQ.context, CZMQ::Zsocket::DEALER) | |
backend.bind('inproc://backend') | |
frontend = CZMQ::Zsocket.new(CZMQ.context, CZMQ::Zsocket::ROUTER) | |
port = frontend.bind('tcp://127.0.0.1:*') | |
proxy = CZMQ::Zproxy.new(CZMQ.context, frontend.to_zsocket, backend.to_zsocket) | |
server = Server.spawn(:server) | |
client = Client.spawn(:client, "tcp://localhost:#{port}") | |
puts client.ask(ClientRequest.new(CZMQ::Zuuid.new, "hello")).value | |
client << ClientRequest.new(CZMQ::Zuuid.new, "hello") | |
sleep |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'thread' | |
require 'ffi' | |
module CZMQ | |
module Utils | |
extend FFI::Library | |
ffi_lib 'czmq', 'libzmq', FFI::Library::LIBC | |
attach_function :zstr_free, :zstr_free, [:pointer], :void, :blocking => true | |
attach_function :libc_free, :free, [:pointer], :void, :blocking => true | |
attach_function :zmq_version, :zmq_version, [:pointer, :pointer, :pointer], :void, :blocking => true | |
attach_function :czmq_version, :zsys_version, [:pointer, :pointer, :pointer], :void, :blocking => true | |
attach_function :errno, :zmq_errno, [], :int, :blocking => true | |
attach_function :strerror, :zmq_strerror, [:int], :string, :blocking => true | |
class << self | |
def version | |
unless @version | |
z_major = FFI::MemoryPointer.new :int | |
z_minor = FFI::MemoryPointer.new :int | |
z_patch = FFI::MemoryPointer.new :int | |
c_major = FFI::MemoryPointer.new :int | |
c_minor = FFI::MemoryPointer.new :int | |
c_patch = FFI::MemoryPointer.new :int | |
zmq_version z_major, z_minor, z_patch | |
czmq_version c_major, c_minor, c_patch | |
@version = { | |
zmq: {:major => z_major.read_int, :minor => z_minor.read_int, :patch => z_patch.read_int}, | |
czmq: {:major => c_major.read_int, :minor => c_minor.read_int, :patch => c_patch.read_int} | |
} | |
end | |
@version | |
end | |
def check_for_pointer(ptr) | |
ptr.is_a?(FFI::Pointer) && !ptr.null? | |
end | |
def read_string(ffi_str) | |
if check_for_pointer(ffi_str) | |
ffi_str_pointer = FFI::MemoryPointer.new(:pointer) | |
ffi_str_pointer.write_pointer(ffi_str) | |
str = ffi_str.read_string | |
zstr_free(ffi_str_pointer) | |
str | |
else | |
fail IOError, error | |
end | |
end | |
def free(pointer) | |
if check_for_pointer(pointer) | |
libc_free(pointer) | |
else | |
fail ArgumentError, 'Can only free non NULL Pointers' | |
end | |
end | |
def error | |
strerror(errno) | |
end | |
end | |
if (version[:zmq][:major] < 4) ||(version[:czmq][:major] < 2) | |
fail LoadError, 'This needs at least zeromq 4 and czmq 2' | |
end | |
end | |
end | |
module LibCZMQ | |
def self.extended(klass) | |
klass.extend FFI::Library | |
klass.ffi_lib('czmq', 'libzmq') | |
end | |
def czmq_class | |
return @czmq_class if @czmq_class | |
@czmq_class = "#{self.singleton_class.inspect.split('::').last[0...-1].downcase}".to_sym | |
end | |
def czmq_constructor(constructor_params = [], constructor_extras = nil) | |
self.module_eval <<-RUBY, __FILE__, __LINE__ + 1 | |
attach_function :constructor, "#{czmq_class}_new", #{constructor_params.inspect}, :pointer, :blocking => true | |
def self.new_from_czmq_obj(czmq_obj, owned_by_me = true) | |
if CZMQ::Utils.check_for_pointer(czmq_obj) | |
instance = allocate | |
instance.instance_variable_set(:@czmq_obj, czmq_obj) | |
instance.instance_variable_set(:@owned_by_me, owned_by_me) | |
instance.send :setup_finalizer | |
if instance.method(:initialize).parameters.size > 0 | |
instance.send :initialize, czmq_obj | |
else | |
instance.send :initialize | |
end | |
instance | |
else | |
fail ArgumentError, "Didn't pass a valid Pointer" | |
end | |
end | |
def self.new(*args) | |
instance = allocate | |
if #{constructor_extras.nil?} | |
if #{constructor_params.empty?} | |
czmq_obj = instance.class.constructor | |
else | |
czmq_obj = instance.class.constructor(*args) | |
end | |
else | |
if args.first.respond_to?(:"to_#{constructor_extras}") && | |
CZMQ::Utils.check_for_pointer(args.first.to_#{constructor_extras}) | |
instance.instance_variable_set("@#{constructor_extras}", args.first) | |
else | |
fail ArgumentError, "Didn't pass a valid Pointer" | |
end | |
if #{constructor_params.length} == 1 | |
czmq_obj = instance.class.constructor(args.first.to_czmq) | |
else | |
czmq_obj = instance.class.constructor(args.first.to_czmq, *args[1..-1]) | |
end | |
end | |
!czmq_obj.null? ||fail("Cannot allocate #{czmq_class} because of #{CZMQ::Utils.error}") | |
instance.instance_variable_set(:@czmq_obj, czmq_obj) | |
instance.instance_variable_set(:@owned_by_me, true) | |
instance.send :setup_finalizer | |
if instance.method(:initialize).parameters.size > 0 | |
instance.send :initialize, *args | |
else | |
instance.send :initialize | |
end | |
instance | |
end | |
def to_#{czmq_class} | |
@czmq_obj | |
end | |
def to_czmq | |
@czmq_obj | |
end | |
RUBY | |
end | |
def czmq_destructor(destructor_extras = nil, attach_destructor = true) | |
self.module_eval <<-RUBY, __FILE__, __LINE__ + 1 | |
if #{attach_destructor} | |
if #{destructor_extras.nil?} | |
attach_function :destructor, "#{czmq_class}_destroy", [:pointer], :void, :blocking => true | |
else | |
attach_function :destructor, "#{czmq_class}_destroy", [:pointer, :pointer], :void, :blocking => true | |
end | |
end | |
def destructor | |
Thread.exclusive do | |
remove_finalizer | |
if @owned_by_me | |
unless CZMQ::Zctx.interrupted == 1 | |
if #{destructor_extras.nil?} | |
if CZMQ::Utils.check_for_pointer(@czmq_obj) | |
FFI::MemoryPointer.new(:pointer) do |p| | |
p.write_pointer(@czmq_obj) | |
self.class.destructor(p) | |
end | |
end | |
else | |
begin | |
extras = instance_variable_get("@#{destructor_extras}").to_czmq | |
if CZMQ::Utils.check_for_pointer(@czmq_obj) && | |
CZMQ::Utils.check_for_pointer(extras) | |
self.class.destructor(extras, @czmq_obj) | |
end | |
rescue NoMethodError | |
end | |
end | |
end | |
end | |
instance_variables.each do |var| | |
instance_variable_set(var, nil) | |
end | |
true | |
end | |
end | |
private | |
def setup_finalizer | |
ObjectSpace.define_finalizer(self, self.class.close_instance(self)) | |
end | |
def remove_finalizer | |
ObjectSpace.undefine_finalizer self | |
end | |
def self.close_instance(selfie) | |
Proc.new do | |
selfie.destructor | |
end | |
end | |
RUBY | |
end | |
def czmq_function(name, function, arguments, returns) | |
fn = "#{czmq_class}_#{function}" | |
self.module_eval <<-RUBY, __FILE__, __LINE__ + 1 | |
attach_function #{name.inspect}, #{fn.inspect}, #{arguments.inspect}, #{returns.inspect}, :blocking => true | |
def #{name}(*args) | |
if instance_variables.include?(:@czmq_obj) && @czmq_obj.nil? | |
fail "#{czmq_class} is not initialized" | |
end | |
if CZMQ::Utils.check_for_pointer(@czmq_obj) | |
case #{function.inspect} | |
when :send | |
z_obj = @owned_by_me ? self : dup | |
pointer = FFI::MemoryPointer.new(:pointer) | |
pointer.write_pointer(z_obj.to_czmq) | |
zsocket = CZMQ::Zsocket.convert(args.first) | |
result = self.class.#{name}(pointer, zsocket, *args[1..-1]) | |
z_obj.instance_variable_set(:@owned_by_me, false) | |
z_obj.destructor | |
when :insert, :append, :prepend | |
if args.first.respond_to?(:to_czmq) && | |
CZMQ::Utils.check_for_pointer(args.first.to_czmq) | |
if args.first.instance_variable_get(:@owned_by_me) | |
z_obj = args.first | |
else | |
z_obj = args.first.dup | |
end | |
czmq_obj = FFI::MemoryPointer.new(:pointer) | |
czmq_obj.write_pointer(z_obj.to_czmq) | |
self.class.#{name}(@czmq_obj, czmq_obj) | |
z_obj.instance_variable_set(:@owned_by_me, false) | |
z_obj.destructor | |
else | |
fail ArgumentError, "Didn't supply a valid Pointer" | |
end | |
else | |
result = self.class.#{name}(@czmq_obj, *args) | |
end | |
else | |
result = self.class.#{name}(*args) | |
end | |
case #{returns.inspect} | |
when :pointer | |
unless CZMQ::Zctx.interrupted == 1 | |
!result.null? ||fail(CZMQ::Utils.error) | |
end | |
result | |
when :int | |
unless CZMQ::Zctx.interrupted == 1 | |
if #{function.inspect} == :send | |
result != -1 ||fail(IOError, CZMQ::Utils.error) | |
else | |
result != -1 ||fail(CZMQ::Utils.error) | |
end | |
end | |
result | |
when :string, :bool, :char | |
result | |
when :void | |
true | |
else | |
result | |
end | |
end | |
RUBY | |
end | |
end | |
module CZMQ | |
class Zauth | |
# See https://github.com/zeromq/czmq/blob/v2.2.0/doc/zauth.txt | |
ALLOW_ANY = '*'.freeze | |
extend ::LibCZMQ | |
czmq_constructor [:pointer], :zctx | |
czmq_destructor | |
czmq_function :allow, :allow, [:pointer, :string], :void | |
czmq_function :deny, :deny, [:pointer, :string], :void | |
czmq_function :configure_plain, :configure_plain, [:pointer, :string, :string], :void | |
czmq_function :configure_curve, :configure_curve, [:pointer, :string, :string], :void | |
czmq_function :set_verbose, :set_verbose, [:pointer, :bool], :void | |
end | |
end | |
module CZMQ | |
class Zcert | |
# See https://github.com/zeromq/czmq/blob/v2.2.0/doc/zcert.txt | |
extend ::LibCZMQ | |
czmq_constructor | |
czmq_destructor | |
#czmq_function :new_from, :new_from, [:pointer, :pointer], :pointer | |
czmq_function :public_txt, :public_txt, [:pointer], :string | |
czmq_function :secret_txt, :secret_txt, [:pointer], :string | |
czmq_function :load_zcert, :load, [:string], :pointer | |
czmq_function :save, :save, [:pointer, :string], :int | |
czmq_function :save_public, :save_public, [:pointer, :string], :int | |
czmq_function :save_secret, :save_secret, [:pointer, :string], :int | |
czmq_function :apply, :apply, [:pointer, :pointer], :void | |
czmq_function :dup_zcert, :dup, [:pointer], :pointer | |
czmq_function :eq, :eq, [:pointer, :pointer], :bool | |
# def self.zcert_new_from(public_key, secret_key) | |
# if @zcert = super | |
# setup_finalizer | |
# else | |
# fail 'Cannot create certificate' | |
# end | |
# end | |
def self.convert(cert) | |
if Utils.check_for_pointer(cert) | |
return cert | |
elsif cert.respond_to?(:to_zcert) && | |
Utils.check_for_pointer(cert.to_zcert) | |
return cert.to_zcert | |
else | |
fail ArgumentError, "#{cert.class} is not a CZMQ::Zcert" | |
end | |
end | |
def self.load(filename) | |
unless (zcert = load_zcert(filename)).null? | |
new_from_czmq_obj(zcert) | |
else | |
fail IOError, CZMQ::Utils.error | |
end | |
end | |
def dup | |
self.class.new_from_czmq_obj(dup_zcert) | |
end | |
def ==(other) | |
eq(self.class.convert(other)) | |
end | |
end | |
end | |
module CZMQ | |
class Zcertstore | |
# See https://github.com/zeromq/czmq/blob/v2.2.0/doc/zcertstore.txt | |
extend ::LibCZMQ | |
czmq_constructor [:string] | |
czmq_destructor | |
czmq_function :lookup, :lookup, [:pointer, :string], :pointer | |
czmq_function :insert_zcert, :insert, [:pointer, :pointer], :void | |
end | |
end | |
module CZMQ | |
class Zctx | |
# See https://github.com/zeromq/czmq/blob/v2.2.0/doc/zctx.txt | |
extend ::LibCZMQ | |
czmq_constructor | |
czmq_destructor | |
czmq_function :shadow, :shadow, [:pointer], :pointer | |
czmq_function :set_iothreads, :set_iothreads, [:pointer, :int], :void | |
czmq_function :set_linger, :set_linger, [:pointer, :int], :void | |
czmq_function :set_pipehwm, :set_pipehwm, [:pointer, :int], :void | |
czmq_function :set_sndhwm, :set_sndhwm, [:pointer, :int], :void | |
czmq_function :set_rcvhwm, :set_rcvhwm, [:pointer, :int], :void | |
czmq_function :underlying, :underlying, [:pointer], :pointer | |
attach_variable :interrupted, :zctx_interrupted, :int | |
end | |
end | |
module CZMQ | |
class Zframe | |
include Comparable | |
# See https://github.com/zeromq/czmq/blob/v2.2.0/doc/zframe.txt | |
MORE = 1 | |
REUSE = 2 | |
DONTWAIT = 4 | |
extend ::LibCZMQ | |
czmq_constructor [:pointer, :size_t] | |
czmq_destructor | |
czmq_function :new_empty, :new_empty, [], :pointer | |
czmq_function :recv_zframe_nowait, :recv_nowait, [:pointer], :pointer | |
czmq_function :recv_zframe, :recv, [:pointer], :pointer | |
czmq_function :send_zframe, :send, [:pointer, :pointer, :int], :int | |
czmq_function :size, :size, [:pointer], :size_t | |
czmq_function :data, :data, [:pointer], :pointer | |
czmq_function :dup_zframe, :dup, [:pointer], :pointer | |
czmq_function :str_hex, :strhex, [:pointer], :string | |
czmq_function :str_eq, :streq, [:pointer, :string], :bool | |
czmq_function :str_dup, :strdup, [:pointer], :string | |
czmq_function :more, :more, [:pointer], :int | |
czmq_function :set_more, :set_more, [:pointer, :int], :void | |
def self.convert(frame) | |
if Utils.check_for_pointer(frame) | |
return frame | |
elsif frame.respond_to?(:to_zframe) && | |
Utils.check_for_pointer(frame.to_zframe) | |
return frame.to_zframe | |
else | |
fail ArgumentError, "#{frame.class} is not a CZMQ::Zframe" | |
end | |
end | |
def dup | |
self.class.new_from_czmq_obj(dup_zframe) | |
end | |
def <=>(other) | |
size <=> other.size | |
end | |
def ==(other) | |
to_str == other.to_str | |
end | |
def to_str | |
[str_hex].pack('H*') | |
end | |
def self.recv(socket) | |
zsocket = Zsocket.convert(socket) | |
unless (zframe = recv_zframe(zsocket)).null? | |
new_from_czmq_obj(zframe) | |
else | |
fail IOError, CZMQ::Utils.error | |
end | |
end | |
def self.recv_nowait(socket) | |
zsocket = Zsocket.convert(socket) | |
unless (zframe = recv_zframe_nowait(zsocket)).null? | |
new_from_czmq_obj(zframe) | |
else | |
fail IOError, CZMQ::Utils.error | |
end | |
end | |
end | |
end | |
module CZMQ | |
class Zloop | |
# See https://github.com/zeromq/czmq/blob/v2.2.0/doc/zloop.txt | |
extend ::LibCZMQ | |
czmq_constructor | |
czmq_destructor | |
czmq_function :poller, :poller, [:pointer, :pointer, :pointer, :pointer], :int | |
czmq_function :poller_end, :poller_end, [:pointer, :pointer], :void | |
czmq_function :set_tolerant, :set_tolerant, [:pointer, :pointer], :void | |
czmq_function :timer, :timer, [:pointer, :size_t, :size_t, :pointer, :pointer], :int | |
czmq_function :timer_end, :timer_end, [:pointer, :int], :int | |
czmq_function :set_verbose, :set_verbose, [:pointer, :bool], :void | |
czmq_function :start, :start, [:pointer], :int | |
def initialize | |
@poller_callbacks = [] | |
@timer_callbacks = [] | |
end | |
def add_poller(pollitem, *args, &block) | |
poller_callback = FFI::Function.new(:int, [:pointer, :pointer, :pointer], :blocking => true) do |zloopy, pollitem, *args| | |
zloop = CZMQ::Zloop.new_from_czmq_obj(zloopy, false) | |
zpollitem = Zpoller::PollItem.new(pollitem) | |
block.call(zloop, zpollitem, *args) | |
end | |
poller(pollitem, poller_callback, *args) | |
@poller_callbacks << {poller_callback: poller_callback, pollitem: pollitem} | |
end | |
def remove_poller(pollitem) | |
@poller_callbacks.delete_if {|poller| poller[:pollitem] = pollitem } | |
poller_end(pollitem) | |
end | |
def add_timer(delay, times, *args, &block) | |
timer_callback = FFI::Function.new(:int, [:pointer, :int, :pointer], :blocking => true) do |zloopy, timer_id, *args| | |
zloop = CZMQ::Zloop.new_from_czmq_obj(zloopy, false) | |
block.call(zloop, timer_id, *args) | |
end | |
timer_id = timer(delay, times, timer_callback, *args) | |
@timer_callbacks << {timer_callback: timer_callback, timer_id: timer_id} | |
timer_id | |
end | |
def remove_timer(timer_id) | |
@timer_callbacks.delete_if { |timer| timer[:timer_id] = timer_id } | |
timer_end(timer_id) | |
end | |
end | |
end | |
module CZMQ | |
class Zmsg | |
include Enumerable | |
# See https://github.com/zeromq/czmq/blob/v2.2.0/doc/zmsg.txt | |
extend ::LibCZMQ | |
czmq_constructor | |
czmq_destructor | |
czmq_function :recv_zmsg, :recv, [:pointer], :pointer | |
czmq_function :recv_zmsg_nowait, :recv_nowait, [:pointer], :pointer | |
czmq_function :send_zmsg, :send, [:pointer, :pointer], :int | |
czmq_function :size, :size, [:pointer], :size_t | |
czmq_function :content_size, :content_size, [:pointer], :size_t | |
czmq_function :prepend_zframe, :prepend, [:pointer, :pointer], :int | |
czmq_function :append_zframe, :append, [:pointer, :pointer], :int | |
czmq_function :pop_zframe, :pop, [:pointer], :pointer | |
czmq_function :push_mem, :pushmem, [:pointer, :pointer, :size_t], :int | |
czmq_function :add_mem, :addmem, [:pointer, :pointer, :size_t], :int | |
czmq_function :push_zstr, :pushstr, [:pointer, :string], :int | |
czmq_function :add_zstr, :addstr, [:pointer, :string], :int | |
czmq_function :pop_zstr, :popstr, [:pointer], :string | |
czmq_function :unwrap_zframe, :unwrap, [:pointer], :pointer | |
czmq_function :remove_zframe, :remove, [:pointer, :pointer], :void | |
czmq_function :first_zframe, :first, [:pointer], :pointer | |
czmq_function :next_zframe, :next, [:pointer], :pointer | |
czmq_function :last_zframe, :last, [:pointer], :pointer | |
czmq_function :dup_zmsg, :dup, [:pointer], :pointer | |
def each | |
yield first | |
((size) -1).times do |i| | |
yield self.next | |
end | |
end | |
def to_a | |
ary = [] | |
each {|frame| ary << frame.to_str } | |
ary | |
end | |
def add(data) | |
case data | |
when String | |
add_string(data) | |
when Zframe | |
append_zframe(data) | |
else | |
if (data.respond_to?(:data) && data.respond_to?(:size)) | |
add_mem(data.data, data.size) | |
elsif data.respond_to?(:to_str) | |
add_string(data.to_str) | |
else | |
fail ArgumentError, 'Can only add Strings and CZMQ::Zframes' | |
end | |
end | |
end | |
alias_method :<<, :add | |
def push(data) | |
case data | |
when String | |
push_string(data) | |
when Zframe | |
prepend_zframe(data) | |
else | |
if (data.respond_to?(:data) && data.respond_to?(:size)) | |
push_mem(data.data, data.size) | |
elsif data.respond_to?(:to_str) | |
push_string(data.to_str) | |
else | |
fail ArgumentError, 'Can only push Strings and CZMQ::Zframes' | |
end | |
end | |
end | |
[:first, :next, :last].each do |meth| | |
self.class_eval <<-RUBY, __FILE__, __LINE__ + 1 | |
def #{meth.to_s} | |
CZMQ::Zframe.new_from_czmq_obj(#{meth.to_s}_zframe, false) | |
end | |
RUBY | |
end | |
[:pop, :unrwap].each do |meth| | |
self.class_eval <<-RUBY, __FILE__, __LINE__ + 1 | |
def #{meth.to_s} | |
CZMQ::Zframe.new_from_czmq_obj(#{meth.to_s}_zframe) | |
end | |
RUBY | |
end | |
def dup | |
self.class.new_from_czmq_obj(dup_zmsg) | |
end | |
def remove(zframe) | |
remove_zframe(Zframe.convert(zframe)) | |
end | |
def self.recv(socket) | |
zsocket = Zsocket.convert(socket) | |
unless (zmsg = recv_zmsg(zsocket)).null? | |
new_from_czmq_obj(zmsg) | |
else | |
fail IOError, CZMQ::Utils.error | |
end | |
end | |
def self.recv_nowait(socket) | |
zsocket = Zsocket.convert(socket) | |
unless (zmsg = recv_zmsg_nowait(zsocket)).null? | |
new_from_czmq_obj(zmsg) | |
else | |
fail IOError, CZMQ::Utils.error | |
end | |
end | |
private | |
def add_string(string) | |
if string.encoding == Encoding::ASCII_8BIT | |
add_mem(string, string.size) | |
else | |
add_zstr(string) | |
end | |
end | |
def push_string(string) | |
if string.encoding == Encoding::ASCII_8BIT | |
push_mem(string, string.size) | |
else | |
push_zstr(string) | |
end | |
end | |
end | |
end | |
module CZMQ | |
class Zmonitor | |
CONNECTED = 0x0001 | |
CONNECT_DELAYED = 0x0002 | |
CONNECT_RETRIED = 0x0004 | |
LISTENING = 0x0008 | |
BIND_FAILED = 0x0010 | |
ACCEPTED = 0x0020 | |
ACCEPT_FAILED = 0x0040 | |
CLOSED = 0x0080 | |
CLOSE_FAILED = 0x0100 | |
DISCONNECTED = 0x0200 | |
MONITOR_STOPPED = 0x0400 | |
ALL = 0xFFFF | |
extend ::LibCZMQ | |
czmq_constructor [:pointer, :pointer, :int], :zctx | |
czmq_destructor | |
czmq_function :recv, :recv, [:pointer], :pointer | |
czmq_function :socket, :socket, [:pointer], :pointer | |
czmq_function :set_verbose, :set_verbose, [:pointer, :bool], :void | |
def to_pollitem(polling_type = Zpoller::POLLIN) | |
Zpoller.create_pollitem(socket: socket, events: polling_type) | |
end | |
end | |
end | |
module CZMQ | |
class Zpoller | |
# See https://github.com/zeromq/czmq/blob/v2.2.0/doc/zpoller.txt | |
extend ::LibCZMQ | |
POLL = 1 | |
POLLIN = 1 | |
POLLOUT = 2 | |
POLLERR = 4 | |
class PollItem < FFI::Struct | |
FD_TYPE = if FFI::Platform::IS_WINDOWS && FFI::Platform::ADDRESS_SIZE == 64 | |
# On Windows, zmq.h defines fd as a SOCKET, which is 64 bits on x64. | |
:uint64 | |
else | |
:int | |
end | |
layout :socket, :pointer, | |
:fd, FD_TYPE, | |
:events, :short, | |
:revents, :short | |
def readable? | |
(self[:revents] & POLLIN) > 0 | |
end | |
def writable? | |
(self[:revents] & POLLOUT) > 0 | |
end | |
def error? | |
(self[:revents] & POLLERR) > 0 | |
end | |
def inspect | |
"socket [#{self[:socket]}], fd [#{self[:fd]}], events [#{self[:events]}], revents [#{self[:revents]}]" | |
end | |
end | |
def self.create_pollitem(args={}) | |
pi = PollItem.new | |
pi[:socket] = args[:socket] | |
pi[:fd] = args[:fd] || 0 | |
pi[:events] = args[:events] || 0 | |
pi[:revents] = args[:revents] || 0 | |
pi | |
end | |
czmq_constructor [:varargs] | |
czmq_destructor | |
czmq_function :add, :add, [:pointer, :pointer], :int | |
czmq_function :wait, :wait, [:pointer, :int], :pointer | |
czmq_function :expired, :expired, [:pointer], :bool | |
czmq_function :terminated, :terminated, [:pointer], :bool | |
end | |
end | |
module CZMQ | |
class Zproxy | |
# See https://github.com/zeromq/czmq/blob/v2.2.0/doc/zproxy.txt | |
extend ::LibCZMQ | |
czmq_constructor [:pointer, :pointer, :pointer], :zctx | |
czmq_destructor | |
end | |
end | |
module CZMQ | |
SET_SOCKOPT = /^set_(.+)$/.freeze | |
class Zsocket | |
# See https://github.com/zeromq/czmq/blob/v2.2.0/doc/zsocket.txt | |
PAIR = 0 | |
PUB = 1 | |
SUB = 2 | |
REQ = 3 | |
REP = 4 | |
XREQ = 5 | |
XREP = 6 | |
PULL = 7 | |
PUSH = 8 | |
XPUB = 9 | |
XSUB = 10 | |
DEALER = XREQ | |
ROUTER = XREP | |
STREAM = 11 | |
extend ::LibCZMQ | |
czmq_constructor [:pointer, :int], :zctx | |
czmq_destructor :zctx | |
czmq_function :bind, :bind, [:pointer, :string], :int | |
czmq_function :unbind, :unbind, [:pointer, :string], :int | |
czmq_function :connect, :connect, [:pointer, :string], :int | |
czmq_function :disconnect, :disconnect, [:pointer, :string], :int | |
czmq_function :poll, :poll, [:pointer, :int], :bool | |
czmq_function :type_str, :type_str, [:pointer], :string | |
czmq_function :send_mem, :sendmem, [:pointer, :pointer, :size_t, :int], :int | |
czmq_function :signal, :signal, [:pointer], :int | |
czmq_function :wait, :wait, [:pointer], :int | |
def to_pollitem(polling_type = Zpoller::POLLIN) | |
Zpoller.create_pollitem(socket: @czmq_obj, events: polling_type) | |
end | |
def self.convert(socket) | |
if Utils.check_for_pointer(socket) | |
return socket | |
elsif socket.respond_to?(:to_zsocket) && | |
Utils.check_for_pointer(socket.to_zsocket) | |
return socket.to_zsocket | |
else | |
fail ArgumentError, "#{socket.class} is not a CZMQ::Zsocket" | |
end | |
end | |
# See https://github.com/zeromq/czmq/blob/v2.2.0/doc/zsockopt.txt | |
def method_missing(meth, *args, &blk) | |
if args.length == 1 && | |
meth.to_s =~ SET_SOCKOPT && | |
(args.first.is_a?(Integer) || args.first.is_a?(String)) | |
begin | |
self.class.instance_eval <<-RUBY, __FILE__, __LINE__ +1 | |
attach_function #{meth.inspect}, "zsocket_#{meth.to_s}", [:pointer, :varargs], :void | |
RUBY | |
rescue FFI::NotFoundError | |
super | |
else | |
if args.first.is_a?(Integer) | |
self.class.class_eval <<-RUBY, __FILE__, __LINE__ + 1 | |
def #{meth.to_s}(arg) | |
self.class.#{meth.to_s}(@czmq_obj, :int, arg) | |
end | |
RUBY | |
elsif args.first.is_a?(String) | |
self.class.class_eval <<-RUBY, __FILE__, __LINE__ + 1 | |
def #{meth.to_s}(arg) | |
self.class.#{meth.to_s}(@czmq_obj, :string, arg) | |
end | |
RUBY | |
end | |
send meth, args.first | |
end | |
else | |
super | |
end | |
end | |
end | |
end | |
module CZMQ | |
class Zstr | |
# See https://github.com/zeromq/czmq/blob/v2.2.0/doc/zstr.txt | |
extend ::LibCZMQ | |
czmq_destructor nil, false | |
czmq_function :recv_zstr_nowait, :recv_nowait, [:pointer], :pointer | |
czmq_function :recv_zstr, :recv, [:pointer], :pointer | |
czmq_function :send_zstr, :send, [:pointer, :string], :int | |
czmq_function :sendm, :sendm, [:pointer, :string], :int | |
def initialize(socket) | |
@zsocket = Zsocket.convert(socket) | |
end | |
def recv | |
Utils.read_string(self.class.recv_zstr(@zsocket)) | |
end | |
def recv_nowait | |
Utils.read_string(self.class.recv_zstr_nowait(@zsocket)) | |
end | |
def send_zstr(str) | |
result = self.class.send_zstr(@zsocket, str) | |
result != -1 ||fail(IOError, CZMQ::Utils.error) | |
result | |
end | |
def sendm(str) | |
result = self.class.sendm(@zsocket, str) | |
result != -1 ||fail(IOError, CZMQ::Utils.error) | |
result | |
end | |
end | |
end | |
module CZMQ | |
class Zsys | |
# See https://github.com/zeromq/czmq/blob/v2.2.0/doc/zsys.txt | |
extend ::LibCZMQ | |
czmq_function :handler_set, :handler_set, [:pointer], :void | |
czmq_function :handler_reset, :handler_reset, [], :void | |
czmq_function :set_interface, :set_interface, [:string], :void | |
czmq_function :interface, :interface, [], :string | |
czmq_function :socket_error, :socket_error, [:string], :void | |
end | |
end | |
module CZMQ | |
class Zthread | |
# See https://github.com/zeromq/czmq/blob/v2.2.0/doc/zthread.txt | |
extend ::LibCZMQ | |
czmq_destructor nil, false | |
czmq_function :fork_zthread, :fork, [:pointer, :pointer, :pointer], :pointer | |
czmq_function :new_zthread, :new, [:pointer, :pointer], :int | |
def fork_zthread(zthread_attached_fn, *args) | |
self.class.fork_zthread(CZMQ.context.to_zctx, zthread_attached_fn, *args) | |
end | |
def fork_callback(*args, &block) | |
@fork_callback = FFI::Function.new(:void, [:pointer, :pointer, :pointer], :blocking => true) do |*args, zctxy, pipey| | |
zctx = CZMQ::Zctx.new_from_czmq_obj(zctxy) | |
pipe = CZMQ::Zsocket.new_from_czmq_obj(pipey) | |
block.call(*args, zctx, pipe) | |
end | |
CZMQ::Zsocket.new_from_czmq_obj(fork_zthread(@fork_callback, *args)) | |
end | |
def new_callback(*args, &block) | |
@new_callback = FFI::Function.new(:void, [:pointer], :blocking => true) do |*args| | |
block.call(*args) | |
end | |
new_zthread(@new_callback, *args) | |
end | |
end | |
end | |
module CZMQ | |
class Zuuid | |
extend ::LibCZMQ | |
czmq_constructor | |
czmq_destructor | |
czmq_function :data, :data, [:pointer], :pointer | |
czmq_function :size, :size, [:pointer], :size_t | |
czmq_function :to_str, :str, [:pointer], :string | |
#czmq_function :set, :set, [:pointer, :pointer], :void | |
#czmq_function :export, :export, [:pointer, :pointer], :void | |
czmq_function :eq, :eq, [:pointer, :pointer], :bool | |
czmq_function :neq, :neq, [:pointer, :pointer], :bool | |
end | |
end | |
module CZMQ | |
class << self | |
def init | |
::Thread.exclusive do | |
return @context if @context | |
CZMQ::Zsys.handler_set(nil) | |
@context = CZMQ::Zctx.new | |
# this should be miliseconds | |
@context.set_linger(64) | |
@context | |
end | |
end | |
def context | |
fail 'you must initialize CZMQ by calling CZMQ.init' unless @context | |
@context | |
end | |
end | |
at_exit do | |
if @context | |
@context.destructor | |
@context = nil | |
end | |
::Thread.exclusive do | |
CZMQ::Zctx.interrupted = 1 | |
CZMQ::Zsys.handler_reset | |
end | |
end | |
end |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
source 'https://rubygems.org' | |
gem 'pry' | |
platforms :ruby do | |
gem 'msgpack' | |
end | |
platforms :jruby do | |
gem 'msgpack-jruby', :require => 'msgpack' | |
end | |
gem 'ffi' | |
gem 'concurrent-ruby' |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment