Skip to content

Instantly share code, notes, and snippets.

@chrismoos
Created April 30, 2017 09:44
Show Gist options
  • Select an option

  • Save chrismoos/0673df867b1f9e820c5f2bab1e6f1af5 to your computer and use it in GitHub Desktop.

Select an option

Save chrismoos/0673df867b1f9e820c5f2bab1e6f1af5 to your computer and use it in GitHub Desktop.
package main
import (
"sync"
"time"
vegeta "github.com/chrismoos/vegeta/lib"
)
// PipelineStageTargeter produces the next target for one stage of the
// pipeline. It is called with the target to work on and the result handed
// over from the previous stage; previousResult is nil for the first (root)
// stage, which has no predecessor. The returned error is passed back
// unchanged by the targeter that StageToTargeter builds around it.
// NOTE(review): presumably the implementation fills in tgt, following the
// vegeta Targeter convention — confirm against the vegeta fork in use.
type PipelineStageTargeter func(tgt *vegeta.Target, previousResult *vegeta.Result) error
// StageResultHandler is invoked with each result produced by a stage of the
// pipeline. Once the stage is finished it is invoked one final time with a
// *nil* result, which signals completion of that stage.
type StageResultHandler func(result *vegeta.Result)
// stageDefinition holds the configuration and plumbing for a single stage of
// an AttackPipeline.
type stageDefinition struct {
// targeter builds each target for this stage, given the previous stage's
// result (nil for the root stage).
targeter PipelineStageTargeter
// rate is passed to the attacker's Attack call as the attack rate.
rate uint64
// duration is passed to the attacker's Attack call as the attack duration.
duration time.Duration
// results is the inbound queue of this stage: the preceding stage sends its
// results here and this stage's targeter receives one per target.
results chan *vegeta.Result
// resultsHandler, when non-nil, is called with every result this stage
// produces.
resultsHandler StageResultHandler
}
// AttackPipeline is an ordered chain of attack stages in which the results of
// one stage are fed to the targeter of the next stage.
type AttackPipeline struct {
// waitGroup tracks the per-stage goroutines started by Run.
waitGroup sync.WaitGroup
// stages holds the stages in the order they were added; index 0 is the root.
stages []*stageDefinition
}
// NewAttackPipeline returns a new pipeline that contains no stages yet.
// Stages are registered afterwards with Add.
func NewAttackPipeline() *AttackPipeline {
	return new(AttackPipeline)
}
// Add appends a new stage to the end of the pipeline.
//
// tr builds the targets for the stage, handler (may be nil) receives the
// stage's results, and rate and du are the attack rate and duration handed to
// the attacker when the pipeline runs.
func (p *AttackPipeline) Add(tr PipelineStageTargeter, handler StageResultHandler, rate uint64, du time.Duration) {
	// TODO - make configurable
	inbound := make(chan *vegeta.Result, 1000)

	next := &stageDefinition{
		targeter:       tr,
		rate:           rate,
		duration:       du,
		results:        inbound,
		resultsHandler: handler,
	}
	p.stages = append(p.stages, next)
}
// StageToTargeter adapts a pipeline stage to a plain targeter function that
// takes only the target.
//
// For the root stage the stage's targeter is called with a nil previous
// result. For any other stage each call first receives one result from the
// stage's results channel (blocking until one is available) and passes it on
// as the previous result.
func StageToTargeter(stage *stageDefinition, rootStage bool) func(tgt *vegeta.Target) error {
	if rootStage {
		// The root stage has no predecessor, so there is nothing to wait for.
		return func(tgt *vegeta.Target) error {
			return stage.targeter(tgt, nil)
		}
	}

	return func(tgt *vegeta.Target) error {
		previous := <-stage.results
		return stage.targeter(tgt, previous)
	}
}
// Run executes every stage of the pipeline concurrently and blocks until all
// stages have finished.
//
// Each stage gets its own attacker, built from opts, and runs in its own
// goroutine. Every result a stage produces is forwarded to the results
// channel of the following stage (if there is one) and then passed to the
// stage's result handler (if one was supplied). When a stage's attack ends,
// its handler is called one last time with nil to signal completion.
//
// NOTE(review): forwarding is a blocking channel send, and non-root targeters
// block on receive; the results channels are never closed here. A stage can
// therefore stall if its neighbour stops consuming or producing — confirm
// that rates and durations are chosen so this cannot happen.
func (p *AttackPipeline) Run(opts ...func(*vegeta.Attacker)) {
	p.waitGroup.Add(len(p.stages))
	for x, stage := range p.stages {
		attacker := vegeta.NewAttacker(opts...)
		go func(idx int, stage *stageDefinition) {
			defer p.waitGroup.Done()

			// Only the first stage has no upstream results to wait for.
			rootStage := idx == 0

			for res := range attacker.Attack(StageToTargeter(stage, rootStage), stage.rate, stage.duration) {
				// If there is another stage in the pipeline, send
				// the result to the next stage
				if idx+1 < len(p.stages) {
					p.stages[idx+1].results <- res
				}
				if stage.resultsHandler != nil {
					stage.resultsHandler(res)
				}
			}

			// Signal completion. The handler is optional, so guard the call
			// just like the per-result call above; calling a nil func panics.
			if stage.resultsHandler != nil {
				stage.resultsHandler(nil)
			}
		}(x, stage)
	}
	p.waitGroup.Wait()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment