Created
July 8, 2011 17:11
-
-
Save pdlug/1072288 to your computer and use it in GitHub Desktop.
Test AMQP consumer/producer with auto recovery
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# CONSUMER: subscribes to the 'consumer' queue and answers messages that carry a reply_to header.
require 'amqp' | |
# Opens an auto-recovering channel on +connection+, declares the durable
# 'consumers' direct exchange and the durable 'consumer' queue, binds them
# with the routing key 'consumer' and subscribes to the queue.
#
# Every delivery is printed. When a message carries a reply_to header, a
# "Processed <payload>" reply is published with that value as routing key.
#
# connection - the connection object yielded by AMQP.start.
def setup_connection(connection)
  puts "Setting up connection"

  channel = AMQP::Channel.new(connection, :auto_recovery => true)
  # Any channel-level error aborts the script with the broker's reply text.
  channel.on_error do |_ch, channel_close|
    raise channel_close.reply_text
  end

  exchange = channel.direct('consumers', :durable => true)
  queue    = channel.queue('consumer', :durable => true)

  # Publish replies through the default exchange of THIS channel so they share
  # its error handler and auto-recovery setting. AMQP::Exchange.default called
  # without a channel argument does not use the channel configured above.
  replies = channel.default_exchange

  queue.bind(exchange, :key => 'consumer').subscribe do |headers, payload|
    puts "Got '#{payload}' for '#{queue.name}'. Headers are #{headers.to_hash.inspect}"
    replies.publish("Processed #{payload}", :key => headers.reply_to) if headers.reply_to
  end
end
# Connects to the broker on 127.0.0.1 (vhost '/') and runs the consumer until
# the process receives SIGINT. A failed TCP connection or a possible
# authentication failure prints a message and stops the event loop.
AMQP.start(
  :host    => '127.0.0.1',
  :vhost   => '/',
  :timeout => 0.3,
  :on_tcp_connection_failure          => proc { |settings| puts "Failed to connect"; EM.stop },
  :on_possible_authentication_failure => proc { |settings| puts "Failed to authenticate"; EM.stop }
) do |connection, _open_ok|
  # Connection-level errors are deliberately not raised (the consumer should
  # keep running), but they are reported instead of being dropped silently.
  connection.on_error do |_conn, connection_close|
    puts "Connection-level error: #{connection_close.reply_text}"
  end

  connection.on_tcp_connection_loss do |conn, _settings|
    puts "Lost connection to broker, reconnecting..."
    # NOTE(review): arguments are presumably (force, retry period in seconds) - confirm against the amqp gem docs.
    conn.reconnect(false, 1)
  end

  setup_connection(connection)

  trap(:INT) do
    if connection.connected?
      connection.close { EM.stop { exit } } unless connection.closing?
    else
      # With the broker unreachable the close handshake may never complete,
      # so stop the event loop directly instead of waiting for it.
      EM.stop
    end
  end
end
# PRODUCER: publishes one message per second to the 'consumers' exchange and prints the replies it receives.
require 'amqp' | |
require 'simple_uuid' | |
# Opens an auto-recovering channel on +connection+, looks up the existing
# 'consumers' direct exchange (passive declaration), declares an auto-delete
# reply queue named after a fresh UUID, and starts a one-second periodic timer
# that publishes "Message <n>" with routing key 'consumer' and the reply queue
# name as reply_to.
#
# Returned (unroutable) messages and received replies are printed.
#
# connection - the connection object yielded by AMQP.start.
def setup_connection(connection)
  puts "Setting up connection"

  channel = AMQP::Channel.new(connection, AMQP::Channel.next_channel_id, :auto_recovery => true)
  # Make channel-level errors visible, e.g. when the passive declaration below
  # fails because the 'consumers' exchange has not been created yet.
  channel.on_error do |_ch, channel_close|
    puts "Channel-level error: #{channel_close.reply_text}"
  end

  exchange = channel.direct('consumers', :passive => true)
  exchange.on_return do |basic_return, _metadata, payload|
    puts "Message returned: #{payload}, reply_code = #{basic_return.reply_code}, reply_text = #{basic_return.reply_text}"
  end

  queue_name = SimpleUUID::UUID.new.to_guid
  channel.queue(queue_name, :auto_delete => true).subscribe do |msg|
    puts "got a reply: #{msg}"
  end

  n = 0
  EM.add_periodic_timer(1) do
    if connection.connected?
      n += 1
      puts "publishing #{n}"
      # :immediate is intentionally not set: RabbitMQ 3.0+ no longer supports
      # that flag. :mandatory still gets unroutable messages returned to the
      # on_return handler above.
      exchange.publish("Message #{n}", :mandatory => true, :key => 'consumer', :reply_to => queue_name)
    else
      puts "No connection to broker!"
    end
  end
end
# Connects to the broker on 127.0.0.1 (vhost '/') and runs the producer until
# the process receives SIGINT. A failed TCP connection or a possible
# authentication failure prints a message and stops the event loop.
AMQP.start(
  :host    => '127.0.0.1',
  :vhost   => '/',
  :timeout => 0.3,
  :on_tcp_connection_failure          => proc { |settings| puts "Failed to connect"; EM.stop },
  :on_possible_authentication_failure => proc { |settings| puts "Failed to authenticate"; EM.stop }
) do |connection, _open_ok|
  # Connection-level errors abort the producer with the broker's reply text.
  connection.on_error do |_conn, connection_close|
    raise connection_close.reply_text
  end

  connection.on_tcp_connection_loss do |conn, _settings|
    puts "Lost connection to broker, reconnecting..."
    # NOTE(review): arguments are presumably (force, retry period in seconds) - confirm against the amqp gem docs.
    conn.reconnect(false, 1)
  end

  setup_connection(connection)

  trap(:INT) do
    if connection.connected?
      connection.close { EM.stop { exit } } unless connection.closing?
    else
      # With the broker unreachable the close handshake may never complete,
      # so stop the event loop directly instead of waiting for it.
      EM.stop
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment