Last active
August 29, 2015 13:57
-
-
Save Asmod4n/9372750 to your computer and use it in GitHub Desktop.
http://rfc.zeromq.org/spec:27 implementation in ruby Requirements: zeromq build with libsodium, Install: gem install rbnacl celluloid-zmq, Usage: ruby zmqcurvezap.rb
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'rbnacl' | |
require 'celluloid/zmq' | |
require 'socket' | |
if Celluloid::ZMQ::VERSION == "0.15.0" | |
module Celluloid | |
module ZMQ | |
def self.evented? | |
actor = Thread.current[:celluloid_actor] | |
actor.mailbox.is_a?(Celluloid::ZMQ::Mailbox) | |
end | |
def wait_readable(socket) | |
if ZMQ.evented? | |
mailbox = Thread.current[:celluloid_mailbox] | |
mailbox.reactor.wait_readable(socket) | |
else | |
raise ArgumentError, "unable to wait for ZMQ sockets outside the event loop" | |
end | |
nil | |
end | |
def wait_writable(socket) | |
if ZMQ.evented? | |
mailbox = Thread.current[:celluloid_mailbox] | |
mailbox.reactor.wait_writable(socket) | |
else | |
raise ArgumentError, "unable to wait for ZMQ sockets outside the event loop" | |
end | |
nil | |
end | |
class Socket | |
def set(option, value, length = nil) | |
unless ::ZMQ::Util.resultcode_ok? @socket.setsockopt(option, value, length) | |
raise IOError, "couldn't set value for option #{option}: #{::ZMQ::Util.error_string}" | |
end | |
end | |
def get(option) | |
option_value = [] | |
unless ::ZMQ::Util.resultcode_ok? @socket.getsockopt(option, option_value) | |
raise IOError, "couldn't get value for option #{option}: #{::ZMQ::Util.error_string}" | |
end | |
option_value[0] | |
end | |
end | |
module ReadableSocket | |
def read_multipart(buffer = []) | |
ZMQ.wait_readable(@socket) if ZMQ.evented? | |
unless ::ZMQ::Util.resultcode_ok? @socket.recv_strings buffer | |
raise IOError, "error receiving ZMQ string: #{::ZMQ::Util.error_string}" | |
end | |
buffer | |
end | |
end | |
module WriteableSocket | |
def write(*messages) | |
ZMQ.wait_writable(@socket) if ZMQ.evented? | |
unless ::ZMQ::Util.resultcode_ok? @socket.send_strings messages.flatten | |
raise IOError, "error sending 0MQ message: #{::ZMQ::Util.error_string}" | |
end | |
messages | |
end | |
end | |
end | |
end | |
end | |
$server_private_key = RbNaCl::PrivateKey.generate | |
$server_public_key = $server_private_key.public_key | |
$client_private_key = RbNaCl::PrivateKey.generate | |
$client_public_key = $client_private_key.public_key | |
server = TCPServer.new('127.0.0.1', 0) | |
$bind_point = "tcp://127.0.0.1:#{server.addr[1]}" | |
server.close | |
class Handler | |
include Celluloid::ZMQ | |
include Celluloid::Logger | |
finalizer :finalize | |
def initialize | |
@handler = RouterSocket.new | |
@handler.set(::ZMQ::ZAP_DOMAIN, 'test') | |
@handler.identity = 'zeromq.zap.01' | |
begin | |
@handler.bind('inproc://zeromq.zap.01') | |
rescue IOError | |
@handler.close | |
raise | |
end | |
async.run | |
end | |
def run | |
loop { async.handle_messages @handler.read_multipart } | |
end | |
def handle_messages(messages) | |
debug '<handler_read>' | |
debug messages | |
debug '</handler_read>' | |
delimiter = messages.index('') | |
servers, payload = messages[0, delimiter], messages[delimiter+1..-1] | |
debug '<handler_write>' | |
if payload.size == 7 | |
version, request_id, domain, address, identidy, mechanism, credentials = payload | |
if version == '1.0' | |
if mechanism == 'CURVE' | |
if credentials == $client_public_key | |
debug @handler << servers.concat(['', '1.0', request_id, '200', 'OK', 'client', '']) | |
else | |
debug @handler << servers.concat(['', '1.0', request_id, '400', 'Identity is not known', '', '']) | |
end | |
else | |
debug @handler << servers.concat(['', '1.0', request_id, '400', 'Security mechanism not supported', '', '']) | |
end | |
else | |
debug @handler << servers.concat(['', '1.0', request_id, '500', 'Version number not valid', '', '']) | |
end | |
else | |
debug @handler << servers.concat(['', '1.0', '1', '500', 'Payload size not valid', '', '']) | |
end | |
debug '</handler_write>' | |
end | |
def finalizer | |
@handler.close if @handler | |
end | |
end | |
class Server | |
include Celluloid::ZMQ | |
include Celluloid::Logger | |
finalizer :finalize | |
def initialize | |
@socket = RouterSocket.new | |
@socket.set(::ZMQ::ZAP_DOMAIN, 'test') | |
@socket.identity = 'server' | |
@socket.set(::ZMQ::CURVE_SERVER, 1) | |
@socket.set(::ZMQ::CURVE_SECRETKEY, $server_private_key.to_s) | |
begin | |
@socket.bind($bind_point) | |
rescue IOError | |
@socket.close | |
raise | |
end | |
async.run | |
end | |
def run | |
loop { async.handle_messages @socket.read_multipart } | |
end | |
def handle_messages(messages) | |
delimiter = messages.index('') | |
servers, payload = messages[0, delimiter], messages[delimiter+1..-1] | |
debug '<server_read>' | |
debug payload | |
debug '</server_read>' | |
debug '<server_write>' | |
debug @socket << servers.concat(['', "Hello #{payload.flatten}"]) | |
debug '</server_write>' | |
end | |
def finalizer | |
@socket.close if @socket | |
end | |
end | |
class Client | |
include Celluloid::ZMQ | |
include Celluloid::Logger | |
finalizer :finalize | |
def initialize | |
@socket = DealerSocket.new | |
@socket.set(::ZMQ::ZAP_DOMAIN, 'test') | |
@socket.identity = 'client' | |
@socket.set(::ZMQ::CURVE_SERVERKEY, $server_public_key.to_s) | |
@socket.set(::ZMQ::CURVE_PUBLICKEY, $client_public_key.to_s) | |
@socket.set(::ZMQ::CURVE_SECRETKEY, $client_private_key.to_s) | |
begin | |
@socket.connect($bind_point) | |
rescue IOError | |
@socket.close | |
raise | |
end | |
async.run | |
end | |
def run | |
loop { async.handle_messages @socket.read_multipart } | |
end | |
def handle_messages(messages) | |
debug '<client_read>' | |
debug messages | |
debug '</client_read>' | |
end | |
def write(*messages) | |
debug '<client_write>' | |
debug @socket << messages | |
debug '</client_write>' | |
true | |
end | |
def finalize | |
@socket.close if @socket | |
end | |
end | |
handler = Handler.new | |
server = Server.new | |
client = Client.new | |
client.async.write('', 'Hello Server') | |
client.async.write('', 'Hello Server 2') | |
client.async.write('', 'Hello Server 3') | |
client.async.write('', 'Hello Server 4') | |
client.async.write('', 'Hello Server 5') | |
sleep 1 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment