Last active
December 23, 2015 01:39
-
-
Save bruth/6561332 to your computer and use it in GitHub Desktop.
Go fan-in function
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"log" | |
"sync" | |
"time" | |
) | |
// fanIn merges the values received on inputs into the single output channel
// and blocks until it has been told to stop and every forwarding goroutine
// has finished.
//
// One goroutine is started per input. Each one forwards values until its
// input is closed or a forced stop is signaled. For efficiency, output should
// be buffered to the number of inputs so the goroutines do not block each
// other.
//
// Shutdown is driven through exit:
//   - Receiving a value on exit starts a graceful stop: fanIn waits for the
//     inputs to be closed and drained. If timeout > 0 that wait is bounded,
//     and any goroutine still running when it elapses is forced to stop; if
//     timeout <= 0 the wait is unbounded.
//   - If exit is closed, the goroutines are forced to stop immediately.
//
// Unless exit was closed, fanIn sends true on exit just before returning so
// the caller can wait for the cleanup to finish. With zero inputs fanIn logs
// and returns immediately without touching exit.
func fanIn(inputs []chan []byte, output chan []byte, exit chan bool, timeout time.Duration) {
	if len(inputs) == 0 {
		log.Println("zero inputs")
		return
	}

	defer log.Println("cleaning up fanIn")

	// Acknowledge the exit to the caller, but never send on a closed
	// channel: that would panic.
	exitClosed := false
	defer func() {
		if !exitClosed {
			exit <- true
		}
	}()

	// Closed to force the forwarding goroutines to stop.
	signal := make(chan struct{})

	// Tracks the forwarding goroutines so fanIn can wait for all of them.
	var wg sync.WaitGroup
	wg.Add(len(inputs))

	// Spawn one forwarding goroutine per input channel.
	for i, input := range inputs {
		log.Println("spawning input", i)

		go func(input chan []byte, i int) {
			defer log.Println("closing input", i)
			defer wg.Done()
			forwardInput(i, input, output, signal)
		}(input, i)
	}

	// done is closed once every forwarding goroutine has returned, which
	// lets the timeout below be raced against normal completion.
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	// The caller is expected to send a value on exit and then wait for the
	// response; if exit is closed instead, stop the goroutines right away.
	if _, ok := <-exit; !ok {
		exitClosed = true
		log.Println("exit channel closed")
		close(signal)
	} else if timeout > 0 {
		log.Println("timeout of", timeout, "started")

		timer := time.NewTimer(timeout)
		defer timer.Stop()

		// Only force a stop if the goroutines have not already finished.
		select {
		case <-done:
		case <-timer.C:
			close(signal)
		}
	}

	// Wait until all routines are done and exit.
	log.Println("waiting for goroutines to finish")
	<-done
}

// forwardInput copies values from input to output until input is closed or
// signal is closed. i is the index of the input and is used only for logging.
func forwardInput(i int, input <-chan []byte, output chan<- []byte, signal <-chan struct{}) {
	for {
		select {
		case value, ok := <-input:
			if !ok {
				// Return rather than break: break would only leave the
				// select and the loop would spin on the closed channel.
				log.Println("(closed) input", i)
				return
			}

			// Keep watching signal while sending so a full output channel
			// cannot block a forced stop; the pending value is dropped.
			select {
			case output <- value:
				log.Printf("input %d -> %d\n", i, value)
			case <-signal:
				log.Println("(signaled) input", i)
				return
			}
		case <-signal:
			log.Println("(signaled) input", i)
			return
		}
	}
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment