Skip to content

Instantly share code, notes, and snippets.

@paukul
Created July 18, 2010 18:05
Show Gist options
  • Save paukul/480580 to your computer and use it in GitHub Desktop.
Save paukul/480580 to your computer and use it in GitHub Desktop.
require 'bunny'
#
# By default the logs are routed to the host, exchange and key specified in DEFAULT_OPTIONS
# Change the configuration when creating logger with an argument hash:
#
# logging_config = { :routing_key => "applogging",
# :host => AppConfig.amqp_logging.host,
# :exchange => AppConfig.amqp_logging.exchange }
#
# logger = AMQPLogging::Logger.new(config.log_path, logging_config)
# config.logger = logger
#
# You can set the routing key with a string or use a generator that respondes to call,
# receives the logline as the first argument and returns the routing key.
#
# Examples:
# AMQPRoutingKeyGenerator = lambda do |logline|
# if logline =~ /(?:engine\[([^\]]*)\])\: (Completed in|Processing|Session ID)?/
# key = "logs.app.#{$1}"
# key << ".statistics" unless $2.nil?
# else
# key = "logs.app.system"
# end
# key
# end
#
# AMQPLogging::Logger.new(config.log_path, :routing_key => AMQPRoutingKeyGenerator)
#
module AMQPLogging
DEFAULT_OPTIONS = {
:shift_age => 0,
:shift_size => 1048576,
:host => "localhost",
:exchange => "logging_exchange",
:routing_key => "logs"
}
RETRY_AFTER = 10.seconds
class Logger < ::Logger
attr_accessor :extra_attributes
def initialize(logdev, *args)
options = args.first.is_a?(Hash) ? DEFAULT_OPTIONS.merge(args.first) : DEFAULT_OPTIONS
super(logdev, options[:shift_age], options[:shift_size])
@logdev = AMQPLogDevice.new(@logdev, options)
end
end
class AMQPLogDevice
attr_reader :exchange, :configuration
def initialize(dev, opts = {})
@configuration = opts
@fallback_logdev = dev
end
def write(msg)
begin
if @paused.nil? || @paused <= RETRY_AFTER.ago
routing_key = configuration[:routing_key].respond_to?(:call) ? configuration[:routing_key].call(msg).to_s : configuration[:routing_key]
exchange.publish(msg, :key => routing_key)
end
rescue Exception => exception
pause_amqp_logging(exception)
ensure
@fallback_logdev.write(msg)
end
end
def close
@fallback_logdev.close
end
private
def pause_amqp_logging(exception)
@paused = Time.now
reset_amqp
# do whatever you need to do, we in our case send a notification
end
def reset_amqp
begin
bunny.stop if bunny.connected?
rescue
# if bunny throws an exception here, its not usable anymore anyway
ensure
@exchange = @bunny = nil
end
end
def exchange
bunny.start unless bunny.connected?
@exchange ||= bunny.exchange(configuration[:exchange], :type => :topic)
end
def bunny
@bunny ||= Bunny.new(:host => @configuration[:host])
@bunny
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment