Created
September 17, 2013 11:12
-
-
Save bruth/6592945 to your computer and use it in GitHub Desktop.
Fan-out function
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// fanOut forwards every value received on input to each channel in outputs.
//
// Each delivery runs in its own goroutine so that a slow or blocked output
// never delays the others. Forwarding continues until input is closed or the
// caller requests shutdown through exit. All outputs receive the same slice
// value; it is not copied.
//
// Shutdown protocol: the caller sends a value on exit and then receives from
// exit; fanOut replies with true once every delivery goroutine has finished.
// If input is closed first, fanOut stops forwarding but still waits for the
// shutdown request before replying. If exit is closed instead of being sent
// to, pending deliveries are cancelled immediately and no reply is sent
// (sending on a closed channel would panic).
//
// timeout bounds how long pending deliveries may take after a shutdown
// request: when it is positive, deliveries still blocked once timeout has
// elapsed are abandoned; when it is zero or negative, fanOut waits for them
// indefinitely.
//
// With no outputs fanOut logs a message and returns immediately without
// touching input or exit.
func fanOut(input chan []byte, outputs []chan []byte, exit chan bool, timeout time.Duration) {
	if len(outputs) == 0 {
		log.Println("zero outputs")
		return
	}
	defer log.Println("cleaning up fanOut")

	// Closed to tell delivery goroutines to give up on a blocked send.
	cancel := make(chan struct{})
	// Tracks in-flight delivery goroutines.
	var wg sync.WaitGroup

	// exitRequested records whether the shutdown request was already consumed
	// inside the receive loop; exitOpen records whether exit is still open
	// (and therefore safe to reply on).
	exitRequested := false
	exitOpen := true

receive:
	for {
		select {
		case value, ok := <-input:
			if !ok {
				// Input is exhausted; stop forwarding. A labeled break is
				// required because a bare break only leaves the select.
				break receive
			}
			log.Println("input ->", value)
			for i, output := range outputs {
				log.Println("sending", value, "to output", i)
				// Add must happen before the goroutine starts, otherwise
				// wg.Wait below could return before the goroutine registers.
				wg.Add(1)
				go func(output chan []byte, i int, value []byte) {
					defer wg.Done()
					defer log.Println("closing for output", i)
					select {
					case output <- value:
						log.Println("output", i, "received", value)
					case <-cancel:
					}
				}(output, i, value)
			}
		case _, ok := <-exit:
			exitRequested, exitOpen = true, ok
			break receive
		}
	}

	// Input closed before shutdown was requested: the caller still expects
	// the request/reply handshake, so wait for its side of it.
	if !exitRequested {
		_, exitOpen = <-exit
	}

	switch {
	case !exitOpen:
		log.Println("exit channel closed")
		close(cancel)
	case timeout > 0:
		log.Println("timeout of", timeout, "started")
		// Give pending deliveries up to timeout to complete, but do not
		// sleep longer than necessary if they all finish early.
		delivered := make(chan struct{})
		go func() {
			wg.Wait()
			close(delivered)
		}()
		timer := time.NewTimer(timeout)
		select {
		case <-delivered:
			timer.Stop()
		case <-timer.C:
			close(cancel)
		}
	}

	// Wait until all routines are done and exit.
	log.Println("waiting for goroutines to finish")
	wg.Wait()

	// Reply to the caller only when exit is still open; sending on a closed
	// channel panics.
	if exitOpen {
		exit <- true
	}
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment