Created
January 23, 2012 03:34
-
-
Save rabbitt/1660366 to your computer and use it in GitHub Desktop.
Simulates a client drop and reconnect during REQ/REP send/receive
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby
# Simulates a client drop and reconnect during a REQ/REP send/receive exchange.
require 'ffi-rzmq'
require 'timeout'

# One ZMQ context, used by both the requestor and the responder thread.
ctx = ZMQ::Context.new

# Upper bound on how many times the requestor re-sends after a receive timeout.
MAX_RETRIES = 5

# An unhandled exception in any thread should bring the whole script down
# instead of silently killing just that thread.
Thread.abort_on_exception = true
# Raised when a ZMQ send or receive call reports a failure return code.
# Derives from StandardError (rather than Exception) so that a plain `rescue`
# can handle it and it is not lumped together with signals or interpreter exits.
class CommError < StandardError; end
# Requestor thread: binds a REQ socket, performs a SYN/ACK handshake, then sends
# a 'FAIL' message and waits up to one second for a reply, re-sending on timeout
# until MAX_RETRIES is reached.
#
# Return codes are checked with ZMQ::Util.resultcode_ok? instead of `== 0`:
# depending on the libzmq version a successful send/recv may return a byte
# count rather than 0, while failures are reported as a negative value.
requestor = Thread.new do
  socket = ctx.socket(ZMQ::REQ).tap { |s| s.bind('ipc:///tmp/test-zmq') }

  puts "REQUESTOR: SENDING SYN"
  socket.send_string 'SYN'

  puts "REQUESTOR: WAITING FOR ACK"
  message = ''
  # Keep polling (with a short pause) until a receive succeeds.
  sleep 0.1 until ZMQ::Util.resultcode_ok?(socket.recv_string(message))
  raise CommError, 'NO ACK RECEIVED' unless message == 'ACK'
  puts "REQUESTOR: ACK RECEIVED"

  retries = 0
  begin
    puts "REQUESTOR: Sending FAIL message. This message won't be received by the RESPONDER..."
    # NOTE(review): on a retry this re-sends on the same REQ socket after a
    # timed-out receive; presumably the resulting failure is what this script
    # is meant to demonstrate -- confirm before "fixing" it.
    rc = socket.send_string('FAIL')
    raise CommError, ZMQ::Util.error_string unless ZMQ::Util.resultcode_ok?(rc)

    Timeout.timeout(1.0) do
      reply = ''
      rc = socket.recv_string(reply)
      raise CommError, ZMQ::Util.error_string unless ZMQ::Util.resultcode_ok?(rc)
    end
  rescue Timeout::Error
    puts "REQUESTOR: Timed out waiting for a response from the RESPONDER"
    if retries >= MAX_RETRIES
      puts "REQUESTOR: Failed to get a response. Current ZMQ::Util.error_string value: #{ZMQ::Util.error_string}"
    else
      puts "REQUESTOR: attempting to resend message"
      retries += 1
      retry
    end
  rescue CommError => e
    puts "REQUESTOR: expected ACK - got communication error instead: #{e.message}"
  end
end
# Responder thread: connects a REP socket, answers the initial 'SYN' with 'ACK',
# then closes the socket and connects a fresh one to simulate a peer that drops
# and comes back, and finally waits for the requestor's 'FAIL' message.
responder = Thread.new do
  endpoint = 'ipc:///tmp/test-zmq'
  socket = ctx.socket(ZMQ::REP).tap { |s| s.connect(endpoint) }

  puts "responder: WAITING FOR SYN"
  socket.recv_string(message = '')
  if message == 'SYN'
    puts "responder: received SYN - sending ACK"
    socket.send_string 'ACK'
  end

  puts "responder: SIMULATING CLIENT DROP AND RECONNECT"
  # simulate a client going away and coming back
  socket.close

  puts "responder: Reconnecting..."
  socket = ctx.socket(ZMQ::REP).tap { |s| s.connect(endpoint) }

  # The second exchange expects 'FAIL', so the log lines say so (they previously
  # repeated the SYN wording from the first exchange).
  puts "responder: WAITING FOR FAIL"
  socket.recv_string(message = '')
  if message == 'FAIL'
    puts "responder: received FAIL - sending ACK"
    socket.send_string 'ACK'
  end
end
# Block the main thread until the requestor thread finishes; only the
# requestor is joined here.
requestor.join
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment