Created
August 20, 2013 20:17
-
-
Save maxdemarzi/6286700 to your computer and use it in GitHub Desktop.
Size + Time Rabbit MQ Accumulator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'bunny' | |
require 'timers' | |
TIME_TO_WAIT=3 | |
MAX_BUFFER=3 | |
# Setup Timers | |
timers = Timers.new | |
@three_second_timer = timers.every(3) { puts "***********triggered by timer*********" ; process_messages } | |
def process_messages | |
if [email protected]? || times_up? | |
@messages.each do |message| | |
puts "Added #{message} to transaction" | |
end | |
puts "Batched #{@messages.size}:" | |
@channel.acknowledge(@last, true) | |
reset_variables | |
end | |
end | |
def reset_variables | |
@cnt = 0 | |
@messages = [] | |
@last = nil | |
@last_time = Time.now | |
#Timers.reset | |
@three_second_timer.reset | |
#@three_second_timer.delay(TIME_TO_WAIT) | |
end | |
def times_up? | |
(@last_time + TIME_TO_WAIT) > Time.now | |
end | |
# Initialize | |
reset_variables | |
t1 = Thread.new do | |
loop do | |
loop { timers.wait } | |
end | |
end | |
t1.abort_on_exception = true | |
# Start and Configure RabbitMQ | |
connection = Bunny.new | |
connection.start | |
@channel = connection.create_channel | |
@channel.prefetch(MAX_BUFFER) | |
exchange = @channel.direct("moar") | |
queue = @channel.queue("", :durable => true) | |
queue.purge #just for this test | |
queue.bind(exchange).subscribe(:ack => true, :block => false) do |delivery_info, metadata, payload| | |
@last = delivery_info.delivery_tag | |
puts "Got the #{payload}" | |
@messages << payload | |
@cnt +=1 | |
if @cnt >= MAX_BUFFER | |
puts "***********triggered by buffer*********" | |
process_messages | |
end | |
end | |
20.times do |t| | |
sleep 0.5 + rand | |
exchange.publish("Hello #{t}!") | |
end | |
sleep 5 | |
connection.close |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment