@jonleighton
Created June 11, 2012 15:28
# Example code provided for https://github.com/librato/librato-metrics/issues/14
#
# We configured it like so:
#
# Loco2::Metrics.queue_constructor = Librato::Metrics::Queue.method(:new)
# Loco2::Metrics.logger = Rails.logger
# Loco2::Metrics.exception_handler = Airbrake.method(:notify)
require 'thread'
require 'timeout'
require 'active_support/core_ext/module/attribute_accessors'

module Loco2
  module Metrics
    # How often (in seconds) we should submit metrics
    mattr_accessor :submit_interval
    self.submit_interval = 60

    mattr_accessor :queue_constructor
    mattr_accessor :logger
    mattr_accessor :exception_handler

    # The queue is created lazily so that it is guaranteed to live in the same
    # process that records the metrics. This matters because the workers are
    # forked/daemonized after they are started: a queue created eagerly would
    # exist in the parent, while the work is actually done in a separate
    # memory space.
    def self.queue
      @queue ||= begin
        queue = Queue.new(queue_constructor, :logger => logger, :exception_handler => exception_handler)
        queue.submit_every(submit_interval)

        # When we exit the process, try to submit any outstanding metrics, but
        # fail silently if it takes longer than 2 seconds
        at_exit do
          begin
            Timeout.timeout(2) { queue.submit }
          rescue Timeout::Error
            # Give up quietly; the outstanding metrics are dropped.
          end
        end

        queue
      end
    end

    # This allows us to add to a Queue in one thread and submit it in another thread.
    # Before submitting, we swap in a fresh instance from the constructor and use that
    # for all future additions. Only the swap is done under the lock, so the submit
    # operation itself (which could be long-running) never blocks the other threads.
    class Queue
      attr_reader :options

      def initialize(constructor, options = {})
        @constructor = constructor
        @options     = options
        @mutex       = Mutex.new
        @queue       = @constructor.call
      end

      def logger
        options[:logger]
      end

      # Falls back to a no-op handler when none is configured
      def exception_handler
        options[:exception_handler] || proc { }
      end

      def add(*args)
        @mutex.synchronize { @queue.add(*args) }
      end

      def submit
        old = nil

        # Hold the lock only long enough to swap the instances
        @mutex.synchronize do
          old    = @queue
          @queue = @constructor.call
        end

        logger.info "[Loco2::Metrics::Aggregator] Submitting: #{old.queued.inspect}" if logger
        old.submit unless old.empty?
      end

      # Starts a background thread that wakes up every `seconds` and submits.
      # Each submit runs in its own thread, so a slow submit does not delay
      # the next tick.
      def submit_every(seconds)
        thread do
          loop do
            sleep seconds
            thread { submit }
          end
        end
      end

      private

      # Runs the block in a new thread and reports any StandardError to the
      # exception handler
      def thread
        Thread.new do
          begin
            yield
          rescue => e
            exception_handler.call(e)
          end
        end
      end
    end
  end
end
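
The swap-under-lock idea described in the comments above can be tried without Librato. What follows is a minimal sketch, not the gist author's code: `FakeBackend`, `SwappingQueue` and `SUBMITTED` are hypothetical names invented for this illustration, and `FakeBackend` only imitates the four methods the gist calls on the underlying queue (`add`, `queued`, `empty?`, `submit`).

```ruby
require 'thread'

# Hypothetical stand-in for the object returned by queue_constructor.
# It records each submitted batch in a shared array instead of sending it.
class FakeBackend
  attr_reader :queued

  def initialize(sink)
    @sink   = sink
    @queued = []
  end

  def add(measurement)
    @queued << measurement
  end

  def empty?
    @queued.empty?
  end

  def submit
    @sink << @queued.dup
  end
end

# Same pattern as Loco2::Metrics::Queue#submit: only the swap happens
# under the mutex, the (possibly slow) submit happens outside it.
class SwappingQueue
  def initialize(constructor)
    @constructor = constructor
    @mutex       = Mutex.new
    @current     = @constructor.call
  end

  def add(*args)
    @mutex.synchronize { @current.add(*args) }
  end

  def submit
    old = nil
    @mutex.synchronize do
      old      = @current
      @current = @constructor.call
    end
    old.submit unless old.empty?
  end
end

SUBMITTED = []
demo = SwappingQueue.new(lambda { FakeBackend.new(SUBMITTED) })

demo.add(:logins => 1)
demo.add(:signups => 2)
demo.submit # sends one batch containing both measurements
demo.submit # nothing was queued since the swap, so nothing is sent
```

Because `add` only ever touches the instance currently held under the mutex, measurements added while an old instance is still being submitted land in the fresh one and go out with the next submit.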