Created
February 27, 2014 23:51
-
-
Save dctrwatson/9262287 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/***** BEGIN LICENSE BLOCK *****
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# The Initial Developer of the Original Code is the Mozilla Foundation.
# Portions created by the Initial Developer are Copyright (C) 2013
# the Initial Developer. All Rights Reserved.
#
# Contributor(s):
#   Rob Miller ([email protected])
#
# ***** END LICENSE BLOCK *****/
package pipeline | |
import ( | |
"code.google.com/p/gomock/gomock" | |
"github.com/mozilla-services/heka/message" | |
ts "github.com/mozilla-services/heka/pipeline/testsupport" | |
gs "github.com/rafrombrc/gospec/src/gospec" | |
"strconv" | |
"strings" | |
"sync" | |
"time" | |
) | |
type InputTestHelper struct { | |
Msg *message.Message | |
Pack *PipelinePack | |
AddrStr string | |
ResolvedAddrStr string | |
MockHelper *MockPluginHelper | |
MockInputRunner *MockInputRunner | |
Decoder DecoderRunner | |
PackSupply chan *PipelinePack | |
DecodeChan chan *PipelinePack | |
} | |
func StatAccumInputSpec(c gs.Context) { | |
t := &ts.SimpleT{} | |
ctrl := gomock.NewController(t) | |
defer ctrl.Finish() | |
pConfig := NewPipelineConfig(nil) | |
ith := new(InputTestHelper) | |
ith.MockHelper = NewMockPluginHelper(ctrl) | |
ith.MockInputRunner = NewMockInputRunner(ctrl) | |
ith.Pack = NewPipelinePack(pConfig.inputRecycleChan) | |
ith.PackSupply = make(chan *PipelinePack, 1) | |
ith.PackSupply <- ith.Pack | |
c.Specify("A StatAccumInput using normal namespaces", func() { | |
ith.MockHelper.EXPECT().PipelineConfig().Return(pConfig) | |
ith.MockInputRunner.EXPECT().InChan().Return(ith.PackSupply) | |
ith.MockInputRunner.EXPECT().Inject(ith.Pack) | |
ith.MockInputRunner.EXPECT().Ticker() | |
statAccumInput := StatAccumInput{} | |
config := statAccumInput.ConfigStruct().(*StatAccumInputConfig) | |
config.EmitInFields = true | |
config.EmitInPayload = false | |
err := statAccumInput.Init(config) | |
c.Expect(err, gs.IsNil) | |
var wg sync.WaitGroup | |
prepareSendingStats := func() { | |
wg.Add(1) | |
go func() { | |
err := statAccumInput.Run(ith.MockInputRunner, ith.MockHelper) | |
wg.Done() | |
c.Expect(err, gs.IsNil) | |
}() | |
} | |
finalizeSendingStats := func() *message.Message { | |
close(statAccumInput.statChan) | |
wg.Wait() | |
return ith.Pack.Message | |
} | |
sendTimer := func(key string, vals ...int) { | |
for _, v := range vals { | |
statAccumInput.statChan <- Stat{key, strconv.Itoa(v), "ms", float32(1)} | |
} | |
} | |
sendCounter := func(key string, vals ...int) { | |
for _, v := range vals { | |
statAccumInput.statChan <- Stat{key, strconv.Itoa(v), "c", float32(1)} | |
} | |
} | |
sendGauge := func(key string, vals ...int) { | |
for _, v := range vals { | |
statAccumInput.statChan <- Stat{key, strconv.Itoa(v), "g", float32(1)} | |
} | |
} | |
validateValueAtKey := func(msg *message.Message, key string, value interface{}) { | |
fieldValue, ok := msg.GetFieldValue(key) | |
c.Expect(ok, gs.IsTrue) | |
c.Expect(fieldValue, gs.Equals, value) | |
} | |
c.Specify("emits timer with correct prefixes", func() { | |
prepareSendingStats() | |
sendTimer("sample.timer", 10, 10, 20, 20) | |
sendTimer("sample2.timer", 10, 20) | |
msg := finalizeSendingStats() | |
validateValueAtKey(msg, "stats.timers.sample.timer.count", int64(4)) | |
validateValueAtKey(msg, "stats.timers.sample.timer.mean", 15.0) | |
validateValueAtKey(msg, "stats.timers.sample.timer.lower", 10.0) | |
validateValueAtKey(msg, "stats.timers.sample2.timer.count", int64(2)) | |
validateValueAtKey(msg, "stats.timers.sample2.timer.mean", 15.0) | |
validateValueAtKey(msg, "stats.timers.sample2.timer.lower", 10.0) | |
validateValueAtKey(msg, "stats.statsd.numStats", int64(2)) | |
}) | |
c.Specify("emits counters with correct prefixes", func() { | |
prepareSendingStats() | |
sendCounter("sample.cnt", 1, 2, 3, 4, 5) | |
sendCounter("sample2.cnt", 159, 951) | |
msg := finalizeSendingStats() | |
validateValueAtKey(msg, "stats.counters.sample.cnt.count", int64(15)) | |
validateValueAtKey(msg, "stats.counters.sample.cnt.rate", 1.5) | |
validateValueAtKey(msg, "stats.counters.sample2.cnt.count", int64(1110)) | |
validateValueAtKey(msg, "stats.counters.sample2.cnt.rate", 1110.0/float64(config.TickerInterval)) | |
}) | |
c.Specify("emits gauge with correct prefixes", func() { | |
prepareSendingStats() | |
sendGauge("sample.gauge", 1, 2) | |
sendGauge("sample2.gauge", 1, 2, 3, 4, 5) | |
msg := finalizeSendingStats() | |
validateValueAtKey(msg, "stats.gauges.sample.gauge", int64(2)) | |
validateValueAtKey(msg, "stats.gauges.sample2.gauge", int64(5)) | |
}) | |
c.Specify("emits correct statsd.numStats count", func() { | |
prepareSendingStats() | |
sendGauge("sample.gauge", 1, 2) | |
sendGauge("sample2.gauge", 1, 2) | |
sendCounter("sample.cnt", 1, 2, 3, 4, 5) | |
sendCounter("sample2.cnt", 159, 951) | |
sendTimer("sample.timer", 10, 10, 20, 20) | |
sendTimer("sample2.timer", 10, 20) | |
msg := finalizeSendingStats() | |
validateValueAtKey(msg, "stats.statsd.numStats", int64(6)) | |
}) | |
c.Specify("emits proper idle stats", func() { | |
prepareSendingStats() | |
sendGauge("sample.gauge", 1, 2) | |
sendCounter("sample.cnt", 1, 2, 3, 4, 5) | |
sendTimer("sample.timer", 10, 10, 20, 20) | |
statAccumInput.Flush() | |
ith.Pack.Recycle() | |
ith.PackSupply <- ith.Pack | |
msg := finalizeSendingStats() | |
validateValueAtKey(msg, "stats.gauges.sample.gauge", int64(2)) | |
validateValueAtKey(msg, "stats.counters.sample.cnt.count", 0) | |
validateValueAtKey(msg, "stats.timers.sample.timer.count", 0) | |
validateValueAtKey(msg, "stats.statsd.numStats", int64(3)) | |
}) | |
}) | |
c.Specify("A StatAccumInput using Legacy namespaces", func() { | |
statAccumInput := StatAccumInput{} | |
config := statAccumInput.ConfigStruct().(*StatAccumInputConfig) | |
config.LegacyNamespaces = true | |
tickChan := make(chan time.Time) | |
c.Specify("must emit data in payload and/or message fields", func() { | |
config.EmitInPayload = false | |
err := statAccumInput.Init(config) | |
c.Expect(err, gs.Not(gs.IsNil)) | |
expected := "One of either `EmitInPayload` or `EmitInFields` must be set to true." | |
c.Expect(err.Error(), gs.Equals, expected) | |
}) | |
c.Specify("that actually emits a message", func() { | |
statName := "sample.stat" | |
statVal := int64(303) | |
testStat := Stat{statName, strconv.Itoa(int(statVal)), "c", float32(1)} | |
validateMsgFields := func(msg *message.Message) { | |
c.Expect(len(msg.Fields), gs.Equals, 4) | |
// timestamp | |
_, ok := msg.GetFieldValue("timestamp") | |
c.Expect(ok, gs.IsTrue) | |
var tmp interface{} | |
var intTmp int64 | |
// stats.sample.stat | |
tmp, ok = msg.GetFieldValue("stats." + statName) | |
c.Expect(ok, gs.IsTrue) | |
intTmp, ok = tmp.(int64) | |
c.Expect(ok, gs.IsTrue) | |
c.Expect(intTmp, gs.Equals, int64(30)) | |
// stats_counts.sample.stat | |
tmp, ok = msg.GetFieldValue("stats_counts." + statName) | |
c.Expect(ok, gs.IsTrue) | |
intTmp, ok = tmp.(int64) | |
c.Expect(ok, gs.IsTrue) | |
c.Expect(intTmp, gs.Equals, statVal) | |
// statsd.numStats | |
tmp, ok = msg.GetFieldValue("statsd.numStats") | |
c.Expect(ok, gs.IsTrue) | |
intTmp, ok = tmp.(int64) | |
c.Expect(ok, gs.IsTrue) | |
c.Expect(intTmp, gs.Equals, int64(1)) | |
} | |
validateMsgPayload := func(msg *message.Message) { | |
lines := strings.Split(msg.GetPayload(), "\n") | |
c.Expect(len(lines), gs.Equals, 4) | |
c.Expect(lines[3], gs.Equals, "") | |
var timestamp string | |
for i := 0; i < 3; i++ { | |
line := strings.Split(lines[i], " ") | |
c.Expect(len(line), gs.Equals, 3) | |
switch i { | |
case 0: | |
c.Expect(line[0], gs.Equals, "stats."+statName) | |
c.Expect(line[1], gs.Equals, "30.300000") | |
timestamp = line[2] | |
case 1: | |
c.Expect(line[0], gs.Equals, "stats_counts."+statName) | |
c.Expect(line[1], gs.Equals, strconv.Itoa(int(statVal))) | |
c.Expect(line[2], gs.Equals, timestamp) | |
case 2: | |
c.Expect(line[0], gs.Equals, "statsd.numStats") | |
c.Expect(line[1], gs.Equals, "1") | |
c.Expect(line[2], gs.Equals, timestamp) | |
} | |
} | |
expected := strings.Join(lines, "\n") | |
c.Expect(msg.GetPayload(), gs.Equals, expected) | |
} | |
ith.MockHelper.EXPECT().PipelineConfig().Return(pConfig) | |
ith.MockInputRunner.EXPECT().InChan().Return(ith.PackSupply) | |
ith.MockInputRunner.EXPECT().Inject(ith.Pack) | |
ith.MockInputRunner.EXPECT().Ticker() | |
var wg sync.WaitGroup | |
startAndSwapTickChan := func() { | |
wg.Add(1) | |
go func() { | |
err := statAccumInput.Run(ith.MockInputRunner, ith.MockHelper) | |
wg.Done() | |
c.Expect(err, gs.IsNil) | |
}() | |
time.Sleep(50) // Kludgey wait for tickChan to be set so we can replace. | |
statAccumInput.tickChan = tickChan | |
} | |
c.Specify("emits data in payload by default", func() { | |
err := statAccumInput.Init(config) | |
c.Assume(err, gs.IsNil) | |
startAndSwapTickChan() | |
statAccumInput.statChan <- testStat | |
close(statAccumInput.statChan) | |
wg.Wait() | |
validateMsgPayload(ith.Pack.Message) | |
}) | |
c.Specify("emits data in fields when specified", func() { | |
config.EmitInFields = true | |
err := statAccumInput.Init(config) | |
c.Assume(err, gs.IsNil) | |
startAndSwapTickChan() | |
statAccumInput.statChan <- testStat | |
close(statAccumInput.statChan) | |
wg.Wait() | |
validateMsgFields(ith.Pack.Message) | |
validateMsgPayload(ith.Pack.Message) | |
}) | |
c.Specify("omits data in payload when specified", func() { | |
config.EmitInPayload = false | |
config.EmitInFields = true | |
err := statAccumInput.Init(config) | |
c.Assume(err, gs.IsNil) | |
startAndSwapTickChan() | |
statAccumInput.statChan <- testStat | |
close(statAccumInput.statChan) | |
wg.Wait() | |
validateMsgFields(ith.Pack.Message) | |
c.Expect(ith.Pack.Message.GetPayload(), gs.Equals, "") | |
}) | |
c.Specify("honors time ticker to flush", func() { | |
err := statAccumInput.Init(config) | |
c.Assume(err, gs.IsNil) | |
startAndSwapTickChan() | |
statAccumInput.statChan <- testStat | |
tickChan <- time.Now() | |
validateMsgPayload(ith.Pack.Message) | |
ith.PackSupply <- ith.Pack | |
ith.MockInputRunner.EXPECT().InChan().Return(ith.PackSupply) | |
ith.MockInputRunner.EXPECT().Inject(ith.Pack) | |
close(statAccumInput.statChan) | |
wg.Wait() | |
}) | |
c.Specify("correctly processes timers", func() { | |
sendTimer := func(vals ...int) { | |
for _, v := range vals { | |
statAccumInput.statChan <- Stat{"sample.timer", strconv.Itoa(int(v)), "ms", float32(1)} | |
} | |
} | |
config.EmitInFields = true | |
err := statAccumInput.Init(config) | |
c.Assume(err, gs.IsNil) | |
startAndSwapTickChan() | |
sendTimer(220, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100) | |
close(statAccumInput.statChan) | |
wg.Wait() | |
msg := ith.Pack.Message | |
getVal := func(token string) float64 { | |
tmp, ok := msg.GetFieldValue("stats.timers.sample.timer." + token) | |
c.Expect(ok, gs.IsTrue) | |
val, ok := tmp.(float64) | |
c.Expect(ok, gs.IsTrue) | |
return val | |
} | |
c.Expect(getVal("upper"), gs.Equals, 220.0) | |
c.Expect(getVal("lower"), gs.Equals, 10.0) | |
c.Expect(getVal("mean"), gs.Equals, 70.0) | |
c.Expect(getVal("upper_90"), gs.Equals, 100.0) | |
c.Expect(getVal("mean_90"), gs.Equals, 55.0) | |
tmp, ok := msg.GetFieldValue("stats.timers.sample.timer.count") | |
c.Expect(ok, gs.IsTrue) | |
intTmp, ok := tmp.(int64) | |
c.Expect(ok, gs.IsTrue) | |
c.Expect(intTmp, gs.Equals, int64(11)) | |
}) | |
}) | |
}) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment