Created
February 12, 2018 22:39
-
-
Save xeoncross/3e0328137019b14373ee26701a23ed81 to your computer and use it in GitHub Desktop.
Fastest way to merge multiple channels in golang.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"sync" | |
"sync/atomic" | |
) | |
// Fill Channel with int values | |
func fillChan(number int) <-chan int { | |
c := make(chan int) | |
go func() { | |
for i := 0; i < number; i++ { | |
c <- i | |
} | |
close(c) | |
}() | |
return c | |
} | |
// Create multiple channels and fill them | |
func createChannels(number, fill int) (chans []<-chan int) { | |
chans = make([]<-chan int, number) | |
for i := 0; i < number; i++ { | |
chans[i] = fillChan(fill) | |
} | |
return | |
} | |
/* Fail | |
func mergeTwo(a, b <-chan int) (c chan int) { | |
c = make(chan int) | |
go func() { | |
loop: | |
for { | |
select { | |
case c <- <-a: | |
// | |
case c <- <-b: | |
// | |
default: | |
break loop | |
} | |
} | |
close(c) | |
}() | |
return c | |
} | |
func mergeRec(chans ...<-chan int) <-chan int { | |
switch len(chans) { | |
case 0: | |
c := make(chan int) | |
close(c) | |
return c | |
case 1: | |
return chans[0] | |
default: | |
m := len(chans) / 2 | |
return mergeTwo( | |
mergeRec(chans[:m]...), | |
mergeRec(chans[m:]...)) | |
} | |
} | |
*/ | |
func mergeWait(cs ...<-chan int) <-chan int { | |
out := make(chan int) | |
var wg sync.WaitGroup | |
wg.Add(len(cs)) | |
for _, c := range cs { | |
go func(c <-chan int) { | |
for v := range c { | |
out <- v | |
} | |
wg.Done() | |
}(c) | |
} | |
go func() { | |
wg.Wait() | |
close(out) | |
}() | |
return out | |
} | |
func mergeAtomic(cs ...<-chan int) <-chan int { | |
out := make(chan int) | |
var i int32 | |
atomic.StoreInt32(&i, int32(len(cs))) | |
for _, c := range cs { | |
go func(c <-chan int) { | |
for v := range c { | |
out <- v | |
} | |
if atomic.AddInt32(&i, -1) == 0 { | |
close(out) | |
} | |
}(c) | |
} | |
return out | |
} | |
func main() { | |
a := fillChan(2) | |
b := fillChan(2) | |
c := fillChan(2) | |
d := mergeWait(a, b, c) | |
for v := range d { | |
fmt.Println(v) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"testing" | |
) | |
func BenchmarkMerge(b *testing.B) { | |
merges := []struct { | |
name string | |
fun func(...<-chan int) <-chan int | |
}{ | |
{"goroutines", mergeWait}, | |
{"atomic", mergeAtomic}, | |
// {"recursion", mergeRec}, | |
} | |
for _, merge := range merges { | |
counter := 0 | |
b.Run(merge.name, func(b *testing.B) { | |
for i := 0; i < b.N; i++ { | |
chans := createChannels(100, 100) | |
c := merge.fun(chans...) | |
counter = 0 // Reset each run | |
for range c { | |
counter++ | |
} | |
} | |
}) | |
fmt.Printf("%d results %s\n", counter, merge.name) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Run
Based off: