This document demonstrates a basic pipeline in Go and discusses a risk that comes with implementing pipelines. Keep in mind that:
- Using FizzBuzz here is contrived, but it's a nice way to demonstrate the concept. There is no real advantage to running FizzBuzz's trivial workload in a Goroutine, but if the task involved heavy computation or long-lived I/O, the pattern starts to make a lot more sense.
- The problem discussed only applies to Goroutines that actually need a way to stop. If they run for the entire duration of a program, it doesn't apply.
The patterns are reused from the official Go blog post on pipelines ("Go Concurrency Patterns: Pipelines and cancellation"), with some additional commentary.
Here's a very basic demonstration of FizzBuzz using a pipeline pattern in Go (which amounts to a Goroutine and a channel):
```go
package main

import "fmt"

type Result struct {
	num     int
	display string
}

func fizzbuzz(out chan Result) {
	for num := 1; ; num++ {
		switch {
		case num%3 == 0 && num%5 == 0:
			out <- Result{num, "FizzBuzz"}
		case num%3 == 0:
			out <- Result{num, "Fizz"}
		case num%5 == 0:
			out <- Result{num, "Buzz"}
		default:
			out <- Result{num, fmt.Sprintf("%d", num)}
		}
	}
}

func main() {
	out := make(chan Result)
	go fizzbuzz(out)
	for res := range out {
		if res.num >= 100 {
			break
		}
		fmt.Println(res.display)
	}
}
```
We create a channel to receive results on (`out`), then fire up a Goroutine (`fizzbuzz`), and start iterating through the results. When we've had enough (i.e. we hit our upper bound of 100), we break the loop and exit.
This works great:
```
...
88
89
FizzBuzz
91
92
Fizz
94
Buzz
Fizz
97
98
Fizz
```
The above works fine if we're just trying to demonstrate it with a single run of our program, but we leak our Goroutine: after `main` breaks out of its loop, `fizzbuzz` stays blocked forever trying to send its next result on `out`, because nothing will ever receive it. If we want to make our FizzBuzz pipeline reusable within the lifetime of the program, we need some way to stop it so that its resources can be reclaimed.
Go's philosophy when it comes to Goroutines is that they should end their own execution rather than being ended from another context (the language has no API for killing a Goroutine), so we can introduce the idea of a stop channel (`done`) to end the pipeline's execution:
```go
package main

import (
	"fmt"
	"time"
)

type Result struct {
	num     int
	display string
}

func fizzbuzz(out chan Result, done chan struct{}) {
outerLoop:
	for num := 1; ; num++ {
		// Check done once per iteration, before producing the next result.
		select {
		case <-done:
			// A bare break would only exit the select, so break the labeled loop.
			break outerLoop
		default:
		}

		switch {
		case num%3 == 0 && num%5 == 0:
			out <- Result{num, "FizzBuzz"}
		case num%3 == 0:
			out <- Result{num, "Fizz"}
		case num%5 == 0:
			out <- Result{num, "Buzz"}
		default:
			out <- Result{num, fmt.Sprintf("%d", num)}
		}
	}
	fmt.Println("Left FizzBuzz.")
}

func main() {
	out := make(chan Result)
	done := make(chan struct{})
	go fizzbuzz(out, done)
	for res := range out {
		if res.num >= 100 {
			break
		}
		fmt.Println(res.display)
	}
	close(done)
	// Sleep to give fizzbuzz a chance to print and return.
	time.Sleep(1 * time.Second)
}
```
We use Go's `close` as a signal that the `fizzbuzz` Goroutine should end execution, taking advantage of the fact that a receive on a closed channel never blocks: it returns immediately with the element type's zero value. `close` is a superior approach compared to injecting a "done" value into the channel because it signals every waiting Goroutine, no matter how many there happen to be, whereas a sent value is consumed by exactly one receiver.
Experienced Go users will notice the problem above immediately: we've introduced a race condition with the possibility of a leaked Goroutine. `fizzbuzz` only checks `done` at the top of each iteration, but the operation that can actually block is the send on `out`. Although the program will occasionally end as expected, far more often the pipeline Goroutine will continue after it's sent a result into `out`, loop around, go right past its check on `done` (which hasn't been closed yet), and block on sending the next result to `out`, which nothing will ever receive. Every time we use `fizzbuzz` we'll leak a new Goroutine, and in a long-running program those leaks accumulate until it eventually runs out of memory and crashes.
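The bad interleaving can also be forced deterministically. In this sketch, `producer` mirrors the structure of the second `fizzbuzz` (a `done` check at the top of the loop, then an unguarded send), and a hypothetical `passedCheck` channel exists purely so the consumer can wait until the producer is past its check before closing `done`.

```go
package main

import (
	"fmt"
	"runtime"
	"time"
)

// producer mirrors the second fizzbuzz: it checks done only at the top of
// each iteration, then performs an unguarded (blocking) send.
// passedCheck is a hypothetical hook used only to force the bad interleaving.
func producer(out chan int, done chan struct{}, passedCheck chan struct{}) {
	for i := 0; ; i++ {
		select {
		case <-done:
			return
		default:
		}
		if i == 1 {
			close(passedCheck) // the done check for i == 1 is now behind us
		}
		out <- i // blocks forever once the consumer stops receiving
	}
}

// leaks reports whether the producer Goroutine is still alive after the
// consumer has stopped and closed done.
func leaks() bool {
	before := runtime.NumGoroutine()
	out := make(chan int)
	done := make(chan struct{})
	passedCheck := make(chan struct{})
	go producer(out, done, passedCheck)

	<-out         // consume one value, like the loop in main
	<-passedCheck // wait until the producer is past its done check
	close(done)   // too late: the producer is committed to sending

	time.Sleep(50 * time.Millisecond)
	return runtime.NumGoroutine() > before
}

func main() {
	fmt.Println("leaked:", leaks()) // prints "leaked: true"
}
```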
The solution to the problem above is to use `select` every time we perform a potentially blocking operation (for example, sending a value on a channel), with one case for the operation itself and another for `done`. Here's a working version of the same program that doesn't leak a Goroutine:
```go
package main

import (
	"fmt"
	"time"
)

type Result struct {
	num     int
	display string
}

func fizzbuzz(out chan Result, done chan struct{}) {
outerLoop:
	for num := 1; ; num++ {
		switch {
		case num%3 == 0 && num%5 == 0:
			select {
			case <-done:
				break outerLoop
			case out <- Result{num, "FizzBuzz"}:
			}
		case num%3 == 0:
			select {
			case <-done:
				break outerLoop
			case out <- Result{num, "Fizz"}:
			}
		case num%5 == 0:
			select {
			case <-done:
				break outerLoop
			case out <- Result{num, "Buzz"}:
			}
		default:
			select {
			case <-done:
				break outerLoop
			case out <- Result{num, fmt.Sprintf("%d", num)}:
			}
		}
	}
	fmt.Println("Left FizzBuzz.")
}

func main() {
	out := make(chan Result)
	done := make(chan struct{})
	go fizzbuzz(out, done)
	for res := range out {
		if res.num >= 100 {
			break
		}
		fmt.Println(res.display)
	}
	close(done)
	// Sleep to give fizzbuzz a chance to print and return.
	time.Sleep(1 * time.Second)
}
```
The key takeaway here is that we should check `done` on every potentially blocking operation, unless we know for sure that the operation can't leak given how the caller runs it. While this works, it also introduces some risk: it becomes fairly easy to introduce a Goroutine leak without a perfect understanding of the code (say, when it's modified by a developer who isn't the original author). Good commenting and general experience with the pipeline pattern can help mitigate this risk, but fundamentally the language doesn't offer any facilities to guarantee safety here.
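One way to make the `done` check harder to forget is to route every send through a small helper. `send` below is hypothetical (it isn't part of the standard library) and requires Go 1.18+ for type parameters; `count` is a toy stand-in for a pipeline stage.

```go
package main

import "fmt"

// send (hypothetical helper) delivers v on out unless done is closed first,
// and reports whether the value was sent. If both cases are ready, select
// picks one pseudo-randomly, so callers should stop as soon as it returns false.
func send[T any](out chan<- T, done <-chan struct{}, v T) bool {
	select {
	case out <- v:
		return true
	case <-done:
		return false
	}
}

// count is a toy pipeline stage: every send goes through send, so there is
// no unguarded blocking operation to forget about.
func count(out chan<- int, done <-chan struct{}) {
	defer close(out)
	for i := 1; ; i++ {
		if !send(out, done, i) {
			return
		}
	}
}

func main() {
	out := make(chan int)
	done := make(chan struct{})
	go count(out, done)
	for n := range out {
		if n >= 3 {
			break
		}
		fmt.Println(n)
	}
	close(done)
}
```

This doesn't remove the need for discipline (a raw `out <- v` still compiles), but it gives reviewers a single pattern to look for.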
For FizzBuzz, we can simplify the code significantly, but keep in mind that in a less trivial program, one with more complex control flow and different types of messages being passed into the channel, this won't be quite as easy.
Here's a better-written version of the above, which computes each result first and then sends it through a single `select`:
```go
package main

import (
	"fmt"
	"time"
)

type Result struct {
	num     int
	display string
}

func fizzbuzz(out chan Result, done chan struct{}) {
outerLoop:
	for num := 1; ; num++ {
		var res Result
		switch {
		case num%3 == 0 && num%5 == 0:
			res = Result{num, "FizzBuzz"}
		case num%3 == 0:
			res = Result{num, "Fizz"}
		case num%5 == 0:
			res = Result{num, "Buzz"}
		default:
			res = Result{num, fmt.Sprintf("%d", num)}
		}
		select {
		case <-done:
			break outerLoop
		case out <- res:
		}
	}
	fmt.Println("Left FizzBuzz.")
}

func main() {
	out := make(chan Result)
	done := make(chan struct{})
	go fizzbuzz(out, done)
	for res := range out {
		if res.num >= 100 {
			break
		}
		fmt.Println(res.display)
	}
	close(done)
	// Sleep to give fizzbuzz a chance to print and return.
	time.Sleep(1 * time.Second)
}
```