Skip to content

Instantly share code, notes, and snippets.

@zoomix
Created October 26, 2012 13:45
Show Gist options
  • Save zoomix/3958901 to your computer and use it in GitHub Desktop.
Save zoomix/3958901 to your computer and use it in GitHub Desktop.
Batched publisher to rabbitmq
# encoding: utf-8
require 'msgpack'
require 'amqp'
require 'burt/logging'
require 'thread'
module RichParser
module BatchPublisher
class BatchPublisher
include Burt::Logging
MAX_MESSAGE_COUNT= 10
def initialize(amqp_channel, exchange_name, probe, options={})
@amqp_channel, @exchange_name, @probe = amqp_channel, exchange_name, probe
@batches = Hash.new do |hash, key|
batch = Batch.new(self, key, {:max_message_count => options[:max_message_count] || MAX_MESSAGE_COUNT})
batch.start
hash[key] = batch
end
@shutting_down = false
end
def reset_counters!
end
def start
@exchange = @amqp_channel.direct(@exchange_name, :passive => true)
@app_id = "parser.#{Process.ppid}"
end
# Delayed publishing
def publish(item)
return unless item
return false if AMQP.closing? || @shutting_down
raise 'No routing key' unless item[:routing_key]
@batches[item[:routing_key]] << item
true
end
def publish_batch(items, routing_key)
return false if AMQP.closing?
return false unless @probe.key_ok?(routing_key)
options = {:routing_key => routing_key, :app_id => @app_id, :persistent => true, :content_type => MSG_PACK_CONTENT_TYPE}
@exchange.publish(encode_message(items), options)
true
end
def flush!
log.warn "Flushing #{@batches.size} batches."
@shutting_down = true
@batches.values.each { |batch| batch.flush! }
end
private
def encode_message(fragment)
MessagePack.pack(fragment).to_s
end
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment