Skip to content

Instantly share code, notes, and snippets.

@Asmod4n
Last active August 29, 2015 13:57
Show Gist options
  • Save Asmod4n/9372750 to your computer and use it in GitHub Desktop.
Save Asmod4n/9372750 to your computer and use it in GitHub Desktop.
An implementation of http://rfc.zeromq.org/spec:27 in Ruby. Requirements: ZeroMQ built with libsodium. Install: gem install rbnacl celluloid-zmq. Usage: ruby zmqcurvezap.rb
require 'rbnacl'
require 'celluloid/zmq'
require 'socket'
# Patches applied only when the loaded celluloid-zmq gem reports version
# 0.15.0. They (re)define the helpers below inside Celluloid::ZMQ.
if Celluloid::ZMQ::VERSION == "0.15.0"
  module Celluloid
    module ZMQ
      # Reports whether the actor stored in the current thread uses a
      # Celluloid::ZMQ::Mailbox.
      def self.evented?
        running_actor = Thread.current[:celluloid_actor]
        running_actor.mailbox.is_a?(Celluloid::ZMQ::Mailbox)
      end

      # Asks the current mailbox's reactor to wait until +socket+ is readable.
      # Raises ArgumentError when not running inside an evented actor.
      # Always returns nil.
      #
      # NOTE(review): this is defined as an instance method but is invoked
      # below as ZMQ.wait_readable; presumably the gem already provides a
      # module-level copy -- confirm.
      def wait_readable(socket)
        unless ZMQ.evented?
          raise ArgumentError, "unable to wait for ZMQ sockets outside the event loop"
        end

        Thread.current[:celluloid_mailbox].reactor.wait_readable(socket)
        nil
      end

      # Asks the current mailbox's reactor to wait until +socket+ is writable.
      # Raises ArgumentError when not running inside an evented actor.
      # Always returns nil.
      def wait_writable(socket)
        unless ZMQ.evented?
          raise ArgumentError, "unable to wait for ZMQ sockets outside the event loop"
        end

        Thread.current[:celluloid_mailbox].reactor.wait_writable(socket)
        nil
      end

      class Socket
        # Sets a socket option on the wrapped socket.
        # Raises IOError when the underlying call reports a failure.
        def set(option, value, length = nil)
          result_code = @socket.setsockopt(option, value, length)
          return if ::ZMQ::Util.resultcode_ok?(result_code)

          raise IOError, "couldn't set value for option #{option}: #{::ZMQ::Util.error_string}"
        end

        # Reads a socket option from the wrapped socket and returns the first
        # value the underlying call stored.
        # Raises IOError when the underlying call reports a failure.
        def get(option)
          fetched = []
          result_code = @socket.getsockopt(option, fetched)
          unless ::ZMQ::Util.resultcode_ok?(result_code)
            raise IOError, "couldn't get value for option #{option}: #{::ZMQ::Util.error_string}"
          end

          fetched.first
        end
      end

      module ReadableSocket
        # Receives a multipart message into +buffer+ and returns that buffer.
        # Waits for readability first when running inside an evented actor.
        # Raises IOError when receiving fails.
        def read_multipart(buffer = [])
          ZMQ.wait_readable(@socket) if ZMQ.evented?

          result_code = @socket.recv_strings(buffer)
          unless ::ZMQ::Util.resultcode_ok?(result_code)
            raise IOError, "error receiving ZMQ string: #{::ZMQ::Util.error_string}"
          end

          buffer
        end
      end

      # NOTE(review): verify that the gem's own module is spelled
      # "WriteableSocket"; if it is spelled differently this opens a new,
      # unused module instead of patching the existing one.
      module WriteableSocket
        # Sends all given messages (flattened) as one multipart message and
        # returns the original argument list.
        # Waits for writability first when running inside an evented actor.
        # Raises IOError when sending fails.
        def write(*messages)
          ZMQ.wait_writable(@socket) if ZMQ.evented?

          result_code = @socket.send_strings(messages.flatten)
          unless ::ZMQ::Util.resultcode_ok?(result_code)
            raise IOError, "error sending 0MQ message: #{::ZMQ::Util.error_string}"
          end

          messages
        end
      end
    end
  end
end
# Generate one key pair for the server and one for the client. The classes
# below read these globals when configuring their sockets.
$server_private_key = RbNaCl::PrivateKey.generate
$server_public_key  = $server_private_key.public_key
$client_private_key = RbNaCl::PrivateKey.generate
$client_public_key  = $client_private_key.public_key

# Bind a throwaway TCP server to port 0 on loopback to learn which port was
# assigned, remember that port in the endpoint string, then release it.
port_probe = TCPServer.new('127.0.0.1', 0)
assigned_port = port_probe.addr[1]
$bind_point = "tcp://127.0.0.1:#{assigned_port}"
port_probe.close
# Actor that answers authentication requests arriving on the
# 'inproc://zeromq.zap.01' endpoint (see ZeroMQ RFC 27, linked in this file's
# description). Only version '1.0' requests using the 'CURVE' mechanism whose
# credentials match $client_public_key are accepted.
class Handler
  include Celluloid::ZMQ
  include Celluloid::Logger

  # Registers #finalize as the shutdown callback; the method below must carry
  # exactly this name or the socket is never closed.
  finalizer :finalize

  # Creates and binds the ROUTER socket, then starts the receive loop.
  # Re-raises IOError from bind after closing the socket.
  def initialize
    @handler = RouterSocket.new
    @handler.set(::ZMQ::ZAP_DOMAIN, 'test')
    @handler.identity = 'zeromq.zap.01'
    begin
      @handler.bind('inproc://zeromq.zap.01')
    rescue IOError
      @handler.close
      raise
    end
    async.run
  end

  # Receives multipart messages forever, handing each one to
  # #handle_messages asynchronously.
  def run
    loop { async.handle_messages @handler.read_multipart }
  end

  # Logs one incoming multipart message and sends the matching reply.
  #
  # messages - Array of String frames: routing frames, an empty delimiter
  #            frame, then the request frames.
  #
  # A message without an empty delimiter frame is logged and dropped, because
  # the routing frames cannot be separated from the request and there is no
  # way to address a reply.
  def handle_messages(messages)
    debug '<handler_read>'
    debug messages
    debug '</handler_read>'

    delimiter = messages.index('')
    if delimiter.nil?
      warn 'handler: dropping message without an empty delimiter frame'
      return
    end

    routing = messages[0, delimiter]
    payload = messages[(delimiter + 1)..-1]

    debug '<handler_write>'
    debug @handler << routing.concat([''], build_reply(payload))
    debug '</handler_write>'
  end

  # Closes the socket when the actor shuts down.
  def finalize
    @handler.close if @handler
  end

  private

  # Builds the six reply frames (version, request id, status code, status
  # text, user id, metadata) for the given request frames.
  #
  # payload - Array of String frames following the delimiter.
  #
  # Returns an Array of six Strings.
  def build_reply(payload)
    # The request id cannot be trusted when the frame count is wrong, so a
    # fixed id of '1' is sent back.
    return ['1.0', '1', '500', 'Payload size not valid', '', ''] unless payload.size == 7

    version, request_id, _domain, _address, _identity, mechanism, credentials = payload

    status_code, status_text, user_id =
      if version != '1.0'
        ['500', 'Version number not valid', '']
      elsif mechanism != 'CURVE'
        ['400', 'Security mechanism not supported', '']
      elsif credentials == $client_public_key
        # NOTE(review): compares a String frame with the RbNaCl key object;
        # presumably the key's equality accepts raw bytes -- confirm.
        ['200', 'OK', 'client']
      else
        ['400', 'Identity is not known', '']
      end

    ['1.0', request_id, status_code, status_text, user_id, '']
  end
end
# Actor owning a ROUTER socket configured as a CURVE server and bound to
# $bind_point. Every received message is answered with a greeting that echoes
# the payload frames.
class Server
  include Celluloid::ZMQ
  include Celluloid::Logger

  # Registers #finalize as the shutdown callback; the method below must carry
  # exactly this name or the socket is never closed.
  finalizer :finalize

  # Creates, configures and binds the socket, then starts the receive loop.
  # Re-raises IOError from bind after closing the socket.
  def initialize
    @socket = RouterSocket.new
    @socket.set(::ZMQ::ZAP_DOMAIN, 'test')
    @socket.identity = 'server'
    @socket.set(::ZMQ::CURVE_SERVER, 1)
    @socket.set(::ZMQ::CURVE_SECRETKEY, $server_private_key.to_s)
    begin
      @socket.bind($bind_point)
    rescue IOError
      @socket.close
      raise
    end
    async.run
  end

  # Receives multipart messages forever, handing each one to
  # #handle_messages asynchronously.
  def run
    loop { async.handle_messages @socket.read_multipart }
  end

  # Logs the payload of one incoming multipart message and replies to its
  # sender.
  #
  # messages - Array of String frames: routing frames, an empty delimiter
  #            frame, then the payload frames.
  #
  # A message without an empty delimiter frame is logged and dropped, because
  # the routing frames cannot be separated from the payload.
  def handle_messages(messages)
    delimiter = messages.index('')
    if delimiter.nil?
      warn 'server: dropping message without an empty delimiter frame'
      return
    end

    routing = messages[0, delimiter]
    payload = messages[(delimiter + 1)..-1]

    debug '<server_read>'
    debug payload
    debug '</server_read>'

    debug '<server_write>'
    debug @socket << routing.concat(['', "Hello #{payload.flatten}"])
    debug '</server_write>'
  end

  # Closes the socket when the actor shuts down.
  def finalize
    @socket.close if @socket
  end
end
# Actor owning a DEALER socket configured with the CURVE client keys and
# connected to $bind_point. Logs everything it sends and receives.
class Client
  include Celluloid::ZMQ
  include Celluloid::Logger

  finalizer :finalize

  # Creates, configures and connects the socket, then starts the receive loop.
  # Re-raises IOError from connect after closing the socket.
  def initialize
    @socket = DealerSocket.new
    @socket.set(::ZMQ::ZAP_DOMAIN, 'test')
    @socket.identity = 'client'
    @socket.set(::ZMQ::CURVE_SERVERKEY, $server_public_key.to_s)
    @socket.set(::ZMQ::CURVE_PUBLICKEY, $client_public_key.to_s)
    @socket.set(::ZMQ::CURVE_SECRETKEY, $client_private_key.to_s)

    begin
      @socket.connect($bind_point)
    rescue IOError
      @socket.close
      raise
    end

    async.run
  end

  # Receives multipart messages forever, handing each one to
  # #handle_messages asynchronously.
  def run
    loop do
      incoming = @socket.read_multipart
      async.handle_messages(incoming)
    end
  end

  # Logs one received multipart message between marker lines.
  def handle_messages(messages)
    ['<client_read>', messages, '</client_read>'].each { |entry| debug entry }
  end

  # Sends the given frames over the socket, logging the result of the send
  # between marker lines. Always returns true.
  def write(*messages)
    debug '<client_write>'
    sent = @socket << messages
    debug sent
    debug '</client_write>'
    true
  end

  # Closes the socket when the actor shuts down.
  def finalize
    @socket.close if @socket
  end
end
# Create the three actors in the same order as before: handler, server, client.
handler = Handler.new
server = Server.new
client = Client.new

# Queue five greetings on the client; each is sent with a leading empty frame.
[
  'Hello Server',
  'Hello Server 2',
  'Hello Server 3',
  'Hello Server 4',
  'Hello Server 5'
].each do |greeting|
  client.async.write('', greeting)
end

# Presumably gives the asynchronous actors time to exchange messages before
# the script ends.
sleep 1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment