Created
June 11, 2012 15:28
-
-
Save jonleighton/2910652 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Example code provided for https://github.com/librato/librato-metrics/issues/14 | |
| # | |
| # We configured it like so: | |
| # | |
| # Loco2::Metrics.queue_constructor = Librato::Metrics::Queue.method(:new) | |
| # Loco2::Metrics.logger = Rails.logger | |
| # Loco2::Metrics.exception_handler = Airbrake.method(:notify) | |
| require 'thread' | |
| require 'timeout' | |
| require 'active_support/core_ext/module/attribute_accessors' | |
| module Loco2 | |
| module Metrics | |
| # How often (in seconds) we should submit metrics | |
| mattr_accessor :submit_interval | |
| self.submit_interval = 60 | |
| mattr_accessor :queue_constructor | |
| mattr_accessor :logger | |
| mattr_accessor :exception_handler | |
| # The aggregator is created lazily so we can be sure that it exists in the same | |
| # process as where a given metric is submitted. This is important because otherwise | |
| # when the workers are started, the aggregator would be created but then the | |
| # workers are forked/daemonized and the work is actually done in a separate | |
| # memory space. | |
| def self.queue | |
| @queue ||= begin | |
| queue = Queue.new(queue_constructor, :logger => logger, :exception_handler => exception_handler) | |
| queue.submit_every(submit_interval) | |
| # When we exit the process, try to submit any outstanding metrics, but | |
| # fail silently if it takes longer than 2 seconds | |
| at_exit do | |
| begin | |
| Timeout.timeout(2) { queue.submit } | |
| rescue Timeout::Error | |
| end | |
| end | |
| queue | |
| end | |
| end | |
| # This allows us to add to a Queue in one thread and submit it in another thread. | |
| # Before submitting, we will create a new aggregator instance and use that for all future | |
| # additions. This allows us to avoid having to put a lock around the submit operation, | |
| # which could potentially be long-running thus blocking all threads. | |
| class Queue | |
| attr_reader :options | |
| def initialize(constructor, options = {}) | |
| @constructor = constructor | |
| @options = options | |
| @mutex = Mutex.new | |
| @queue = @constructor.call | |
| end | |
| def logger | |
| options[:logger] | |
| end | |
| def exception_handler | |
| options[:exception_handler] || proc { } | |
| end | |
| def add(*args) | |
| @mutex.synchronize { @queue.add(*args) } | |
| end | |
| def submit | |
| old = nil | |
| @mutex.synchronize do | |
| old = @queue | |
| @queue = @constructor.call | |
| end | |
| logger.info "[Loco2::Metrics::Aggregator] Submitting: #{old.queued.inspect}" if logger | |
| old.submit unless old.empty? | |
| end | |
| def submit_every(seconds) | |
| thread do | |
| loop do | |
| sleep seconds | |
| thread { submit } | |
| end | |
| end | |
| end | |
| private | |
| def thread | |
| Thread.new do | |
| begin | |
| yield | |
| rescue => e | |
| exception_handler.call(e) | |
| end | |
| end | |
| end | |
| end | |
| end | |
| end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment