Skip to content

Instantly share code, notes, and snippets.

@bruth
Created September 17, 2013 11:12
Show Gist options
  • Save bruth/6592945 to your computer and use it in GitHub Desktop.
Save bruth/6592945 to your computer and use it in GitHub Desktop.
Fan-out function
// fanOut takes an input channel and sends each output to one or more
// output channels. If a non-zero timeout is supplied, the
func fanOut(input chan []byte, outputs []chan []byte, exit chan bool, timeout time.Duration) {
if len(outputs) == 0 {
log.Println("zero outputs")
return
}
defer log.Println("cleaning up fanOut")
defer func() {
exit <- true
}()
// Used to signal goroutines to exit
signal := make(chan struct{})
// Wait group for spawned routines used after exit is signaled
wg := sync.WaitGroup{}
// Default state of input channel
open := true
// Receive from input until exit is signaled
for open {
select {
case value, open := <-input:
if !open {
break
}
log.Println("input ->", value)
// Spawn goroutine for each output channel to prevent blocking
// other channels from receiving the message.
for i, output := range outputs {
log.Println("sending", value, "to output", i)
go func(output chan []byte, i int, value []byte) {
// Increment count, decrement on return
wg.Add(1)
defer wg.Done()
defer log.Println("closing for output", i)
select {
case output <- value:
log.Println("output", i, "received", value)
case <-signal:
}
}(output, i, value)
}
case <-signal:
open = false
default:
open = false
}
}
// The exit channel is expected to send a true value and wait
// until it receives a response, however if it is closed,
// immediately signal the goroutines.
if _, ok := <-exit; !ok {
log.Println("exit channel closed")
close(signal)
} else if timeout > 0 {
log.Println("timeout of", timeout, "started")
<-time.After(timeout)
close(signal)
}
// Wait until all routines are done and exit
log.Println("waiting for goroutines to finish")
wg.Wait()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment