Skip to content

Instantly share code, notes, and snippets.

@kevinswiber
Last active January 2, 2016 02:32
Show Gist options
  • Select an option

  • Save kevinswiber/d0f909aeba22e6fe2f67 to your computer and use it in GitHub Desktop.

Select an option

Save kevinswiber/d0f909aeba22e6fe2f67 to your computer and use it in GitHub Desktop.
Running parallel sequential pipelines in Go
package main
import (
"fmt"
"golang.org/x/net/context"
)
// Example of processing multiple sequential pipelines in parallel
// main launches one pipeline sequence per input value, each on its own
// goroutine, and blocks until every sequence has signalled completion.
func main() {
	inputs := []int{2, 3, 20, 30}
	finished := make(chan struct{})

	for i := range inputs {
		// Hand the value over as an argument so each goroutine owns its copy.
		go func(value int) {
			runPipelineSequence(value)
			finished <- struct{}{}
		}(inputs[i])
	}

	// Receive exactly one completion signal per goroutine started above.
	for remaining := len(inputs); remaining > 0; remaining-- {
		<-finished
	}
}
// runPipelineSequence pushes n through a two-stage pipeline: the first stage
// prints the value and forwards its square, the second stage prints what it
// receives. The stages run one after another; the call returns once the
// pipeline channel has been closed and every stage has been executed.
func runPipelineSequence(n int) {
	// Stage one: report the incoming value, then pass its square downstream.
	square := func(in <-chan context.Context, out chan<- context.Context) {
		defer close(out)
		received := <-in
		current := received.Value("value").(int)
		fmt.Printf("value: %d\n", current)
		out <- context.WithValue(received, "value", current*current)
	}

	// Stage two: report the value produced by the previous stage.
	report := func(in <-chan context.Context, out chan<- context.Context) {
		defer close(out)
		received := <-in
		fmt.Printf("value: %d\n", received.Value("value"))
	}

	stages := newPipeline()
	go func() {
		// Closing the channel is what lets start's receive loop terminate.
		defer close(stages)
		stages <- square
		stages <- report
	}()

	seed := context.WithValue(context.Background(), "value", n)
	start(seed, stages)
}
// newPipeline returns a fresh, unbuffered pipeline. A pipeline is a channel
// of stage functions; each stage reads contexts from a receive-only channel
// and writes contexts to a send-only channel.
func newPipeline() chan func(<-chan context.Context, chan<- context.Context) {
	stages := make(chan func(<-chan context.Context, chan<- context.Context))
	return stages
}
// start runs every stage received from pipeline, in arrival order, on the
// calling goroutine. The first stage reads ctx from its input channel; each
// later stage reads from the output channel of the stage before it. start
// returns once pipeline has been closed and drained.
func start(ctx context.Context,
	pipeline chan func(<-chan context.Context, chan<- context.Context)) {
	// The seed channel has room for one value, so this send cannot block.
	seed := make(chan context.Context, 1)
	seed <- ctx
	close(seed)

	var upstream <-chan context.Context = seed
	for {
		stage, open := <-pipeline
		if !open {
			return
		}
		// One slot of buffer lets a stage emit its result without a reader
		// being present yet; the next stage picks it up afterwards.
		downstream := make(chan context.Context, 1)
		stage(upstream, downstream)
		upstream = downstream
	}
}
package main
import (
"fmt"
"golang.org/x/net/context"
)
// Example of processing multiple sequential pipelines in parallel
// main pushes every input value through a shared two-stage pipeline, using
// one worker goroutine per input, and waits for all workers to finish.
//
// Stage one prints the value and forwards its square; stage two prints the
// squared value. The stage functions are published on a channel by a producer
// goroutine and are collected into a slice exactly once, on this goroutine,
// before any worker is started. Workers therefore only read the slice: there
// is no concurrent append, and no worker can end up with a partial or
// mis-ordered set of stages because another worker drained the channel at the
// same time.
func main() {
	pipeline := make(chan func(<-chan context.Context, chan<- context.Context), 10)
	done := make(chan struct{})
	ints := []int{2, 3, 20, 7, 21, 47, 99, 107, 48, 982, 28323, 987}

	go func() {
		// Closing the channel ends the collection loop below.
		defer close(pipeline)
		pipeline <- func(in <-chan context.Context, out chan<- context.Context) {
			defer close(out)
			ctx := <-in
			value := ctx.Value("value").(int)
			fmt.Printf("value: %d\n", value)
			out <- context.WithValue(ctx, "value", value*value)
		}
		pipeline <- func(in <-chan context.Context, out chan<- context.Context) {
			defer close(out)
			ctx := <-in
			fmt.Printf("value: %d\n", ctx.Value("value"))
		}
	}()

	// Drain the stages once, before spawning workers, so the cache is
	// complete and immutable by the time any goroutine reads it.
	var pipeCache []func(<-chan context.Context, chan<- context.Context)
	for pipe := range pipeline {
		pipeCache = append(pipeCache, pipe)
	}

	for _, n := range ints {
		n := n // per-iteration copy for the closure (required before Go 1.22)
		go func() {
			ctx := context.WithValue(context.Background(), "value", n)

			// Capacity 1 means the seed send cannot block.
			in := make(chan context.Context, 1)
			in <- ctx
			close(in)

			// Run the stages sequentially, chaining each output channel to
			// the next stage's input.
			for _, pipe := range pipeCache {
				out := make(chan context.Context, 1)
				pipe(in, out)
				in = out
			}
			done <- struct{}{}
		}()
	}

	// Wait for one completion signal per worker.
	for range ints {
		<-done
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment