Skip to content

Instantly share code, notes, and snippets.

@sperlic
Forked from dannycoates/abuse.lua
Last active August 29, 2015 14:07
Show Gist options
  • Save sperlic/45c7164471e704ebde0d to your computer and use it in GitHub Desktop.
Save sperlic/45c7164471e704ebde0d to your computer and use it in GitHub Desktop.

These are a couple of real examples of Heka Lua filters that do anomaly detection and alerting, which are both very new, experimental features. Eventually we should have some nicer abstractions for detecting and alerting on common scenarios that can be easily configured instead of having to write all the logic yourself. There's currently a lot of flexibility, but at the price of more code.

Since heka filters can process and generate messages, you can make pretty complex meta-dataflows to control alerts, and can wire them up to any output (I think), irc, email, sms, whatevs.

The module sources are here:

https://github.com/mozilla-services/heka/tree/dev/sandbox/lua/modules

Our current dashboard displays annotations, but the UX still needs a lot of work; there's too much clicking and navigating required. I'd ❤︎ to have some time to contribute.

-- This Source Code Form is subject to the terms of the Mozilla Public
-- License, v. 2.0. If a copy of the MPL was not distributed with this
-- file, You can obtain one at http://mozilla.org/MPL/2.0/.
require "math"
require "string"
local alert = require "alert"
-- Plugin configuration, read once at load time via read_config.

-- Name of the message variable whose values are tracked (mandatory).
local message_variable = read_config("message_variable")
if not message_variable then
    error("must specify a 'message_variable'")
end

-- Number of distinct items held before weights start being decremented.
local max_items = read_config("max_items") or 25000

-- An item is only reported/alerted once it has at least this many samples.
local alert_min_count = read_config("alert_min_count") or 50

-- An alert is queued when an item's mean interval is at or below this value.
-- Intervals are timestamp deltas divided by 1e9 (seconds, assuming the
-- Timestamp field is in nanoseconds -- TODO confirm).
local alert_max_mean = read_config("alert_max_mean") or 1

-- Value handed to alert.set_throttle; the default is 5 * 60 * 1e9
-- (presumably five minutes expressed in nanoseconds -- verify against the
-- alert module).
local alert_throttle = read_config("alert_throttle") or (5 * 60 * 1e9)
alert.set_throttle(alert_throttle)

-- Slot indices of the array kept for every tracked item.
local WEIGHT = 1 -- occurrence weight, decremented when the table is full
local TS     = 2 -- timestamp of the most recent occurrence
local N      = 3 -- number of interval samples folded into the statistics
local OM     = 4 -- mean before the latest sample ("old mean")
local NM     = 5 -- mean after the latest sample ("new mean")
local OS     = 6 -- sum of squared deviations before the latest sample
local NS     = 7 -- sum of squared deviations after the latest sample
local ALERT  = 8 -- result of alert.queue once an alert was attempted
--- Folds one sample into an item's running statistics (Welford's method).
-- Maintains the sample count, the running mean and the running sum of
-- squared deviations, so the variance can later be derived as NS / (N - 1)
-- without storing individual samples.
-- @param x number: the new sample
-- @param y table: per-item record; y[N] must already be a number. The slots
--        N, OM, NM, OS and NS are updated in place.
local function running_stats(x, y)
    local n = y[N] + 1
    y[N] = n

    if n == 1 then
        y[OM], y[NM] = x, x
        -- Initialise both accumulators. NS used to be left nil here, which
        -- made any reader that computes variance after a single sample fail
        -- with "attempt to perform arithmetic on a nil value".
        y[OS], y[NS] = 0, 0
        return
    end

    local old_mean = y[OM]
    local new_mean = old_mean + (x - old_mean) / n
    local new_sum = y[OS] + (x - old_mean) * (x - new_mean)

    y[NM], y[NS] = new_mean, new_sum
    -- The "new" values become the "old" ones for the next sample.
    y[OM], y[OS] = new_mean, new_sum
end
-- Mutable filter state. These are deliberately globals rather than locals
-- (presumably so the sandbox can preserve them across restarts -- confirm
-- against the plugin configuration before changing their scope).

-- Day number (timestamp / one day) of the data currently held in `items`.
active_day = 0
-- Number of keys currently stored in `items`.
items_size = 0
-- Tracked items, keyed by the value of the configured message variable.
items = {}
--- Tracks how often, and at what intervals, each value of the configured
-- message variable is seen during the current day.
--
-- Known items get their weight bumped and the interval since their previous
-- occurrence folded into their running statistics. Unknown items are added
-- while there is room; once the table is full, an unknown item instead
-- causes every tracked weight to be decremented (items at weight 1 are
-- evicted) and the unknown item itself is not stored.
--
-- @return 0 on success or when the message is older than the active day,
--         -1 when the message does not carry the configured variable.
function process_message()
    local ts = read_message("Timestamp")
    local item = read_message(message_variable)
    if not item then return -1 end

    -- All state is scoped to a single day; a message from a newer day
    -- discards everything collected so far.
    local day = math.floor(ts / (60 * 60 * 24 * 1e9))
    if day < active_day then
        return 0 -- too old
    elseif day > active_day then
        active_day = day
        items = {}
        items_size = 0
    end

    local entry = items[item]
    if entry then
        if entry[TS] ~= 0 then
            -- The statistics slots are created lazily on the second sighting.
            if not entry[N] then entry[N] = 0 end
            running_stats((ts - entry[TS]) / 1e9, entry)
        end
        entry[TS] = ts
        entry[WEIGHT] = entry[WEIGHT] + 1
        return 0
    end

    -- ">=" rather than "==": if the table ever holds more than max_items
    -- (e.g. state restored after max_items was lowered, or a non-integer
    -- limit), an equality test would never fire again and the table would
    -- grow without bound.
    if items_size >= max_items then
        -- Assigning nil to existing keys during a pairs() traversal is
        -- permitted; no new keys are added inside the loop.
        for k, v in pairs(items) do
            local weight = v[WEIGHT]
            if weight == 1 then
                items[k] = nil
                items_size = items_size - 1
            else
                v[WEIGHT] = weight - 1
            end
        end
    else
        items[item] = {1, ts}
        items_size = items_size + 1
    end
    return 0
end
--- Emits a TSV statistics table for every item with enough samples and
-- queues an alert for items whose mean interval is at or below
-- alert_max_mean.
-- @param ns number: timestamp passed in by the sandbox; forwarded to the
--        alert module.
function timer_event(ns)
    output(string.format("%s\tWeight\tCount\tMean\tSD\n", message_variable))

    for k, v in pairs(items) do
        local count = v[N]
        if count and count >= alert_min_count then
            -- The sample variance needs at least two samples. With
            -- alert_min_count configured to 1 (or less) a single-sample item
            -- used to hit a nil NS slot and a division by zero here; report
            -- a spread of 0 for it instead.
            local variance = 0
            if count > 1 then
                variance = v[NS] / (count - 1)
            end

            output(string.format("%s\t%d\t%d\t%G\t%G\n",
                                 k, v[WEIGHT], count, v[NM], math.sqrt(variance)))

            -- ALERT stores whatever alert.queue returned, so an item is not
            -- queued again once that value is truthy.
            if v[NM] <= alert_max_mean and not v[ALERT] then
                v[ALERT] = alert.queue(ns, string.format("Abuse detected %s: %s", message_variable, k))
            end
        end
    end

    inject_message("tsv", "Statistics")
    alert.send_queue(ns)
end
-- This Source Code Form is subject to the terms of the Mozilla Public
-- License, v. 2.0. If a copy of the MPL was not distributed with this
-- file, You can obtain one at http://mozilla.org/MPL/2.0/.
require "circular_buffer"
require "string"
local alert = require "alert"
local annotation = require "annotation"
local anomaly = require "anomaly"
-- Name under which the buffer is injected and annotations are stored.
local title = "Summary"

-- Circular buffer geometry: number of rows and seconds covered by each row.
local rows = read_config("rows") or 1440
local sec_per_row = read_config("sec_per_row") or 60

-- Optional anomaly detection settings; timer_event skips detection when this
-- ends up falsy.
local raw_anomaly_config = read_config("anomaly_config")
local anomaly_config = anomaly.parse_config(raw_anomaly_config)

-- Prune interval: the time span covered by the whole buffer, scaled by 1e9
-- (presumably nanoseconds -- confirm against the annotation module).
local buffer_span = rows * sec_per_row * 1e9
annotation.set_prune(title, buffer_span)

-- Global on purpose, like the rest of the filter state (presumably so the
-- sandbox can preserve it -- confirm before changing its scope).
data = circular_buffer.new(rows, 3, sec_per_row)

-- Column handles as returned by set_header.
local SUCCESS = data:set_header(1, "Success")
local FAILURE = data:set_header(2, "Failure")
local PFAIL   = data:set_header(3, "%Failure", "percent", "none")
--- Counts a message as a success (Fields[errno] == 0) or a failure (anything
-- else) in the row matching its timestamp and refreshes that row's failure
-- percentage.
-- @return always 0, including when the buffer rejects the timestamp.
function process_message()
    local ts = read_message("Timestamp")
    local succeeded = read_message("Fields[errno]") == 0

    -- Increment the column for this outcome; a falsy result means nothing
    -- was recorded for this timestamp, so there is nothing to update.
    local own = data:add(ts, succeeded and SUCCESS or FAILURE, 1)
    if not own then return 0 end

    local other = data:get(ts, succeeded and FAILURE or SUCCESS)

    local pct
    if other and other == other then -- "other == other" is false for NaN
        local failures = succeeded and other or own
        pct = failures / (own + other) * 100
    elseif succeeded then
        -- No usable failure count: only successes in this row so far.
        pct = 0
    else
        -- No usable success count: only failures in this row so far.
        pct = 100
    end

    data:set(ts, PFAIL, pct)
    return 0
end
--- Publishes the circular buffer and, when anomaly detection is configured,
-- runs detection first and attaches the resulting annotations.
-- @param ns number: timestamp passed in by the sandbox; forwarded to the
--        alert, anomaly and annotation modules.
function timer_event(ns)
    -- Without an anomaly configuration the buffer is injected as-is.
    if not anomaly_config then
        inject_message(data:format("cbuf"), title)
        return
    end

    -- Detection is skipped entirely while alerts are throttled.
    if not alert.throttled(ns) then
        local msg, annos = anomaly.detect(ns, title, data, anomaly_config)
        if msg then
            annotation.concat(title, annos)
            alert.send(ns, msg)
        end
    end

    -- Emit the pruned annotations together with the buffer contents.
    local pruned = annotation.prune(title, ns)
    output({annotations = pruned}, data)
    inject_message("cbuf", title)
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment