Last active
August 19, 2023 10:10
-
-
Save arschles/ee10bd14ad62a9b5b2efc051eab36b38 to your computer and use it in GitHub Desktop.
Fan-out and Fan-in with Go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package fans
// this code was inspired by https://talks.golang.org/2012/concurrency.slide#36
//
// it uses the context package (which wasn't available when that presentation was made) instead of a timer to
// do timeouts
import (
	"context"
	"time"
	"sync"
)
// fn1 is a placeholder for a unit of work that the fan-out example runs
// concurrently. It returns i + 1.
func fn1(i int) int {
	// probably I/O will happen here
	return i + 1
}
// fn2 is a placeholder for a unit of work that the fan-out example runs
// concurrently. It returns i + 2.
func fn2(i int) int {
	// probably I/O will happen here
	return i + 2
}
func run() []int { | |
ctx := context.Background() | |
// return to the caller within 500 milliseconds, regardless of how long the functions take | |
ctx, done := context.WithTimeout(ctx, 500*time.Millisecond) | |
// when this function returns, stop the context. if you do this, <-ctx.Done() will be | |
// closed. that's important above if fn1 or fn2 receive on <-ctx.Done() | |
defer done() | |
// this is the channel that all functions will fan-in to | |
ch := make(chan int) | |
// this wait group is so that we can get notified if the functions all are done _before_ | |
// the context times out | |
var wg sync.WaitGroup | |
///// | |
// fan out | |
///// | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
// either | |
select { | |
case ch <- fn1(123): | |
// fn1 returned before the context timed out | |
case <-ctx.Done(): | |
// the context timed out before fn1 returned. if you want to keep going | |
// after the context times out, remove this case statement | |
return | |
}() | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
select { | |
case ch <- fn2(456): | |
// fn2 returned before the context timed out | |
case <- ctx.Done(): | |
// the context timed out before fn2 returned. if you want to keep going | |
// after the context times out, remove this case statement | |
return | |
} | |
}() | |
// wait asynchronously for all the functions to be done and close the channel when they are | |
// | |
// this only matters if the functions return before the ctx times out | |
go func() { | |
wg.Wait() | |
close(ch) | |
}() | |
///// | |
// fan in | |
///// | |
ret := []int{} | |
for i := range ch { | |
select { | |
case i, ok <- ch: | |
if !ok { | |
// this means the channel is closed, so all the functions returned before | |
// the timeout. the i variable will be empty, but ret has everything we need so we | |
// can return it | |
return ret | |
} | |
ret = append(ret, i) | |
case <-ctx.Done(): | |
// time's up, break out of the loop | |
// | |
// instead of breaking here, you can start up a background goroutine | |
// to wait for the rest of the results and put them into a queue or something | |
break | |
} | |
} | |
return ret | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment