Skip to content

Instantly share code, notes, and snippets.

@ricardotealdi
Last active June 14, 2016 14:39
Show Gist options
  • Save ricardotealdi/c318eb84865f98fa4c61ce6b2a7f7e7b to your computer and use it in GitHub Desktop.
Save ricardotealdi/c318eb84865f98fa4c61ce6b2a7f7e7b to your computer and use it in GitHub Desktop.
Periodic reporter with InfluxDB on Ruby
require 'influxdb'
require 'active_support/core_ext/hash/indifferent_access'
require 'active_support/core_ext/hash/deep_merge'
require 'yaml'
require 'thread'
class Registry
def initialize(options = {})
@mutex = Mutex.new
@metrics = {}
@logger = options[:logger]
end
def metrics
mutex.synchronize { @metrics.dup }
end
def gauge(measurement, options = {}, &metric_block)
mutex.synchronize do
log("Registering #{measurement}")
@metrics[measurement] = [
metric_block, options.fetch(:tags, {}),
options[:precision], options[:retention]
]
end
true
end
private
attr_reader :mutex, :logger
def log(message, severity = :debug)
return if logger.nil?
logger.public_send(
severity,
"[#{self.class}][#{Thread.current.object_id.to_s(16)}] #{message}"
)
end
end
class Agent
HOST = Socket.gethostname.freeze
DEFAULT_TAGS = { host: HOST }.freeze
DEFAULT_INTERVAL = 5
def initialize(client, registry, app_name, options = {})
@client = client
@registry = registry
@default_data = { tags: DEFAULT_TAGS.merge(app_name: app_name) }
@interval = options.fetch(:interval, DEFAULT_INTERVAL)
@logger = options[:logger]
@thread = nil
end
def write(measurement, data, *args)
client.write_point(measurement, default_data.deep_merge(data), *args)
end
def start
return if alive?
log('Starting...')
@thread = Thread.new do
loop do
begin
log("sleeping for #{interval}s...")
sleep(interval)
Thread.new do
metrics.each do |metric_name, (metric, tags, precision, retention)|
begin
log("[#{metric_name}] Writing data...")
data = {
values: { value: metric.call }, tags: tags,
timestamp: Time.now.utc.to_i
}
write(metric_name, data, precision, retention)
rescue Exception => e
log(
"[#{metric_name}] ERROR: #{e.class}: #{e.message}", :error
)
end
end
end.join
rescue => e
log("ERROR: #{e.class}: #{e.message}", :error)
end
end
end
end
def alive?
@thread && @thread.alive?
end
def stop
log('Stopping...')
@thread.kill if @thread
@thread = nil
end
def restart
start unless alive?
end
private
attr_reader :client, :logger, :interval, :registry, :default_data
def metrics
registry.metrics
end
def log(message, severity = :debug)
return if logger.nil?
logger.public_send(
severity,
"[#{self.class}][#{Thread.current.object_id.to_s(16)}] #{message}"
)
end
end
class Supervisor
INTERVAL_CHECK = 10
def initialize(agent, options = {})
@agent = agent
@logger = options[:logger]
@thread = nil
end
def start
unless @thread && @thread.alive?
log('Starting...')
@thread = Thread.new do
loop { sleep(INTERVAL_CHECK); agent.restart }
end
end
end
def stop
log('Stopping...')
@thread.kill if @thread
@thread = nil
end
private
attr_reader :agent, :logger
def log(message, severity = :debug)
return if logger.nil?
logger.public_send(
severity,
"[#{self.class}][#{Thread.current.object_id.to_s(16)}] #{message}"
)
end
end
class Configuration
APP_NAME_KEY = 'REPORTER_APP_NAME'
INTERVAL_KEY = 'REPORTER_INTERVAL'
LOG_FILE_KEY = 'REPORTER_LOG_FILE'
DB_HOST_KEY = 'REPORTER_DB_HOST'
DB_PORT_KEY = 'REPORTER_DB_PORT'
DB_DATABASE_KEY = 'REPORTER_DB_DATABASE'
DB_USERNAME_KEY = 'REPORTER_DB_USERNAME'
DB_PASSWORD_KEY = 'REPORTER_DB_PASSWORD'
DB_ASYNC_KEY = 'REPORTER_DB_ASYNC'
attr_reader :db, :interval, :app_name, :logger
def self.default; @configuration ||= new; end
def initialize(config_path = 'config/reporter.yml')
@content = YAML.load(File.read(config_path)).with_indifferent_access
@db = build_db_info.freeze
@interval = (ENV[INTERVAL_KEY] || @content[:interval] || '5').to_i
@app_name = (
ENV[APP_NAME_KEY] ||
@content[:app_name] ||
fail('App name not provided')
).freeze
@logger = Logger.new(
ENV[LOG_FILE_KEY] || @content[:log_file] || STDOUT
)
freeze
end
private
attr_reader :content
def build_db_info
@content.fetch(:db, {}).tap do |db_hash|
db_hash[:host] = ENV[DB_HOST_KEY] if ENV[DB_HOST_KEY]
db_hash[:port] = ENV[DB_PORT_KEY] if ENV[DB_PORT_KEY]
db_hash[:database] = ENV[DB_DATABASE_KEY] if ENV[DB_DATABASE_KEY]
db_hash[:username] = ENV[DB_USERNAME_KEY] if ENV[DB_USERNAME_KEY]
db_hash[:password] = ENV[DB_PASSWORD_KEY] if ENV[DB_PASSWORD_KEY]
db_hash[:async] = ENV[DB_ASYNC_KEY] if ENV[DB_ASYNC_KEY]
end
end
end
class Reporter
extend Forwardable
def_delegator :registry, :gauge
def_delegator :client, :write
def initialize(options = {}, config = Configuration.default)
@app_name = options.fetch(:app_name, config.app_name)
@logger = options.fetch(:logger, config.logger)
@registry = options.fetch(:registry, Registry.new(logger: logger))
@interval = options.fetch(:interval, config.interval)
@client = ::InfluxDB::Client.new(options.fetch(:db, config.db))
@agent = Agent.new(
client, registry, app_name, interval: interval, logger: logger
)
@supervisor = Supervisor.new(agent, logger: logger)
end
def start
agent.start
supervisor.start
end
def stop
supervisor.stop
agent.stop
end
private
attr_reader(
:client, :logger, :interval, :agent, :registry, :supervisor, :app_name
)
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment