Skip to content

Instantly share code, notes, and snippets.

@urandom
Last active August 6, 2019 19:33
Show Gist options
  • Save urandom/40ea1b3fa41be88fdbd4fd81c044e02d to your computer and use it in GitHub Desktop.
Save urandom/40ea1b3fa41be88fdbd4fd81c044e02d to your computer and use it in GitHub Desktop.
Bounded pipelines with generics
package pipes_test
import (
"context"
"crypto/md5"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"runtime"
"github.com/urandom/pipes"
)
// ExampleFileWalker shows a two-stage pipeline: walkTestDir produces file
// paths, and a bounded set of workers turns every path into its MD5 sum with
// processPath. One line is printed per event, either the checksum or the
// error that was attached to the event.
func ExampleFileWalker() {
	ctx := context.Background()
	paths := pipes.Generate(ctx, walkTestDir)
	// Bounded takes the per-element processor before the worker count; the
	// worker count is the current GOMAXPROCS setting (a negative argument
	// only queries it).
	for ev := range pipes.Bounded(ctx, paths, processPath, runtime.GOMAXPROCS(-1)) {
		if ev.Err != nil {
			fmt.Printf("Error during processing: %v\n", ev.Err)
			continue
		}
		// Terminate each record with a newline so results do not run together.
		fmt.Printf("File %s has md5 sum of %x\n", ev.Data.path, ev.Data.sum)
	}
}
// walkTestDir walks the "testdata" directory tree and sends the path of every
// regular file on in. It stops and returns the walk error as soon as the walk
// reports one, and returns ctx.Err() if ctx is done while a send is pending.
func walkTestDir(ctx context.Context, in chan<- string) error {
	visit := func(path string, info os.FileInfo, err error) error {
		switch {
		case err != nil:
			// Propagate walk failures unchanged; this aborts the walk.
			return err
		case !info.Mode().IsRegular():
			// Directories and other non-regular entries are not emitted.
			return nil
		}

		select {
		case <-ctx.Done():
			return ctx.Err()
		case in <- path:
			return nil
		}
	}

	return filepath.Walk("testdata", visit)
}
// result pairs a file path with the MD5 digest of that file's contents, as
// built by processPath.
type result struct {
// path is the file path that was read.
path string
// sum is the MD5 checksum of the file's contents.
sum [md5.Size]byte
}
// processPath reads the whole file at path and returns the path together with
// the MD5 digest of its contents. If the file cannot be read, it returns the
// zero result and the read error unchanged.
func processPath(path string) (result, error) {
	var res result

	contents, err := ioutil.ReadFile(path)
	if err == nil {
		res.path = path
		res.sum = md5.Sum(contents)
	}

	return res, err
}
package pipes
import (
"context"
"golang.org/x/sync/errgroup"
)
// Event is the unit that travels through a pipeline channel: a value of the
// parameterized Element type plus an optional error.
type Event(type Element) struct {
// Data carries the element itself.
Data Element
// Err, when non-nil, reports a failure; consumers in this file check it
// before using Data.
Err error
}
// Producer is a function that emits elements on the given send-only channel
// and returns an error on failure. Generate runs every producer in its own
// errgroup goroutine and hands it a context derived from the caller's.
type Producer(type Element) func(context.Context, chan<- Element) error
const (
// elementsBuffer is the capacity of the internal channel on which Generate
// collects elements from its producers.
elementsBuffer = 10
)
// Generate runs p and every producer in additional concurrently and merges
// what they emit into a single stream of events.
//
// Each produced element is delivered as an Event with Data set. Once all
// producers have returned, the first non-nil error among them (if any) is
// delivered as a final Event with only Err set, and the returned channel is
// closed. Producers receive a context that is cancelled when ctx is cancelled
// or when any producer fails, so one failure stops the others.
//
// If ctx is cancelled while the consumer is not receiving, forwarding stops
// and the returned channel is closed without delivering the remaining events,
// so an abandoned pipeline does not leak its goroutines.
func Generate(type Element)(
	ctx context.Context, p Producer(Element), additional ...Producer(Element),
) <-chan Event(Element) {
	// Keep the caller's ctx distinct from the group's: the group context is
	// also cancelled as soon as Wait returns, which happens before the
	// buffered elements have been forwarded.
	g, gctx := errgroup.WithContext(ctx)
	out := make(chan Event(Element))
	elements := make(chan Element, elementsBuffer)

	// start launches one producer; taking it as a parameter gives every
	// goroutine its own copy instead of a shared loop variable.
	start := func(producer Producer(Element)) {
		g.Go(func() error {
			return producer(gctx, elements)
		})
	}
	start(p)
	for _, producer := range additional {
		start(producer)
	}

	// Close elements only after every producer has returned, so no producer
	// can send on a closed channel. The error is picked up by the forwarder.
	go func() {
		_ = g.Wait()
		close(elements)
	}()

	go func() {
		defer close(out)

		for el := range elements {
			select {
			case out <- Event(Element){Data: el}:
			case <-ctx.Done():
				return
			}
		}

		// elements is closed, so all producers are done and Wait returns
		// immediately with the first error, if there was one.
		if err := g.Wait(); err != nil {
			select {
			case out <- Event(Element){Err: err}:
			case <-ctx.Done():
			}
		}
	}()

	return out
}
// Processor converts one input element into an output element or an error.
// Bounded calls it for every incoming event whose Err is nil.
type Processor(type Element, OutElement) func(Element) (OutElement, error)
// Bounded consumes events with a fixed number of concurrent workers and
// returns a channel of the processed results.
//
// For every incoming event whose Err is nil, a worker calls p on its Data and
// emits an Event holding p's result and error. Incoming events that already
// carry an error are passed through with that error and a zero Data, without
// calling p. The order of results is not guaranteed to match the input order.
//
// total is the number of workers; values below 1 are treated as 1 so that
// events is always drained. The returned channel is closed once events is
// closed and fully processed, or once ctx is cancelled and every worker has
// stopped.
func Bounded(type Element, OutElement)(
	ctx context.Context, events <-chan Event(Element), p Processor(Element, OutElement), total int,
) <-chan Event(OutElement) {
	// With no workers nothing would read events and the upstream stage
	// would block forever; always run at least one.
	if total < 1 {
		total = 1
	}

	g, ctx := errgroup.WithContext(ctx)
	out := make(chan Event(OutElement))

	for i := 0; i < total; i++ {
		g.Go(func() error {
			for ev := range events {
				element, err := ev.Data, ev.Err

				var outElement OutElement
				if err == nil {
					outElement, err = p(element)
				}

				select {
				case out <- Event(OutElement){Data: outElement, Err: err}:
				case <-ctx.Done():
					return ctx.Err()
				}
			}
			return nil
		})
	}

	go func() {
		// Workers only fail with the context's error, which the caller
		// already owns; closing out is the completion signal.
		_ = g.Wait()
		close(out)
	}()

	return out
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment