Last active
June 14, 2016 14:39
-
-
Save ricardotealdi/c318eb84865f98fa4c61ce6b2a7f7e7b to your computer and use it in GitHub Desktop.
Periodic reporter with InfluxDB on Ruby
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'forwardable'
require 'logger'
require 'socket'
require 'thread'
require 'yaml'

require 'active_support/core_ext/hash/deep_merge'
require 'active_support/core_ext/hash/indifferent_access'
require 'influxdb'
# Mutex-guarded store of gauge definitions, keyed by measurement name.
#
# Each entry is the four-element array
# [metric_block, tags, precision, retention] exactly as passed to #gauge.
class Registry
  # @param options [Hash] only :logger is read; when absent nothing is logged
  def initialize(options = {})
    @logger = options[:logger]
    @metrics = {}
    @mutex = Mutex.new
  end

  # @return [Hash] a shallow copy of the registered gauges, taken while
  #   holding the lock, so callers can iterate without blocking #gauge
  def metrics
    mutex.synchronize { @metrics.dup }
  end

  # Registers the gauge for +measurement+, replacing any previous entry
  # stored under the same key.
  #
  # @param measurement [Object] key the gauge is stored under
  # @param options [Hash] reads :tags (defaults to {}), :precision and
  #   :retention; the latter two are stored as nil when not given
  # @yield the block kept as the gauge's value source
  # @return [true]
  def gauge(measurement, options = {}, &metric_block)
    mutex.synchronize do
      log("Registering #{measurement}")
      tags = options.fetch(:tags, {})
      entry = [metric_block, tags, options[:precision], options[:retention]]
      @metrics[measurement] = entry
    end

    true
  end

  private

  attr_reader :mutex, :logger

  # Sends +message+ to the logger at +severity+, prefixed with the class
  # name and the current thread's object id in hex. No-op without a logger.
  def log(message, severity = :debug)
    unless logger.nil?
      thread_id = Thread.current.object_id.to_s(16)
      logger.public_send(severity, "[#{self.class}][#{thread_id}] #{message}")
    end
  end
end
# Background reporter: every +interval+ seconds it calls each gauge found in
# the registry and writes the value through the client's #write_point.
class Agent
  HOST = Socket.gethostname.freeze
  DEFAULT_TAGS = { host: HOST }.freeze
  DEFAULT_INTERVAL = 5

  # Multiplier from epoch seconds to the unit of each timestamp precision.
  # Any precision not listed here (including nil) is treated as seconds,
  # which is what every gauge used to get.
  TIMESTAMP_SCALES = {
    'ns' => 1_000_000_000,
    'n' => 1_000_000_000,
    'u' => 1_000_000,
    'ms' => 1_000,
    's' => 1,
    'm' => Rational(1, 60),
    'h' => Rational(1, 3600)
  }.freeze

  # @param client [#write_point] presumably an InfluxDB::Client
  # @param registry [#metrics] source of the gauges to report
  # @param app_name [String] added to every point as the :app_name tag
  # @param options [Hash] reads :interval (seconds, default DEFAULT_INTERVAL)
  #   and :logger (optional)
  def initialize(client, registry, app_name, options = {})
    @client = client
    @registry = registry
    @default_data = { tags: DEFAULT_TAGS.merge(app_name: app_name) }
    @interval = options.fetch(:interval, DEFAULT_INTERVAL)
    @logger = options[:logger]
    @thread = nil
  end

  # Writes one point, deep-merging +data+ over the default host/app_name
  # tags. Extra +args+ are passed to the client's #write_point untouched.
  def write(measurement, data, *args)
    client.write_point(measurement, default_data.deep_merge(data), *args)
  end

  # Starts the reporting loop in a background thread.
  #
  # @return [Thread, nil] the new thread, or nil when one is already alive
  def start
    return if alive?

    log('Starting...')
    @thread = Thread.new { run_loop }
  end

  # @return [Boolean, nil] truthy while the reporting thread is running
  def alive?
    @thread && @thread.alive?
  end

  # Kills the reporting thread, if any, and forgets it.
  def stop
    log('Stopping...')
    @thread.kill if @thread
    @thread = nil
  end

  # Despite the name this never stops a running loop: it only starts one
  # when none is alive.
  def restart
    start unless alive?
  end

  private

  attr_reader :client, :logger, :interval, :registry, :default_data

  def metrics
    registry.metrics
  end

  # Body of the reporting thread: sleep, report one round, repeat forever.
  # Each round runs in its own thread that is joined immediately, so rounds
  # never overlap and killing the loop thread does not by itself kill a
  # round that is already in progress.
  def run_loop
    loop do
      begin
        log("sleeping for #{interval}s...")
        sleep(interval)
        Thread.new { report_all }.join
      rescue => e
        log("ERROR: #{e.class}: #{e.message}", :error)
      end
    end
  end

  # Reports every gauge currently in the registry.
  def report_all
    metrics.each do |metric_name, (metric, tags, precision, retention)|
      report(metric_name, metric, tags, precision, retention)
    end
  end

  # Samples one gauge and writes it. A failing gauge is logged and skipped
  # so the remaining gauges are still reported. Only StandardError is
  # rescued: SystemExit, NoMemoryError and the like are no longer swallowed.
  def report(metric_name, metric, tags, precision, retention)
    log("[#{metric_name}] Writing data...")
    data = {
      values: { value: metric.call }, tags: tags,
      timestamp: timestamp_for(precision)
    }
    write(metric_name, data, precision, retention)
  rescue StandardError => e
    log("[#{metric_name}] ERROR: #{e.class}: #{e.message}", :error)
  end

  # Current UTC time as an integer expressed in the unit of +precision+, so
  # the timestamp matches the precision handed to the client with it.
  # NOTE(review): a nil precision keeps the old seconds behaviour; if the
  # client is configured with a different default precision the two can
  # still disagree - confirm against the client configuration.
  def timestamp_for(precision)
    scale = TIMESTAMP_SCALES.fetch(precision.to_s, 1)
    (Time.now.utc.to_r * scale).to_i
  end

  # Sends +message+ to the logger at +severity+, prefixed with the class
  # name and the current thread's object id in hex. No-op without a logger.
  def log(message, severity = :debug)
    return if logger.nil?

    logger.public_send(
      severity,
      "[#{self.class}][#{Thread.current.object_id.to_s(16)}] #{message}"
    )
  end
end
# Watchdog for an agent: a background thread that periodically calls
# agent.restart so that a dead agent gets started again.
class Supervisor
  # Default number of seconds between two checks.
  INTERVAL_CHECK = 10

  # @param agent [#restart] the object to keep alive
  # @param options [Hash] reads :logger (optional) and :interval_check
  #   (seconds between checks, default INTERVAL_CHECK)
  def initialize(agent, options = {})
    @agent = agent
    @logger = options[:logger]
    @interval_check = options.fetch(:interval_check, INTERVAL_CHECK)
    @thread = nil
  end

  # Starts the watchdog thread unless one is already alive.
  #
  # @return [Thread, nil] the new thread, or nil when already running
  def start
    return if @thread && @thread.alive?

    log('Starting...')
    @thread = Thread.new do
      loop do
        sleep(interval_check)
        check_agent
      end
    end
  end

  # Kills the watchdog thread, if any, and forgets it.
  def stop
    log('Stopping...')
    @thread.kill if @thread
    @thread = nil
  end

  private

  attr_reader :agent, :logger, :interval_check

  # Asks the agent to restart. A failure is logged instead of propagating:
  # an uncaught error here would end the watchdog thread, and nothing else
  # would bring it back.
  def check_agent
    agent.restart
  rescue StandardError => e
    log("ERROR: #{e.class}: #{e.message}", :error)
  end

  # Sends +message+ to the logger at +severity+, prefixed with the class
  # name and the current thread's object id in hex. No-op without a logger.
  def log(message, severity = :debug)
    return if logger.nil?

    logger.public_send(
      severity,
      "[#{self.class}][#{Thread.current.object_id.to_s(16)}] #{message}"
    )
  end
end
# Reporter settings read from a YAML file, with each value overridable by a
# REPORTER_* environment variable. Instances are frozen once built.
class Configuration
  APP_NAME_KEY = 'REPORTER_APP_NAME'
  INTERVAL_KEY = 'REPORTER_INTERVAL'
  LOG_FILE_KEY = 'REPORTER_LOG_FILE'
  DB_HOST_KEY = 'REPORTER_DB_HOST'
  DB_PORT_KEY = 'REPORTER_DB_PORT'
  DB_DATABASE_KEY = 'REPORTER_DB_DATABASE'
  DB_USERNAME_KEY = 'REPORTER_DB_USERNAME'
  DB_PASSWORD_KEY = 'REPORTER_DB_PASSWORD'
  DB_ASYNC_KEY = 'REPORTER_DB_ASYNC'

  # Spellings of REPORTER_DB_ASYNC (case-insensitive) that enable async;
  # anything else disables it.
  TRUE_VALUES = %w[1 true yes on].freeze

  attr_reader :db, :interval, :app_name, :logger

  # Memoized instance built from the default config path.
  def self.default
    @configuration ||= new
  end

  # @param config_path [String] path of the YAML file to read
  # @raise [RuntimeError] when no app name is found in ENV or in the file
  # @raise [ArgumentError] when REPORTER_DB_PORT is not a decimal integer
  def initialize(config_path = 'config/reporter.yml')
    # NOTE(review): YAML.load can instantiate arbitrary objects on older
    # Psych versions. Kept as-is because the file is presumably trusted and
    # may rely on aliases or symbols - consider YAML.safe_load.
    # An empty file loads as nil/false, hence the fallback to {}.
    loaded = YAML.load(File.read(config_path)) || {}
    @content = loaded.with_indifferent_access
    @db = build_db_info.freeze
    @interval = (ENV[INTERVAL_KEY] || @content[:interval] || '5').to_i
    @app_name = (
      ENV[APP_NAME_KEY] ||
      @content[:app_name] ||
      raise('App name not provided')
    ).freeze
    @logger = Logger.new(
      ENV[LOG_FILE_KEY] || @content[:log_file] || STDOUT
    )
    freeze
  end

  private

  attr_reader :content

  # Connection options from the file's :db section, with every REPORTER_DB_*
  # variable that is set taking precedence. A missing or empty :db section
  # counts as {}.
  def build_db_info
    (@content[:db] || {}).tap do |db_hash|
      db_hash[:host] = ENV[DB_HOST_KEY] if ENV[DB_HOST_KEY]
      # ENV values are strings; keep the port an Integer like the YAML one.
      db_hash[:port] = Integer(ENV[DB_PORT_KEY], 10) if ENV[DB_PORT_KEY]
      db_hash[:database] = ENV[DB_DATABASE_KEY] if ENV[DB_DATABASE_KEY]
      db_hash[:username] = ENV[DB_USERNAME_KEY] if ENV[DB_USERNAME_KEY]
      db_hash[:password] = ENV[DB_PASSWORD_KEY] if ENV[DB_PASSWORD_KEY]
      # The raw string "false" is truthy in Ruby, so parse it explicitly.
      db_hash[:async] = truthy?(ENV[DB_ASYNC_KEY]) if ENV[DB_ASYNC_KEY]
    end
  end

  # @return [Boolean] whether +value+ is one of TRUE_VALUES
  def truthy?(value)
    TRUE_VALUES.include?(value.to_s.strip.downcase)
  end
end
# Facade that wires a Registry, an InfluxDB client, an Agent and a
# Supervisor together and exposes start/stop for the whole set.
class Reporter
  extend Forwardable

  # #gauge registers a gauge on the registry.
  def_delegators :registry, :gauge
  # #write goes straight to the InfluxDB client.
  # NOTE(review): this bypasses Agent#write and therefore its default
  # host/app_name tags - confirm that is intended.
  def_delegators :client, :write

  # @param options [Hash] optional overrides: :app_name, :logger, :registry,
  #   :interval and :db; each one not given falls back to +config+ (or, for
  #   :registry, to a new Registry using the resolved logger)
  # @param config [Configuration] source of the fallback values
  def initialize(options = {}, config = Configuration.default)
    setting = ->(key) { options.fetch(key, config.public_send(key)) }

    @app_name = setting.call(:app_name)
    @logger = setting.call(:logger)
    @registry = options.fetch(:registry, Registry.new(logger: logger))
    @interval = setting.call(:interval)
    @client = ::InfluxDB::Client.new(setting.call(:db))

    agent_options = { interval: interval, logger: logger }
    @agent = Agent.new(client, registry, app_name, agent_options)
    @supervisor = Supervisor.new(agent, logger: logger)
  end

  # Starts the agent first, then the supervisor that watches it.
  def start
    agent.start
    supervisor.start
  end

  # Stops the supervisor first so it cannot restart the agent, then the
  # agent itself.
  def stop
    supervisor.stop
    agent.stop
  end

  private

  attr_reader :client, :logger, :interval
  attr_reader :agent, :registry, :supervisor, :app_name
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment