-
-
Save asenchi/e6268a83bf8007c9ec94 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- This Source Code Form is subject to the terms of the Mozilla Public
-- License, v. 2.0. If a copy of the MPL was not distributed with this
-- file, You can obtain one at http://mozilla.org/MPL/2.0/.
--[[
Graphs the CPU load and the number of processes of the system running Heka.

Config:

- sec_per_row (uint, optional, default 60)
    Sets the size of each bucket (resolution in seconds) in the sliding window.

- rows (uint, optional, default 1440)
    Sets the size of the sliding window, i.e., 1440 rows representing 60 seconds
    per row is a 24-hour sliding window with 1-minute resolution.

- anomaly_config (string) - (see :ref:`sandbox_anomaly_module`)

*Example Heka Configuration*

.. code-block:: ini

    [CpuStatsFilter]
    type = "SandboxFilter"
    filename = "lua_filters/cpustats.lua"
    ticker_interval = 60
    preserve_data = true
    message_matcher = "Type == 'stats.cpustats'"

--]]
require "circular_buffer" | |
require "string" | |
local alert = require "alert" | |
local annotation = require "annotation" | |
local anomaly = require "anomaly" | |
-- Name under which the buffer payload, annotations and alerts are published.
local title = "Cpu Stats"

-- Sliding-window dimensions; the defaults apply when the option is not set.
local sec_per_row = read_config("sec_per_row") or 60
local rows = read_config("rows") or 1440
local anomaly_config = anomaly.parse_config(read_config("anomaly_config"))

-- Prune horizon is the full window length (rows * sec_per_row seconds)
-- scaled by 1e9 -- presumably nanoseconds; confirm against the annotation
-- module.
annotation.set_prune(title, rows * sec_per_row * 1e9)

-- One buffer column per message field, in this order.
local field_names = {"1MinAvg", "5MinAvg", "15MinAvg", "NumProcesses"}

-- Deliberately NOT local. NOTE(review): presumably the sandbox persists
-- globals when preserve_data is enabled -- confirm before changing scope.
cbuf = circular_buffer.new(rows, #field_names, sec_per_row)

-- labels[col] is the read_message() key for the field stored in column col.
local labels = {}
for col = 1, #field_names do
    local field = field_names[col]
    labels[col] = string.format("Fields[%s]", field)
    cbuf:set_header(col, field, "Count", "max")
end
--- Records the tracked fields of the current message in the circular buffer.
-- Each entry of `field_names` is read from the message via its matching
-- `labels` key and written to the column with the same index, at the row
-- selected by the message timestamp.
--
-- Values are converted with tonumber() first; a field that is missing or not
-- numeric is skipped instead of being handed to cbuf:set().
-- NOTE(review): cbuf:set() is assumed to reject non-numeric values -- confirm
-- against the circular buffer library.
-- @return 0 always
function process_message ()
    local ts = read_message("Timestamp")
    for col = 1, #field_names do
        -- Bind to a local first so an empty return still yields a single nil.
        local raw = read_message(labels[col])
        local value = tonumber(raw)
        if value then
            cbuf:set(ts, col, value)
        end
    end
    return 0
end
--- Publishes the circular buffer on every timer tick.
-- Without an anomaly configuration the buffer is injected as-is. With one,
-- anomaly detection runs first (unless alerts are throttled); any detection
-- message is recorded as annotations and sent as an alert, and the buffer is
-- injected together with the pruned annotations.
-- @param ns value passed by the caller to the alert, anomaly and annotation
--           modules (presumably the current time in nanoseconds -- confirm)
function timer_event(ns)
    if not anomaly_config then
        inject_payload("cbuf", title, cbuf)
        return
    end

    if not alert.throttled(ns) then
        local msg, annos = anomaly.detect(ns, title, cbuf, anomaly_config)
        if msg then
            annotation.concat(title, annos)
            alert.send(ns, msg)
        end
    end

    inject_payload("cbuf", title, annotation.prune(title, ns), cbuf)
end
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package plugins | |
import ( | |
"fmt" | |
"github.com/mozilla-services/heka/message" | |
"github.com/mozilla-services/heka/pipeline" | |
ts "github.com/mozilla-services/heka/pipeline/testsupport" | |
pm "github.com/mozilla-services/heka/pipelinemock" | |
"github.com/mozilla-services/heka/sandbox" | |
"github.com/rafrombrc/gomock/gomock" | |
gs "github.com/rafrombrc/gospec/src/gospec" | |
"os" | |
"path/filepath" | |
"time" | |
) | |
type FilterTestHelper struct { | |
MockHelper *pm.MockPluginHelper | |
MockFilterRunner *pm.MockFilterRunner | |
} | |
func NewFilterTestHelper(ctrl *gomock.Controller) (fth *FilterTestHelper) { | |
fth = new(FilterTestHelper) | |
fth.MockHelper = pm.NewMockPluginHelper(ctrl) | |
fth.MockFilterRunner = pm.NewMockFilterRunner(ctrl) | |
return | |
} | |
func FilterSpec(c gs.Context) { | |
t := new(ts.SimpleT) | |
ctrl := gomock.NewController(t) | |
defer ctrl.Finish() | |
fth := NewFilterTestHelper(ctrl) | |
inChan := make(chan *pipeline.PipelinePack, 1) | |
pConfig := pipeline.NewPipelineConfig(nil) | |
c.Specify("A SandboxFilter", func() { | |
sbFilter := new(SandboxFilter) | |
sbFilter.SetPipelineConfig(pConfig) | |
config := sbFilter.ConfigStruct().(*sandbox.SandboxConfig) | |
config.MemoryLimit = 32000 | |
config.InstructionLimit = 1000 | |
config.OutputLimit = 1024 | |
msg := getTestMessage() | |
pack := pipeline.NewPipelinePack(pConfig.InjectRecycleChan()) | |
pack.Message = msg | |
pack.Decoded = true | |
c.Specify("Uninitialized", func() { | |
err := sbFilter.ReportMsg(msg) | |
c.Expect(err, gs.IsNil) | |
}) | |
c.Specify("Over inject messages from ProcessMessage", func() { | |
var timer <-chan time.Time | |
fth.MockFilterRunner.EXPECT().Ticker().Return(timer) | |
fth.MockFilterRunner.EXPECT().InChan().Return(inChan) | |
fth.MockFilterRunner.EXPECT().Name().Return("processinject").Times(3) | |
fth.MockFilterRunner.EXPECT().Inject(pack).Return(true).Times(2) | |
fth.MockHelper.EXPECT().PipelineConfig().Return(pConfig) | |
fth.MockHelper.EXPECT().PipelinePack(uint(0)).Return(pack).Times(2) | |
fth.MockFilterRunner.EXPECT().LogError(fmt.Errorf("exceeded InjectMessage count")) | |
config.ScriptFilename = "../lua/testsupport/processinject.lua" | |
err := sbFilter.Init(config) | |
c.Assume(err, gs.IsNil) | |
inChan <- pack | |
close(inChan) | |
sbFilter.Run(fth.MockFilterRunner, fth.MockHelper) | |
}) | |
c.Specify("Over inject messages from TimerEvent", func() { | |
var timer <-chan time.Time | |
timer = time.Tick(time.Duration(1) * time.Millisecond) | |
fth.MockFilterRunner.EXPECT().Ticker().Return(timer) | |
fth.MockFilterRunner.EXPECT().InChan().Return(inChan) | |
fth.MockFilterRunner.EXPECT().Name().Return("timerinject").Times(12) | |
fth.MockFilterRunner.EXPECT().Inject(pack).Return(true).Times(11) | |
fth.MockHelper.EXPECT().PipelineConfig().Return(pConfig) | |
fth.MockHelper.EXPECT().PipelinePack(uint(0)).Return(pack).Times(11) | |
fth.MockFilterRunner.EXPECT().LogError(fmt.Errorf("exceeded InjectMessage count")) | |
config.ScriptFilename = "../lua/testsupport/timerinject.lua" | |
err := sbFilter.Init(config) | |
c.Assume(err, gs.IsNil) | |
go func() { | |
time.Sleep(time.Duration(250) * time.Millisecond) | |
close(inChan) | |
}() | |
sbFilter.Run(fth.MockFilterRunner, fth.MockHelper) | |
}) | |
c.Specify("Preserves data", func() { | |
var timer <-chan time.Time | |
fth.MockFilterRunner.EXPECT().Ticker().Return(timer) | |
fth.MockFilterRunner.EXPECT().InChan().Return(inChan) | |
config.ScriptFilename = "../lua/testsupport/serialize.lua" | |
config.PreserveData = true | |
sbFilter.SetName("serialize") | |
err := sbFilter.Init(config) | |
c.Assume(err, gs.IsNil) | |
close(inChan) | |
sbFilter.Run(fth.MockFilterRunner, fth.MockHelper) | |
_, err = os.Stat("sandbox_preservation/serialize.data") | |
c.Expect(err, gs.IsNil) | |
err = os.Remove("sandbox_preservation/serialize.data") | |
c.Expect(err, gs.IsNil) | |
}) | |
}) | |
c.Specify("A SandboxManagerFilter", func() { | |
pConfig.Globals.BaseDir = os.TempDir() | |
sbxMgrsDir := filepath.Join(pConfig.Globals.BaseDir, "sbxmgrs") | |
defer func() { | |
tmpErr := os.RemoveAll(sbxMgrsDir) | |
c.Expect(tmpErr, gs.IsNil) | |
}() | |
sbmFilter := new(SandboxManagerFilter) | |
sbmFilter.SetPipelineConfig(pConfig) | |
config := sbmFilter.ConfigStruct().(*SandboxManagerFilterConfig) | |
config.MaxFilters = 1 | |
msg := getTestMessage() | |
pack := pipeline.NewPipelinePack(pConfig.InputRecycleChan()) | |
pack.Message = msg | |
pack.Decoded = true | |
c.Specify("Control message in the past", func() { | |
sbmFilter.Init(config) | |
pack.Message.SetTimestamp(time.Now().UnixNano() - 5e9) | |
fth.MockFilterRunner.EXPECT().InChan().Return(inChan) | |
fth.MockFilterRunner.EXPECT().Name().Return("SandboxManagerFilter") | |
fth.MockFilterRunner.EXPECT().LogError(fmt.Errorf("Discarded control message: 5 seconds skew")) | |
inChan <- pack | |
close(inChan) | |
sbmFilter.Run(fth.MockFilterRunner, fth.MockHelper) | |
}) | |
c.Specify("Control message in the future", func() { | |
sbmFilter.Init(config) | |
pack.Message.SetTimestamp(time.Now().UnixNano() + 5.9e9) | |
fth.MockFilterRunner.EXPECT().InChan().Return(inChan) | |
fth.MockFilterRunner.EXPECT().Name().Return("SandboxManagerFilter") | |
fth.MockFilterRunner.EXPECT().LogError(fmt.Errorf("Discarded control message: -5 seconds skew")) | |
inChan <- pack | |
close(inChan) | |
sbmFilter.Run(fth.MockFilterRunner, fth.MockHelper) | |
}) | |
c.Specify("Generates the right default working directory", func() { | |
sbmFilter.Init(config) | |
fth.MockFilterRunner.EXPECT().InChan().Return(inChan) | |
name := "SandboxManagerFilter" | |
fth.MockFilterRunner.EXPECT().Name().Return(name) | |
close(inChan) | |
sbmFilter.Run(fth.MockFilterRunner, fth.MockHelper) | |
c.Expect(sbmFilter.workingDirectory, gs.Equals, sbxMgrsDir) | |
_, err := os.Stat(sbxMgrsDir) | |
c.Expect(err, gs.IsNil) | |
}) | |
c.Specify("Sanity check the default sandbox configuration limits", func() { | |
sbmFilter.Init(config) | |
c.Expect(sbmFilter.memoryLimit, gs.Equals, uint(8*1024*1024)) | |
c.Expect(sbmFilter.instructionLimit, gs.Equals, uint(1e6)) | |
c.Expect(sbmFilter.outputLimit, gs.Equals, uint(63*1024)) | |
}) | |
c.Specify("Sanity check the user specified sandbox configuration limits", func() { | |
config.MemoryLimit = 123456 | |
config.InstructionLimit = 4321 | |
config.OutputLimit = 8765 | |
sbmFilter.Init(config) | |
c.Expect(sbmFilter.memoryLimit, gs.Equals, config.MemoryLimit) | |
c.Expect(sbmFilter.instructionLimit, gs.Equals, config.InstructionLimit) | |
c.Expect(sbmFilter.outputLimit, gs.Equals, config.OutputLimit) | |
}) | |
c.Specify("Creates a SandboxFilter runner", func() { | |
sbxName := "SandboxFilter" | |
sbxMgrName := "SandboxManagerFilter" | |
code := ` | |
require("cjson") | |
function process_message() | |
inject_payload(cjson.encode({a = "b"})) | |
return 0 | |
end | |
` | |
cfg := ` | |
[%s] | |
type = "SandboxFilter" | |
message_matcher = "TRUE" | |
script_type = "lua" | |
` | |
cfg = fmt.Sprintf(cfg, sbxName) | |
msg.SetPayload(code) | |
f, err := message.NewField("config", cfg, "toml") | |
c.Assume(err, gs.IsNil) | |
msg.AddField(f) | |
fMatchChan := pConfig.Router().AddFilterMatcher() | |
errChan := make(chan error) | |
fth.MockFilterRunner.EXPECT().Name().Return(sbxMgrName) | |
fullSbxName := fmt.Sprintf("%s-%s", sbxMgrName, sbxName) | |
fth.MockHelper.EXPECT().Filter(fullSbxName).Return(nil, false) | |
fth.MockFilterRunner.EXPECT().LogMessage(fmt.Sprintf("Loading: %s", fullSbxName)) | |
sbmFilter.Init(config) | |
go func() { | |
err := sbmFilter.loadSandbox(fth.MockFilterRunner, fth.MockHelper, sbxMgrsDir, | |
msg) | |
errChan <- err | |
}() | |
fMatch := <-fMatchChan | |
c.Expect(fMatch.MatcherSpecification().String(), gs.Equals, "TRUE") | |
c.Expect(<-errChan, gs.IsNil) | |
go func() { | |
<-pConfig.Router().RemoveFilterMatcher() | |
}() | |
ok := pConfig.RemoveFilterRunner(fullSbxName) | |
c.Expect(ok, gs.IsTrue) | |
}) | |
}) | |
c.Specify("A Cpu Stats filter", func() { | |
filter := new(SandboxFilter) | |
filter.SetPipelineConfig(pConfig) | |
filter.name = "cpustats" | |
conf := filter.ConfigStruct().(*sandbox.SandboxConfig) | |
conf.ScriptFilename = "../lua/filters/cpustats.lua" | |
conf.ModuleDirectory = "../lua/modules" | |
conf.MemoryLimit = 1000000 | |
conf.Config = make(map[string]interface{}) | |
conf.Config["rows"] = int64(3) | |
conf.Config["seconds_per_row"] = int64(1) | |
timer := make(chan time.Time, 1) | |
errChan := make(chan error, 1) | |
retPackChan := make(chan *pipeline.PipelinePack, 5) | |
msg := getTestMessage() | |
fields := make([]*message.Field, 4) | |
fields[0], _ = message.NewField("1MinAvg", "0.08", "") | |
fields[1], _ = message.NewField("5MinAvg", "0.04", "") | |
fields[2], _ = message.NewField("15MinAvg", "0.02", "") | |
fields[3], _ = message.NewField("NumProcesses", "5", "") | |
msg.Fields = fields | |
pack := pipeline.NewPipelinePack(pConfig.InjectRecycleChan()) | |
pack.Message = msg | |
pack.Decoded = true | |
fth.MockHelper.EXPECT().PipelinePack(uint(0)).Return(pack).Times(3) | |
fth.MockFilterRunner.EXPECT().Ticker().Return(timer) | |
fth.MockFilterRunner.EXPECT().InChan().Return(inChan) | |
fth.MockFilterRunner.EXPECT().Name().Return("cpustats").Times(3) | |
fth.MockFilterRunner.EXPECT().Inject(pack).Do(func(pack *pipeline.PipelinePack) { | |
retPackChan <- pack | |
}).Return(true).Times(3) | |
err := filter.Init(conf) | |
c.Assume(err, gs.IsNil) | |
go func() { | |
errChan <- filter.Run(fth.MockFilterRunner, fth.MockHelper) | |
}() | |
var p *pipeline.PipelinePack | |
for i := 0; i < 3; i++ { | |
pack := pipeline.NewPipelinePack(pConfig.InjectRecycleChan()) | |
pack.Message = msg | |
future := time.Second * time.Duration(i) | |
pack.Message.SetTimestamp(time.Now().Add(future).Unix()) | |
pack.Decoded = true | |
inChan <- pack | |
// Feed in a pack | |
// Begin processing | |
timer <- time.Now() | |
p = <-retPackChan | |
} | |
// Check the result of the filter's inject | |
fmt.Println("pl:", p.Message.GetPayload()) | |
close(inChan) | |
c.Expect(<-errChan, gs.IsNil) | |
close(errChan) | |
}) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment