-
-
Save danielsdeleo/255282 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This is how Queue#subscribe will work in the next version of Bunny (v0.5.4).
# It is currently available in the 'experimental' branch.
#
# For subscription to work smoothly you need to use a combination of
# Queue#subscribe, Queue#unsubscribe, Queue#ack and Client#qos prefetch. The
# prefetch allows a controlled number of messages to be released to a
# consumer, which means that Queue#unsubscribe and other methods can be run
# without errors caused by out-of-sequence messages.
#
# Example 1 - Using :message_max where no. of messages > message_max
#
# Bring up a Bunny client with logging switched on.
b = Bunny.new(:logging => true)
b.start

# Declare the queue and purge it so the example starts from an empty queue.
q = b.queue('subtest1')
q.purge

# Set the prefetch window (the number of messages the server will send at a
# time). The default is 1. RabbitMQ v1.6.0 only accepts the :prefetch_count
# option; the other options have not been implemented yet.
b.qos

# Publish ten messages, i.e. more than the :message_max used below.
10.times do
  q.publish('Here be another test message')
end

# Subscribe to the queue. :ack => true must be specified, otherwise prefetch
# will not work.
q.subscribe(:message_max => 5, :ack => true) do |msg|
  puts msg[:payload]
end

# Report the queue status once the subscription has returned.
status = q.status
puts "Queue: #{q.name}\nMessages: #{status[:message_count]}\nConsumers: #{status[:consumer_count]}"
# | |
# Example 2 - Using :timeout
#
# Reuses the set-up from Example 1 (client `b`, queue `q`). Prefetch is not
# needed for this variant.

# Subscribe to the queue, passing the :timeout option.
q.subscribe(:timeout => 10) do |msg|
  puts msg[:payload]
end

# Report the queue status once the subscription has returned.
status = q.status
puts "Queue: #{q.name}\nMessages: #{status[:message_count]}\nConsumers: #{status[:consumer_count]}"
# | |
# Example 3 - Only process the first message from the queue. N.B. More easily
# done with :message_max option
#
# Reuses the set-up from Example 1 (client `b`, queue `q`). Prefetch is needed
# for this variant.

# Subscribe with an explicit consumer tag and acknowledgements enabled.
q.subscribe(:consumer_tag => 'my_consumer', :ack => true) do |msg|
  # Print the first message delivered.
  puts msg[:payload]

  # Unsubscribe from the queue, then acknowledge the message.
  q.unsubscribe
  q.ack

  # Leave the subscribe loop after this single message.
  break
end

# Report the queue status after leaving the subscription.
status = q.status
puts "Queue: #{q.name}\nMessages: #{status[:message_count]}\nConsumers: #{status[:consumer_count]}"
# | |
# Example 4 - Process all messages from queue then stop when queue empty
#
# Fresh client with the default prefetch window and a queue declared without
# an explicit name.
b = Bunny.new
b.start
b.qos
q = b.queue

# Publish ten messages to consume.
10.times do
  q.publish('Hey Ho')
end

received = 0
q.subscribe(:ack => true) do |msg|
  received += 1
  # Ask the queue how many messages are still waiting.
  remaining = q.message_count

  banner = '************************'
  puts banner
  puts "Message: #{received} - #{msg[:payload]}"
  puts "#{remaining} message(s) left in the queue"
  puts banner

  # Keep consuming while messages remain in the queue.
  next unless remaining == 0

  # Queue is empty: unsubscribe, acknowledge this message and leave the loop.
  q.unsubscribe
  q.ack
  break
end
# | |
# Example 5 - Display all of the information about a consumed message
#
# Fresh client with the default prefetch window and a queue declared without
# an explicit name.
b = Bunny.new
b.start
b.qos
q = b.queue
q.publish('Another message served up by RabbitMQ')

# Consume exactly one message and print its payload, header and delivery
# details.
q.subscribe(:message_max => 1, :ack => true) do |msg|
  puts
  puts "Message Information"
  puts '-------------------'
  puts "Payload - #{msg[:payload]}"
  puts
  puts "Header - "
  header = msg[:header]
  header.instance_variables.each do |var|
    # Strip the leading '@' to get the name of the matching reader method.
    vname = var.to_s.sub(/@/, '')
    # Call the reader directly rather than building source text for eval.
    # public_send keeps eval's explicit-receiver semantics (public methods only).
    val = header.public_send(vname)
    if val.is_a?(Hash)
      # Hash-valued attributes are printed one key/value pair per line.
      puts " #{vname.capitalize} -"
      val.each_pair do |k, v|
        puts " #{k.capitalize}: #{v}"
      end
    else
      puts " #{vname.capitalize}: #{val}"
    end
  end
  puts
  puts "Delivery details -"
  msg[:delivery_details].each_pair do |k, v|
    puts " #{k.capitalize}: #{v}"
  end
  puts
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment