Last active
December 18, 2017 21:47
-
-
Save eprothro/02b1680fb36e89d4323ff31b105e6c7a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
### Stub for a logstash filter event
# Minimal stand-in that exposes the get/set accessors the filter code in
# this file calls, backed by a plain Hash of field name => value.
class Event
  # initial_data - optional Hash of starting fields. It is copied, so later
  #                `set` calls never mutate the Hash the caller passed in.
  def initialize(initial_data = nil)
    @data = initial_data ? initial_data.dup : {}
  end

  # Returns the value stored under key (nil for a field that was never set
  # when the backing Hash has no default).
  def get(key)
    @data[key]
  end

  # Stores val under key and returns val.
  def set(key, val)
    @data[key] = val
  end

  # Returns the inspect string of the backing field Hash.
  def inspect
    @data.inspect
  end
end
# simulate an event coming from a logstash input to the filter
event = Event.new
event.set("host", "some_host")

# Randomly (about 50/50) emit one of the two JMX documents the filter
# expects: the total-latency counter or the total-request counter.
# Both values are derived from the current epoch second.
metric_path, value =
  if rand(100) < 50
    # first document
    ["Cassandra.Write.total_latency.count", Time.now.to_i * 5]
  else
    # second document
    ["Cassandra.Write.total.count", Time.now.to_i]
  end
event.set("metric_path", metric_path)
event.set("metric_value_number", value)
### logstash ruby filter
# ruby {
# init => '
require 'yaml'
# '
# code => '
# Pairs up the two Cassandra write JMX counters (total latency and total
# request count), which arrive as separate events. The event that completes
# a pair gets "avg_latency" and "requests" added, computed as the delta
# against the previously completed pair. State lives in a YAML file under
# /tmp (one per host and scope) and is guarded by an flock-ed lock file.
host = event.get("host")
scope = "write"
# NOTE(review): host is interpolated into the paths unescaped -- confirm it
# can never contain a path separator.
data_path = "/tmp/#{host}_jmx_#{scope}_logstash_filter.yml"
lock_path = "/tmp/#{host}_jmx_#{scope}_logstash_filter.lock"
fresh_state = lambda { {"current" => {}, "previous" => {}} }

# Block-form File.open closes both handles even when the body raises
# (for example on an unexpected event type), so no descriptor leaks.
File.open(lock_path, File::RDWR | File::CREAT) do |lock_file|
  begin
    # acquire lock
    # only allow one logstash worker to enter at a time
    # per host scope, to avoid race conditions when there
    # are multiple workers reading documents simultaneously
    lock_file.flock(File::LOCK_EX)

    File.open(data_path, "a+") do |data_file|
      # The state file sits in a world-writable directory, so parse it with
      # safe_load. An empty, unparsable or wrongly shaped file is treated
      # as "no state yet" instead of failing on every event.
      data = begin
        YAML.safe_load(data_file.read)
      rescue Psych::Exception
        nil
      end
      unless data.is_a?(Hash) && data["current"].is_a?(Hash)
        data = fresh_state.call
      end
      reset_data = false

      puts "event: #{event}"
      puts "data from file: #{data}"

      # set whichever new JMX metric we have in this event
      event_type = event.get("metric_path")
      event_value = event.get("metric_value_number")
      case event_type
      when "Cassandra.Write.total_latency.count"
        data["current"]["total_latency"] = event_value if event_value
      when "Cassandra.Write.total.count"
        data["current"]["total_request_count"] = event_value if event_value
      else
        raise "unexpected event type: #{event_type}"
      end

      if data["current"]["ready"]
        # someone else got here first and wrote the other
        # current metric -- we are ready to perform calculation
        # and shift current into previous
        current = data["current"]
        previous = data["previous"]
        if previous.is_a?(Hash) && !previous.empty?
          request_count = current["total_request_count"]
          previous_request_count = previous["total_request_count"]
          total_latency = current["total_latency"]
          previous_total_latency = previous["total_latency"]

          if request_count && previous_request_count && total_latency && previous_total_latency
            request_delta = request_count - previous_request_count
            # a non-positive delta adds no fields (nothing to divide by)
            if request_delta > 0
              avg_latency = (total_latency - previous_total_latency) / request_delta.to_f
              # convert metric unit of usec to msec
              avg_latency /= 1000.0
              # add calculated fields
              event.set("avg_latency", avg_latency)
              event.set("requests", request_delta)
            end
          else
            # without all expected values we are out of sync
            # and need to start over
            reset_data = true
            tags = event.get("tags") || []
            event.set("tags", tags << "_jmx_metric_order_failure")
          end
        else
          # first run, there is no previous data
          # to create a calculation from; carry on.
          puts "no previous data, continuing..."
        end
        data["previous"] = current
        data["current"] = {}
      else
        # we are the first one to get here and will populate
        # our value for the next event to perform the calculation
        data["current"]["ready"] = true
      end

      data = fresh_state.call if reset_data

      puts "data to file: #{data}"
      # "a+" always appends, so empty the file before writing the new state;
      # the block close flushes it before the lock is released below.
      data_file.rewind
      data_file.truncate(0)
      data_file.write(YAML.dump(data))
    end
  ensure
    # release lock
    lock_file.flock(File::LOCK_UN)
  end
end
# '
# }
### ------------------------------
# print to console what would make it to elastic
# (Event#inspect returns the inspect string of the event's field Hash)
puts "Logstash event for elastic:"
puts event.inspect
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment