Created
July 18, 2010 18:05
-
-
Save paukul/480580 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'bunny' | |
# | |
# By default the logs are routed to the host, exchange and routing key specified in DEFAULT_OPTIONS.
# To change the configuration, pass an options hash when creating the logger:
# | |
# logging_config = { :routing_key => "applogging", | |
# :host => AppConfig.amqp_logging.host, | |
# :exchange => AppConfig.amqp_logging.exchange } | |
# | |
# logger = AMQPLogging::Logger.new(config.log_path, logging_config) | |
# config.logger = logger | |
# | |
# You can set the routing key with a string, or use a generator that responds to #call:
# it receives the log line as its first argument and returns the routing key.
# | |
# Examples: | |
# AMQPRoutingKeyGenerator = lambda do |logline| | |
# if logline =~ /(?:engine\[([^\]]*)\])\: (Completed in|Processing|Session ID)?/ | |
# key = "logs.app.#{$1}" | |
# key << ".statistics" unless $2.nil? | |
# else | |
# key = "logs.app.system" | |
# end | |
# key | |
# end | |
# | |
# AMQPLogging::Logger.new(config.log_path, :routing_key => AMQPRoutingKeyGenerator) | |
# | |
module AMQPLogging | |
# Settings used when the caller does not override them.
# :shift_age and :shift_size are handed to ::Logger by AMQPLogging::Logger#initialize;
# :host, :exchange and :routing_key are read by AMQPLogDevice.
DEFAULT_OPTIONS = {
  :shift_age   => 0,
  :shift_size  => 1024 * 1024, # 1 MiB
  :host        => "localhost",
  :exchange    => "logging_exchange",
  :routing_key => "logs"
}

# How long AMQP publishing stays paused after a failure before it is tried again.
# NOTE(review): Integer#seconds is not core Ruby - presumably ActiveSupport is loaded; confirm.
RETRY_AFTER = 10.seconds
# A ::Logger whose log device is wrapped in an AMQPLogDevice, so every line is
# both published over AMQP and written to the regular log device.
class Logger < ::Logger
  attr_accessor :extra_attributes

  # logdev - the regular log device (path or IO), passed on to ::Logger.
  # args   - optional; when the first element is a Hash it overrides DEFAULT_OPTIONS
  #          (:shift_age, :shift_size, :host, :exchange, :routing_key).
  #          Anything else is ignored and the defaults are used.
  def initialize(logdev, *args)
    overrides = args.first.is_a?(Hash) ? args.first : {}
    # Hash#merge always returns a new Hash, so the device never ends up holding
    # (and exposing through #configuration) the shared DEFAULT_OPTIONS constant itself.
    options = DEFAULT_OPTIONS.merge(overrides)

    super(logdev, options[:shift_age], options[:shift_size])

    # ::Logger has set up @logdev by now; wrap it so writes go to AMQP first
    # and then to the original device.
    @logdev = AMQPLogDevice.new(@logdev, options)
  end
end
# Log device that publishes each message to an AMQP topic exchange and then
# always hands the message to a fallback device as well.
#
# When publishing fails, AMQP is paused for RETRY_AFTER; during the pause only
# the fallback device is written to.
class AMQPLogDevice
  attr_reader :configuration

  # dev  - the fallback device; must respond to #write and #close.
  # opts - configuration hash; :host, :exchange and :routing_key are read from it.
  def initialize(dev, opts = {})
    @configuration = opts
    @fallback_logdev = dev
  end

  # Publishes msg over AMQP (unless paused) and writes it to the fallback device.
  #
  # Ordinary errors from the AMQP side are absorbed and pause publishing.
  # Only StandardError is rescued, so Interrupt, SystemExit and similar still
  # propagate instead of being swallowed by the logger. The fallback write
  # happens in every case.
  def write(msg)
    publish_to_amqp(msg) if amqp_available?
  rescue StandardError => exception
    pause_amqp_logging(exception)
  ensure
    @fallback_logdev.write(msg)
  end

  # Closes the fallback device.
  def close
    @fallback_logdev.close
  end

  private

  # True when publishing has never failed or the retry interval has elapsed.
  # Plain Time arithmetic is used so RETRY_AFTER may be a number of seconds
  # or a duration object that Time#- accepts.
  def amqp_available?
    @paused.nil? || @paused <= Time.now - RETRY_AFTER
  end

  # Resolves the routing key for msg and publishes it on the exchange.
  # NOTE(review): :key is the publish option name this code was written
  # against - verify it matches the installed Bunny version.
  def publish_to_amqp(msg)
    exchange.publish(msg, :key => routing_key_for(msg))
  end

  # The configured :routing_key is either a fixed value or a callable that
  # receives the log line; a callable's result is converted with #to_s.
  def routing_key_for(msg)
    key = configuration[:routing_key]
    key.respond_to?(:call) ? key.call(msg).to_s : key
  end

  # Records the failure time and drops the current connection.
  def pause_amqp_logging(exception)
    @paused = Time.now
    reset_amqp
    # do whatever you need to do, we in our case send a notification
  end

  # Stops the current client (if any) and forgets it and its exchange.
  # @bunny is used directly so that no new client is built just to be torn down.
  def reset_amqp
    @bunny.stop if @bunny && @bunny.connected?
  rescue StandardError
    # if bunny throws an exception here, it's not usable anymore anyway
  ensure
    @exchange = @bunny = nil
  end

  # Lazily connects and returns the memoized topic exchange.
  def exchange
    bunny.start unless bunny.connected?
    @exchange ||= bunny.exchange(configuration[:exchange], :type => :topic)
  end

  # Memoized Bunny client for the configured host.
  def bunny
    @bunny ||= Bunny.new(:host => @configuration[:host])
  end
end
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment