Pipelines in Go are really neat. They're composable functions that let you chain concurrent stages together, safely.
If you need to call API after API after API with the same data, maybe pipelines could help. Imagine you have the following:
func create(item Object) Object {
// do things, call APIs
return item
}
func update(item Object) Object {
// do things, call APIs
return item
}
func save(item Object) Object {
// do things, call APIs
return item
}
save(update(create(x)))
You can change this into a pipeline to run more of those calls concurrently while keeping the compositional nature.
As an example, the concurrent functions below compose together into a simple pipeline. Every stage runs at the same time, but only one instance of each stage runs at a time, so it's only a little faster than the sequential version.
package main
import (
"fmt"
)
func main() {
done := make(chan struct{})
defer close(done)
out := mod(2)(done, timesThree(done, in(done, 1, 2, 3, 4, 5, 6, 7)))
for i := range out {
fmt.Println(i)
}
}
// a common signature
type pipelineFn func(done <-chan struct{}, in <-chan int) <-chan int
func in(done <-chan struct{}, nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, v := range nums {
select {
case <-done:
return
case out <- v:
}
}
}()
return out
}
func timesThree(done <-chan struct{}, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for i := range in {
select {
case <-done:
return
case out <- i * 3: // send the tripled value downstream
}
}
}()
return out
}
func mod(m int) pipelineFn {
return func(done <-chan struct{}, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for i := range in {
if i%m != 0 {
continue // not a multiple of m, drop it
}
select {
case <-done:
return
case out <- i: // send the filtered value downstream
}
}
}()
return out
}
}
https://play.golang.org/p/sBZUB--9sB
Each one shares a common signature: a done channel to cancel its work when the process needs to end, an in channel supplying a stream of data, and a return channel of the same type as in to send data back out on:
type pipelineFn func(done <-chan struct{}, in <-chan int) <-chan int
This means that if a stage needs more arguments (like mod does), you wrap it in a closure that returns a function of that type. Composing these together works: they all wait to read from in, and they all watch done. If done is closed, they all quit gracefully; otherwise they process an item and send it down the pipeline.
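For instance, a hypothetical add stage (not part of the playground code) would use the same closure trick as mod to carry its extra argument:
// add needs an extra argument, so a closure returns a function that
// matches pipelineFn, the same way mod does above.
func add(n int) pipelineFn {
    return func(done <-chan struct{}, in <-chan int) <-chan int {
        out := make(chan int)
        go func() {
            defer close(out)
            for i := range in {
                select {
                case <-done:
                    return
                case out <- i + n: // send the incremented value downstream
                }
            }
        }()
        return out
    }
}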
Let's fix the fact that each stage only operates on one item at a time. For now, pretend you know how many items will be passed through beforehand, for example a slice of objects to process and change/update/upload to a datastore. A known count allows a simpler solution using a WaitGroup. If you don't know the count, you'd typically fan the work out across several copies of a stage, merge the results back together, and use something like a context to keep track.
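That unknown-count case isn't covered in this post, but as a rough sketch of the fan-in half (merge is a hypothetical name here, and it assumes the "sync" import), collapsing several stage outputs back into a single channel could look like this:
// merge fans in: it forwards values from every input channel onto one
// output channel and closes the output once all inputs are drained.
// A context.Context could stand in for the done channel.
func merge(done <-chan struct{}, ins ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup
    wg.Add(len(ins))
    for _, in := range ins {
        go func(in <-chan int) {
            defer wg.Done()
            for v := range in {
                select {
                case <-done:
                    return
                case out <- v:
                }
            }
        }(in)
    }
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}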
With a known count, the fix looks like this:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
done := make(chan struct{})
defer close(done)
items := []Object{{ID: 1}, {ID: 2}, {ID: 3}}
l := len(items)
pipeline := storeItem(done, l, makeName(done, l, pipe(done, items...)))
for item := range pipeline {
fmt.Println(item)
}
}
// some data structure
type Object struct {
ID int
Name string
Err error
}
// fake logic
func someLogic(o Object) error {
return nil
}
func pipe(done <-chan struct{}, os ...Object) <-chan Object {
out := make(chan Object)
go func() {
defer close(out)
for _, v := range os {
select {
case <-done:
return
case out <- v:
}
}
}()
return out
}
func storeItem(done <-chan struct{}, ct int, in <-chan Object) <-chan Object {
out := make(chan Object)
var wg sync.WaitGroup
wg.Add(ct)
go func() {
for i := range in {
select {
case <-done:
return
default:
}
if i.Err != nil {
out <- i
wg.Done()
continue
}
go func(i Object) {
defer wg.Done()
if err := someLogic(i); err != nil {
i.Err = err
out <- i
return
}
out <- i
}(i)
}
}()
go func() {
wg.Wait()
close(out)
}()
return out
}
func makeName(done <-chan struct{}, ct int, in <-chan Object) <-chan Object {
out := make(chan Object)
var wg sync.WaitGroup
wg.Add(ct)
go func() {
for i := range in {
select {
case <-done:
return
default:
}
if i.Err != nil {
out <- i
wg.Done()
continue
}
go func(i Object) {
defer wg.Done()
time.Sleep(time.Second / 3)
i.Name = "I've awoken!"
out <- i
}(i)
}
}()
go func() {
wg.Wait()
close(out)
}()
return out
}
https://play.golang.org/p/W6zx5Qk6yh
The new signature (the channels now carry Object instead of int, and each stage takes the expected count, just to give it a little more complexity and realism):
func(done <-chan struct{}, ct int, in <-chan Object) <-chan Object
Cool! They all handle requests concurrently, as quickly as possible.
Wow, they're super arrow-ey, and all the plumbing logic is duplicated between the two stages. What if we pull that shared logic into one function?
type actionFn func(wg *sync.WaitGroup, o Object, out chan Object)
func action(fn actionFn) pipelineFn {
return func(done <-chan struct{}, ct int, in <-chan Object) <-chan Object {
out := make(chan Object)
var wg sync.WaitGroup
wg.Add(ct)
go func() {
for i := range in {
select {
case <-done:
return
default:
}
if i.Err != nil {
out <- i
wg.Done()
continue
}
go fn(&wg, i, out)
}
}()
go func() {
wg.Wait()
close(out)
}()
return out
}
}
This seems to contain all of our common pipeline knowledge. How do we change this to be used by the pipeline?
Well, starting with makeName, rewrite it so that JUST the unique logic lives in a function assignment. Then pass that to action, along with the rest of the arguments makeName received:
func makeName(done <-chan struct{}, ct int, in <-chan Object) <-chan Object {
fn := func(wg *sync.WaitGroup, o Object, out chan Object) {
defer wg.Done()
time.Sleep(time.Second / 3)
if err := someLogic(o); err != nil {
o.Err = err
out <- o
return
}
o.Name = "I have a name!"
out <- o
}
return action(fn)(done, ct, in)
}
The same goes for storeItem:
func storeItem(done <-chan struct{}, ct int, in <-chan Object) <-chan Object {
fn := func(wg *sync.WaitGroup, o Object, out chan Object) {
defer wg.Done()
if err := someLogic(o); err != nil {
o.Err = err
out <- o
return
}
out <- o
}
return action(fn)(done, ct, in)
}
The result after a rewrite:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
done := make(chan struct{})
defer close(done)
items := []Object{{ID: 1}, {ID: 2}, {ID: 3}}
l := len(items)
pipeline := storeItem(done, l, makeName(done, l, pipe(done, items...)))
for item := range pipeline {
fmt.Println(item)
}
}
// some data structure
type Object struct {
ID int
Name string
Err error
}
// fake logic
func someLogic(o Object) error {
return nil
}
// pipelineFn is the common signature of a pipeline stage
type pipelineFn func(done <-chan struct{}, ct int, in <-chan Object) <-chan Object
// actionFn is the unique per-stage logic that action wraps
type actionFn func(wg *sync.WaitGroup, o Object, out chan Object)
// action takes care of the waitgroups, passing errors, and returning a channel
func action(fn actionFn) pipelineFn {
return func(done <-chan struct{}, ct int, in <-chan Object) <-chan Object {
out := make(chan Object)
var wg sync.WaitGroup
wg.Add(ct)
go func() {
for i := range in {
select {
case <-done:
return
default:
}
if i.Err != nil {
out <- i
wg.Done()
continue
}
go fn(&wg, i, out)
}
}()
go func() {
wg.Wait()
close(out)
}()
return out
}
}
func pipe(done <-chan struct{}, os ...Object) <-chan Object {
out := make(chan Object)
go func() {
defer close(out)
for _, v := range os {
select {
case <-done:
return
case out <- v:
}
}
}()
return out
}
func storeItem(done <-chan struct{}, ct int, in <-chan Object) <-chan Object {
fn := func(wg *sync.WaitGroup, o Object, out chan Object) {
defer wg.Done()
if err := someLogic(o); err != nil {
o.Err = err
out <- o
return
}
out <- o
}
return action(fn)(done, ct, in)
}
func makeName(done <-chan struct{}, ct int, in <-chan Object) <-chan Object {
fn := func(wg *sync.WaitGroup, o Object, out chan Object) {
defer wg.Done()
time.Sleep(time.Second / 3)
if err := someLogic(o); err != nil {
o.Err = err
out <- o
return
}
o.Name = "I have a name!"
out <- o
}
return action(fn)(done, ct, in)
}
https://play.golang.org/p/FAWyUVsAP1
All the logic in main is the exact same. All the common logic is in action, and all the unique logic is in each pipeline function, passed to action.
If you're looking for line-count savings, we only saved two here, but the savings grow with the complexity of your pipeline, and don't forget we also added new types to make the function signatures more explicit.
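To see how cheap a new stage is now, here's a hypothetical validate stage (not in the playground code) built on the same action helper; only the unique logic needs writing:
// validate supplies only the unique logic; action provides the waitgroup,
// error skipping, and channel plumbing shown above.
func validate(done <-chan struct{}, ct int, in <-chan Object) <-chan Object {
    fn := func(wg *sync.WaitGroup, o Object, out chan Object) {
        defer wg.Done()
        if o.Name == "" {
            o.Err = fmt.Errorf("object %d has no name", o.ID)
        }
        out <- o
    }
    return action(fn)(done, ct, in)
}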
The pipeline construction is a little harder to read since we're evaluating pipes from the inside out. Languages like Elm solve this with a pipe operator (|>) that reads in order, left to right. We can mimic that setup in Go with a new function:
// constructPipeline makes pipeline construction a little easier to follow.
func constructPipeline(done <-chan struct{}, ct int, in <-chan Object, pipes ...pipelineFn) <-chan Object {
out := in // placeholder to save the result of the previous (or initial) call
for _, fn := range pipes {
out = fn(done, ct, out)
}
return out
}
Our new pipeline looks like this:
pipeline := constructPipeline(done, l,
pipe(done, items...), // first step
makeName, // second step
storeItem) // third step
Final code: https://play.golang.org/p/6hLGAzCXrn
Maybe a more realistic pipeline would look something like this, with each stage taking its dependencies as parameters to a closure (this works well in a personal project):
pipeline := constructPipeline(done, l,
pipe(done, items...),
createItem(db),
createBilling(billing),
updateItem(db),
saveItemImages(s3),
updateItem(db))
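As a sketch of what one of those closures might look like (the Datastore type and its Create method are made up for illustration), createItem takes its dependency up front and returns a pipelineFn by reusing action:
// createItem closes over its dependency and returns a stage that fits the
// pipeline. Datastore and its Create method are hypothetical.
func createItem(db *Datastore) pipelineFn {
    return action(func(wg *sync.WaitGroup, o Object, out chan Object) {
        defer wg.Done()
        if err := db.Create(o); err != nil {
            o.Err = err
        }
        out <- o
    })
}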
What have we solved?
- Composable concurrent functions
- Grouping common logic out to one function
- Separating unique logic out to their own function assignments
- No race conditions
- Ridiculously fast