Last active
January 3, 2016 02:09
MOVED: This plugin is now merged into Logstash core. -- A Throttle filter for LogStash. The filter can throttle the number of events received which is extremely useful when sending alerts via email.
require "logstash/filters/base"
require "logstash/namespace"
# The throttle filter is for throttling the number of events received. The filter
# is configured with a lower bound, the before_count, an upper bound, the after_count,
# and a period of time. All events passing through the filter will be counted based on
# a key_field. As long as the count is less than the before_count or greater than the
# after_count, the event will be "throttled", which means the filter will be considered
# successful and any tags or fields will be added.
#
# For example, if you wanted to throttle events so you only receive an event after 2
# occurrences and you get no more than 3 in 10 minutes, you would use the
# configuration:
#     period => 600
#     before_count => 3
#     after_count => 5
#
# Which would result in:
#     event 1 - throttled (successful filter, period start)
#     event 2 - throttled (successful filter)
#     event 3 - not throttled
#     event 4 - not throttled
#     event 5 - not throttled
#     event 6 - throttled (successful filter)
#     event 7 - throttled (successful filter)
#     event x - throttled (successful filter)
#     period end
#     event 1 - throttled (successful filter, period start)
#     event 2 - throttled (successful filter)
#     event 3 - not throttled
#     event 4 - not throttled
#     event 5 - not throttled
#     event 6 - throttled (successful filter)
#     ...
#
# Another example is if you wanted to throttle events so you only receive 1 event per
# hour, you would use the configuration:
#     period => 3600
#     before_count => -1
#     after_count => 1
#
# Which would result in:
#     event 1 - not throttled (period start)
#     event 2 - throttled (successful filter)
#     event 3 - throttled (successful filter)
#     event 4 - throttled (successful filter)
#     event x - throttled (successful filter)
#     period end
#     event 1 - not throttled (period start)
#     event 2 - throttled (successful filter)
#     event 3 - throttled (successful filter)
#     event 4 - throttled (successful filter)
#     ...
#
# A common use case would be to use the checksum filter to generate a key_field
# from multiple fields, use the throttle filter to throttle events before 3 and after 5,
# and then use the drop filter to remove throttled events. This configuration might
# appear as:
#
#     filter {
#       checksum {
#         keys => [ "host", "message" ]
#       }
#       throttle {
#         before_count => 3
#         after_count => 5
#         period => 3600
#         key_field => "logstash_checksum"
#         add_tag => "throttled"
#       }
#       if "throttled" in [tags] {
#         drop { }
#       }
#     }
#
# Another case would be to store all events, but only email non-throttled
# events so the op's inbox isn't flooded with emails in the event of a system error.
# This configuration might appear as:
#
#     filter {
#       throttle {
#         before_count => 3
#         after_count => 5
#         period => 3600
#         key_field => "message"
#         add_tag => "throttled"
#       }
#     }
#     output {
#       if "throttled" not in [tags] {
#         email {
#           from => "[email protected]"
#           subject => "Production System Alert"
#           to => "[email protected]"
#           via => "sendmail"
#           body => "Alert on %{host} from path %{path}:\n\n%{message}"
#           options => { "location" => "/usr/sbin/sendmail" }
#         }
#       }
#       elasticsearch_http {
#         host => "localhost"
#         port => "19200"
#       }
#     }
#
# The event counts are cleared after the configured period elapses since the
# first instance of the event. That is, all the counts don't reset at the same
# time but rather the throttle period is per event/unique key_field value.
#
# Mike Pilone (@mikepilone)
#
class LogStash::Filters::Throttle < LogStash::Filters::Base

  # The name to use in configuration files.
  config_name "throttle"

  # New plugins should start life at milestone 1.
  milestone 1

  # The key field used to identify events. Two events with the same value in the
  # key field will be considered the same event and will increment the count for
  # that event. Refer to the checksum filter if support for multiple fields is needed.
  config :key_field, :validate => :string, :required => true

  # Events with a count less than this value will be throttled. Setting this value
  # to -1, the default, will cause no events to be throttled based on the lower bound.
  config :before_count, :validate => :number, :default => -1, :required => false

  # Events with a count greater than this value will be throttled. Setting this value
  # to -1, the default, will cause no events to be throttled based on the upper bound.
  config :after_count, :validate => :number, :default => -1, :required => false

  # The period in seconds after the first occurrence of an event until the count is
  # reset for the event. This period is tracked per unique key_field value.
  config :period, :validate => :number, :default => 3600, :required => false

  # The maximum number of counters to store before the oldest counter is purged. Setting
  # this value to -1 removes the upper bound on the number of counters, and they will
  # only be purged after expiration. This configuration value should only be used as a
  # memory control mechanism and can cause early counter expiration if the value is
  # reached. It is recommended to leave the default value and ensure that your
  # key_field is selected such that it limits the number of counters required (e.g.
  # don't use a UUID as the key_field!)
  config :max_counters, :validate => :number, :default => 100000, :required => false
  # Performs initialization of the filter.
  public
  def register
    @threadsafe = false

    @eventCounters = Hash.new
    @nextExpiration = nil
  end # def register
  # Filters the event. The filter is successful if the event should be throttled.
  public
  def filter(event)

    # Return nothing unless there's an actual filter event
    return unless filter?(event)

    # Return nothing unless the event has the key field
    return unless event.include?(@key_field)

    now = Time.now
    key = event[@key_field]

    # Purge counters if too large to prevent OOM.
    if @max_counters != -1 && @eventCounters.size > @max_counters then
      purgeOldestEventCounter()
    end

    # Expire existing counter if needed
    if @nextExpiration.nil? || now >= @nextExpiration then
      expireEventCounters(now)
    end

    @logger.debug? and @logger.debug(
      "filters/#{self.class.name}: next expiration",
      { "nextExpiration" => @nextExpiration })

    # Create new counter for this event if this is the first occurrence
    counter = nil
    if !@eventCounters.include?(key) then
      expiration = now + @period
      @eventCounters[key] = { :count => 0, :expiration => expiration }

      @logger.debug? and @logger.debug("filters/#{self.class.name}: new event",
        { :key => key, :expiration => expiration })
    end

    # Fetch the counter
    counter = @eventCounters[key]

    # Count this event
    counter[:count] = counter[:count] + 1

    @logger.debug? and @logger.debug("filters/#{self.class.name}: current count",
      { :key => key, :count => counter[:count] })

    # Throttle if count is < before count or > after count
    if ((@before_count != -1 && counter[:count] < @before_count) ||
       (@after_count != -1 && counter[:count] > @after_count)) then
      @logger.debug? and @logger.debug(
        "filters/#{self.class.name}: throttling event", { :key => key })

      filter_matched(event)
    end

  end # def filter
  # Expires any counts where the period has elapsed. Sets the next expiration time
  # for when this method should be called again.
  private
  def expireEventCounters(now)

    @nextExpiration = nil

    @eventCounters.delete_if { |key, counter|
      expiration = counter[:expiration]
      expired = expiration <= now

      if expired then
        @logger.debug? and @logger.debug(
          "filters/#{self.class.name}: deleting expired counter",
          { :key => key })

      elsif @nextExpiration.nil? || (expiration < @nextExpiration)
        @nextExpiration = expiration
      end

      expired
    }

  end # def expireEventCounters
  # Purges the oldest event counter. This operation is for memory control only
  # and can cause early period expiration and thrashing if invoked.
  private
  def purgeOldestEventCounter()

    # Return unless we have something to purge
    return unless @eventCounters.size > 0

    oldestCounter = nil
    oldestKey = nil

    @eventCounters.each { |key, counter|
      if oldestCounter.nil? || counter[:expiration] < oldestCounter[:expiration] then
        oldestKey = key
        oldestCounter = counter
      end
    }

    @logger.warn? and @logger.warn(
      "filters/#{self.class.name}: Purging oldest counter because max_counters " +
      "exceeded. Use a better key_field to prevent too many unique event counters.",
      { :key => oldestKey, :expiration => oldestCounter[:expiration] })

    @eventCounters.delete(oldestKey)

  end
end # class LogStash::Filters::Throttle
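
The before_count/after_count comparison in the filter can be tried without a Logstash install. The sketch below is only an illustration: `throttled?` is a hypothetical stand-alone helper, not part of the plugin, and it assumes the per-key count has already been incremented for the current event, as the `filter` method does. It replays the two examples documented in the header comment.

```ruby
# Hypothetical helper (not part of the plugin): mirrors the throttle
# condition used in the filter method. A bound of -1 disables that side.
def throttled?(count, before_count, after_count)
  (before_count != -1 && count < before_count) ||
    (after_count != -1 && count > after_count)
end

# First documented example: before_count => 3, after_count => 5
first = (1..7).map { |n| throttled?(n, 3, 5) }

# Second documented example: before_count => -1, after_count => 1
second = (1..4).map { |n| throttled?(n, -1, 1) }

first.each_with_index do |flag, i|
  puts "event #{i + 1} - #{flag ? 'throttled' : 'not throttled'}"
end
```

With the first configuration, events 1, 2, 6 and 7 are reported as throttled and events 3 to 5 pass through, which matches the event listing in the header comment. The sketch leaves out the period handling, so it covers a single period only.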