Skip to content

Instantly share code, notes, and snippets.

@kamal-github
Last active November 21, 2020 17:00
Show Gist options
  • Save kamal-github/615a466f2cfa74f02357ef99c8938619 to your computer and use it in GitHub Desktop.
Save kamal-github/615a466f2cfa74f02357ef99c8938619 to your computer and use it in GitHub Desktop.
Fan-out and fan-in pattern in Go
package main
import (
"fmt"
"runtime"
"sync"
)
// main builds a small pipeline: a generator feeds one shared input channel,
// runtime.NumCPU() multiplier workers read from it concurrently (fan out),
// and their outputs are merged back into one channel (fan in) whose values
// are printed one per line.
func main() {
	// Stand-in for a real input stream such as CSV, network or file data.
	numbs := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}

	// Closing done on return signals every pipeline stage to stop.
	done := make(chan struct{})
	defer close(done)

	source := generator(done, numbs)

	// Fan out: every worker competes for values from the same source channel.
	workers := make([]<-chan interface{}, runtime.NumCPU())
	for idx := range workers {
		workers[idx] = mul(done, source, 5)
	}

	// Fan in: drain the merged stream until all workers have finished.
	merged := fanIn(done, workers...)
	for result := range merged {
		fmt.Println(result)
	}
}
// generator returns an unbuffered channel on which the elements of numbs are
// sent in slice order. The channel is closed once every element has been
// sent, or earlier if done is closed while a send is still pending.
func generator(done <-chan struct{}, numbs []int) <-chan interface{} {
	out := make(chan interface{})

	emit := func() {
		defer close(out)
		for i := 0; i < len(numbs); i++ {
			select {
			case out <- numbs[i]:
				// Delivered; move on to the next element.
			case <-done:
				// Cancelled before the receiver took the value.
				return
			}
		}
	}
	go emit()

	return out
}
// mul starts a worker goroutine that multiplies each value received from
// inpStream by num and sends the product on the returned unbuffered channel.
//
// Every value on inpStream must hold an int; any other dynamic type makes the
// n.(int) assertion panic inside the worker goroutine.
//
// The returned channel is closed when inpStream has been closed and drained,
// or as soon as done is closed. done is observed both while waiting to
// receive and while waiting to send, so the worker cannot be left blocked on
// an upstream channel that is idle or never closed.
func mul(done <-chan struct{}, inpStream <-chan interface{}, num int) <-chan interface{} {
	mulStream := make(chan interface{})
	go func() {
		defer close(mulStream)
		for {
			select {
			case <-done:
				return
			case n, ok := <-inpStream:
				if !ok {
					// Upstream is exhausted.
					return
				}
				select {
				case <-done:
					return
				case mulStream <- n.(int) * num:
				}
			}
		}
	}()
	return mulStream
}
// fanIn merges any number of input streams into one unbuffered output
// channel. One goroutine per input forwards that input's values; values from
// different inputs are interleaved in no particular order.
//
// The returned channel is closed once every forwarding goroutine has exited,
// which happens when its input is closed and drained or when done is closed.
// done is observed both while waiting to receive and while waiting to send,
// so an input that is idle or never closed cannot keep the merged channel
// open after cancellation. With zero inputs the returned channel is closed
// without delivering any value.
func fanIn(done <-chan struct{}, streams ...<-chan interface{}) <-chan interface{} {
	multiplexedStream := make(chan interface{})

	// The WaitGroup tracks the forwarders so the output is closed exactly
	// once, after the last of them has stopped sending.
	var wg sync.WaitGroup
	wg.Add(len(streams))
	for _, s := range streams {
		// Pass the stream as an argument so each goroutine has its own copy
		// regardless of the loop-variable semantics of the Go version in use.
		go func(s <-chan interface{}) {
			defer wg.Done()
			for {
				select {
				case <-done:
					return
				case v, ok := <-s:
					if !ok {
						// This input is exhausted.
						return
					}
					select {
					case <-done:
						return
					case multiplexedStream <- v:
					}
				}
			}
		}(s)
	}

	go func() {
		defer close(multiplexedStream)
		wg.Wait()
	}()
	return multiplexedStream
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment