Created
April 30, 2017 09:44
-
-
Save chrismoos/0673df867b1f9e820c5f2bab1e6f1af5 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package main | |
| import ( | |
| "sync" | |
| "time" | |
| vegeta "github.com/chrismoos/vegeta/lib" | |
| ) | |
| type PipelineStageTargeter func(tgt *vegeta.Target, previousResult *vegeta.Result) error | |
| // This handler is invoked with the results from a stage of the pipeline. | |
| // In addition, once the stage is finished a *nil* result will be sent. | |
| type StageResultHandler func(result *vegeta.Result) | |
| type stageDefinition struct { | |
| targeter PipelineStageTargeter | |
| rate uint64 | |
| duration time.Duration | |
| results chan *vegeta.Result | |
| resultsHandler StageResultHandler | |
| } | |
| type AttackPipeline struct { | |
| waitGroup sync.WaitGroup | |
| stages []*stageDefinition | |
| } | |
| func NewAttackPipeline() *AttackPipeline { | |
| return &AttackPipeline{} | |
| } | |
| func (p *AttackPipeline) Add(tr PipelineStageTargeter, handler StageResultHandler, rate uint64, du time.Duration) { | |
| p.stages = append(p.stages, &stageDefinition{ | |
| targeter: tr, | |
| rate: rate, | |
| duration: du, | |
| // TODO - make configurable | |
| results: make(chan *vegeta.Result, 1000), | |
| resultsHandler: handler, | |
| }) | |
| } | |
| func StageToTargeter(stage *stageDefinition, rootStage bool) func(tgt *vegeta.Target) error { | |
| return func(tgt *vegeta.Target) error { | |
| var res *vegeta.Result | |
| if !rootStage { | |
| res = <-stage.results | |
| } | |
| return stage.targeter(tgt, res) | |
| } | |
| } | |
| func (p *AttackPipeline) Run(opts ...func(*vegeta.Attacker)) { | |
| p.waitGroup.Add(len(p.stages)) | |
| for x, stage := range p.stages { | |
| attacker := vegeta.NewAttacker(opts...) | |
| go func(idx int, stage *stageDefinition) { | |
| defer p.waitGroup.Done() | |
| rootStage := false | |
| if idx == 0 { | |
| rootStage = true | |
| } | |
| for res := range attacker.Attack(StageToTargeter(stage, rootStage), stage.rate, stage.duration) { | |
| // If there is another stage in the pipeline, send | |
| // the result to the next stage | |
| if idx+1 < len(p.stages) { | |
| p.stages[idx+1].results <- res | |
| } | |
| if stage.resultsHandler != nil { | |
| stage.resultsHandler(res) | |
| } | |
| } | |
| stage.resultsHandler(nil) | |
| }(x, stage) | |
| } | |
| p.waitGroup.Wait() | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment