Last active
August 29, 2015 14:16
-
-
Save totem3/5ea60435cd140ffbf519 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/ruby | |
class Host | |
attr_reader *%i(name ip tags reported tn tmax dmax location gmond_started metrics) | |
def initialize(name, ip, tags, reported, tn, tmax, dmax, location, gmond_started, metrics) | |
@name = name | |
@ip = ip | |
@tags = tags | |
@reported = reported | |
@tn = tn | |
@tmax = tmax | |
@dmax = dmax | |
@location = location | |
@gmond_started = gmond_started | |
@metrics = metrics | |
end | |
def to_s | |
"name = #{self.name}, ip = #{self.ip}, tags = #{self.tags}, reported = #{self.reported}, tn = #{self.tn}, tmax = #{self.tmax}, dmax = #{self.dmax}, location = #{self.location}, gmond_started = #{self.gmond_started}, metrics = #{self.metrics}" | |
end | |
end | |
class Metric | |
attr_reader *%i(name val type units tn tmax dmax slope source group desc title) | |
def initialize(name, val, type, units, tn, tmax, dmax, slope, source, group, desc, title) | |
@name = name | |
@val = val | |
@type = type | |
@units = units | |
@tn = tn | |
@tmax = tmax | |
@dmax = dmax | |
@slope = slope | |
@source = source | |
@group = group | |
@desc = desc | |
@title = title | |
end | |
end | |
class GangliaParser | |
def initialize(on_message) | |
require 'rexml/document' | |
@on_message = on_message | |
end | |
def call(input) | |
xml = REXML::Document.new(input) | |
hosts = REXML::XPath.match(xml, "/GANGLIA_XML/CLUSTER/HOST").map do |h| | |
name = h.attribute("NAME").value | |
ip = h.attribute("IP").value | |
tags = h.attribute("TAGS").value | |
reported = h.attribute("REPORTED").value | |
tn = h.attribute("TN").value | |
tmax = h.attribute("TMAX").value | |
dmax = h.attribute("DMAX").value | |
location = h.attribute("LOCATION").value | |
gmond_started = h.attribute("GMOND_STARTED").value | |
metrics = h.get_elements("METRIC").map do |m| | |
metric_name = m.attribute("NAME").value | |
metric_val = m.attribute("VAL").value | |
metric_type = m.attribute("TYPE").value | |
metric_units = m.attribute("UNITS") ? m.attribute("UNITS").value : nil | |
metric_tn = m.attribute("TN") ? m.attribute("TN").value : nil | |
metric_tmax = m.attribute("TMAX") ? m.attribute("TMAX").value : nil | |
metric_dmax = m.attribute("DMAX") ? m.attribute("DMAX").value : nil | |
metric_slope = m.attribute("SLOPE") ? m.attribute("SLOPE").value : nil | |
metric_source = m.attribute("SOURCE") ? m.attribute("SOURCE").value : nil | |
extra_data = m.get_elements("EXTRA_DATA").first.get_elements("EXTRA_ELEMENT").map {|e| [e.attribute("NAME").value, e.attribute("VAL").value]}.to_h | |
Metric.new(metric_name, metric_val, metric_type, metric_units, metric_tn, metric_tmax, metric_dmax, metric_slope, metric_source, extra_data["GROUP"], extra_data["DESC"], extra_data["TITLE"]) | |
end | |
Host.new(name, ip, tags, reported, tn, tmax, dmax, location, gmond_started, metrics) | |
end | |
hosts.each do |h| | |
h.metrics.each do |e| | |
data = {hostname: h.name, metric_name: e.name, value: e.val}.to_json | |
@on_message.call(data) | |
end unless h.metrics.nil? | |
end | |
end | |
end | |
module Fluent | |
class GangliaInput < Input | |
Plugin.register_input('ganglia', self) | |
def initialize | |
super | |
require 'fluent/timezone' | |
require 'socket' | |
require 'open3' | |
require 'json' | |
end | |
config_param :tag, :string, :default => nil | |
config_param :run_interval, :time, :default => nil | |
config_param :gmond_host, :string, :default => "localhost" | |
config_param :gmond_port, :string, :default => "8649" | |
def configure(conf) | |
super | |
if localtime = conf['localtime'] | |
@localtime = true | |
elsif utc = conf['utc'] | |
@localtime = false | |
end | |
if conf['timezone'] | |
@timezone = conf['timezone'] | |
Fluent::Timezone.validate!(@timezone) | |
end | |
if !@tag or !@run_interval | |
raise ConfigError, "'tag' and 'run_interval' option is required on ganglia input" | |
end | |
@parser = GangliaParser.new(method(:on_message)) | |
end | |
def start | |
@finished = false | |
@thread = Thread.new(&method(:run_periodic)) | |
end | |
def shutdown | |
@finished = true | |
@thread.join | |
end | |
def run_periodic | |
until @finished | |
begin | |
sleep @run_interval | |
sock = TCPSocket.open(@gmond_host, @gmond_port) | |
input = [] | |
begin | |
while( (data = sock.recv_nonblock(100)) != "") | |
input << data | |
end | |
rescue Errno::EAGAIN | |
ensure | |
sock.close | |
end | |
@parser.call(input.join) | |
rescue | |
log.error "ganglia failed to run or shutdown child process", :error => $!.to_s, :error_class => $!.class.to_s | |
log.warn_backtrace $!.backtrace | |
end | |
end | |
end | |
private | |
def on_message(record) | |
tag = @tag | |
time = Engine.now | |
router.emit(tag, time, record) | |
rescue => e | |
log.error "ganglia failed to emit", :error => e.to_s, :error_class => e.class.to_s, :tag => tag, :record => Yajl.dump(record) | |
end | |
end | |
end | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment