Skip to content

Instantly share code, notes, and snippets.

@jayme-github
Created August 8, 2017 19:48
Show Gist options
  • Select an option

  • Save jayme-github/e9bc38ecf7cf3f3b453455b1dac862ed to your computer and use it in GitHub Desktop.

Select an option

Save jayme-github/e9bc38ecf7cf3f3b453455b1dac862ed to your computer and use it in GitHub Desktop.
package main
import (
"flag"
"math/rand"
"sync"
"time"
log "github.com/sirupsen/logrus"
)
// Command-line configuration and package-level state.
var (
	// concurrency is the number of workers main starts (-w, default 4).
	concurrency = flag.Int("w", 4, "number of concurrent workers to start")
	// numJobs is the number of jobs genJobs emits (-j, default 10).
	numJobs = flag.Int("j", 10, "number of jobs to generate")
	// wg is not referenced by the functions in this file (merge declares its
	// own local WaitGroup); it is kept so the package's declarations stay
	// unchanged.
	wg sync.WaitGroup
)
// randSeedOnce guards the one-time seeding of the global math/rand source.
var randSeedOnce sync.Once

// randInt returns a pseudo-random integer in the half-open range [min, max).
//
// If the range is empty (max <= min) it returns min instead of letting
// rand.Intn panic on a non-positive argument.
func randInt(min, max int) int {
	if max <= min {
		return min
	}
	// Seed once rather than on every call: re-seeding each time discards the
	// generator state and can repeat values when two calls observe the same
	// clock reading. On Go 1.20+ the global source is already randomly seeded,
	// so this only matters for older toolchains.
	randSeedOnce.Do(func() {
		rand.Seed(time.Now().UTC().UnixNano())
	})
	return rand.Intn(max-min) + min
}
// genJobs starts a producer goroutine and returns the channel it feeds.
//
// The producer sends the integers 0 through *numJobs-1 on an unbuffered
// channel, so every send waits until a receiver takes the job. The channel is
// closed once the last job has been handed off.
func genJobs() <-chan interface{} {
	jobs := make(chan interface{})

	produce := func() {
		// Closing signals the receivers that all jobs have been sent.
		defer close(jobs)

		// The job count is fixed here, but a real producer usually does not
		// know it in advance.
		log.Infof("Adding %d jobs", *numJobs)
		n := 0
		for n < *numJobs {
			log.Infoln("Sending job:", n)
			// Unbuffered send: blocks until a worker is free to receive.
			jobs <- n
			n++
		}
	}
	go produce()

	return jobs
}
// worker starts a goroutine that consumes jobs and returns the channel on
// which that goroutine publishes one result per job.
//
// id is only used to label log entries. Each job is "processed" by sleeping
// for one second and is then forwarded unchanged as its result. The returned
// channel is unbuffered and is closed after jobs has been closed and drained.
// worker itself returns immediately; the work happens in the goroutine.
func worker(id int, jobs <-chan interface{}) <-chan interface{} {
	clog := log.WithField("worker", id)
	clog.Debugln("Starting")
	out := make(chan interface{})
	go func() {
		// Close out however the goroutine ends so readers are never left
		// waiting on a channel that will not be written again.
		defer close(out)
		for job := range jobs {
			clog.Debugf("Processing job: %v", job)
			time.Sleep(1 * time.Second)
			clog.Debugf("Sending result for job: %v", job)
			// Unbuffered send: blocks until the result is received.
			out <- job
			clog.Debugf("Done job: %v", job)
		}
		// Logged here, once the jobs channel is exhausted, so the message
		// marks the point where the worker really stops instead of the moment
		// the goroutine was launched.
		clog.Debugln("Exiting")
	}()
	return out
}
// merge fans in every channel in cs onto a single output channel.
//
// One forwarding goroutine runs per input channel. The returned channel is
// unbuffered and is closed once every input has been closed and drained; with
// no inputs it is closed as soon as the closer goroutine runs. Values coming
// from different inputs are not ordered relative to each other.
func merge(cs ...<-chan interface{}) <-chan interface{} {
	merged := make(chan interface{})

	var forwarders sync.WaitGroup
	// Register all forwarders up front, before the closer goroutine below can
	// reach Wait.
	forwarders.Add(len(cs))
	for _, c := range cs {
		// The channel is passed as an argument so each goroutine reads its own
		// input regardless of loop-variable semantics.
		go func(in <-chan interface{}) {
			defer forwarders.Done()
			for v := range in {
				merged <- v
			}
		}(c)
	}

	// Close the output only after every forwarder has finished sending.
	go func() {
		forwarders.Wait()
		close(merged)
	}()

	return merged
}
// main configures logging, parses the command-line flags, and runs the
// pipeline: genJobs feeds *concurrency workers, whose outputs are merged into
// one stream that is logged result by result.
func main() {
	formatter := &log.TextFormatter{
		DisableColors: true,
		FullTimestamp: true,
	}
	log.SetFormatter(formatter)
	log.SetLevel(log.DebugLevel)

	flag.Parse()

	// All workers receive from this one job channel.
	jobs := genJobs()

	log.Debugf("Booting %d workers", *concurrency)
	var workerOutputs []<-chan interface{}
	for id := 0; id < *concurrency; id++ {
		workerOutputs = append(workerOutputs, worker(id, jobs))
	}

	// Drain the merged stream; the loop ends when merge closes its output.
	for result := range merge(workerOutputs...) {
		log.Infof("Got result: %v", result)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment