Last active
August 6, 2019 19:33
-
-
Save urandom/40ea1b3fa41be88fdbd4fd81c044e02d to your computer and use it in GitHub Desktop.
Bounded pipelines with generics
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package pipes_test | |
import ( | |
"context" | |
"crypto/md5" | |
"fmt" | |
"io/ioutil" | |
"os" | |
"path/filepath" | |
"runtime" | |
"github.com/urandom/pipes" | |
) | |
func ExampleFileWalker() { | |
ctx := context.Background() | |
paths := pipes.Generate(ctx, walkTestDir) | |
for ev := range pipes.Bounded(ctx, paths, runtime.GOMAXPROCS(-1)) { | |
if ev.Err != nil { | |
fmt.Printf("Error during processing: %v", ev.Err) | |
continue | |
} | |
fmt.Printf("File %s has md5 sum of %x", ev.Data.path, ev.Data.sum) | |
} | |
} | |
// walkTestDir walks the "testdata" directory tree and sends the path of every
// regular file it finds on in.
//
// It returns the first error reported by the walk, or ctx.Err() if ctx is
// done before a path could be delivered. It never closes in.
func walkTestDir(ctx context.Context, in chan<- string) error {
	visit := func(path string, info os.FileInfo, err error) error {
		switch {
		case err != nil:
			// Returning the walk error aborts the traversal.
			return err
		case !info.Mode().IsRegular():
			// Non-regular entries (such as directories) are skipped.
			return nil
		}

		// Hand the path over unless the context ends first.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case in <- path:
			return nil
		}
	}

	return filepath.Walk("testdata", visit)
}
// result pairs a file path with the MD5 digest of that file's contents.
type result struct {
	// path is the location of the file that was hashed.
	path string
	// sum is the MD5 checksum of the file's contents.
	sum [md5.Size]byte
}
func processPath(path string) (result, error) { | |
data, err := ioutil.ReadFile(path) | |
if err != nil { | |
return result{}, err | |
} | |
return result{path, md5.Sum(data)}, nil | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package pipes | |
import ( | |
"context" | |
"golang.org/x/sync/errgroup" | |
) | |
type Event(type Element) struct { | |
Data Element | |
Err error | |
} | |
type Producer(type Element) func(context.Context, chan<- Element) error | |
const ( | |
elementsBuffer = 10 | |
) | |
func Generate(type Element)( | |
ctx context.Context, p Producer(Element), additional ...Producer(Element), | |
) <-chan Event(Element) { | |
g, ctx := errgroup.WithContext(ctx) | |
out := make(chan Event(Element)) | |
elements := make(chan Element, elementsBuffer) | |
for i, producer := 0, p; i < len(additional)+1; i++ { | |
if i > 0 { | |
producer = additional[i-1] | |
} | |
g.Go(func(type Element)(p Producer(Element)) func() error { | |
return func() error { | |
return p(ctx, elements) | |
} | |
}(producer)) | |
} | |
go func() { | |
g.Wait() | |
close(elements) | |
}() | |
go func() { | |
defer close(out) | |
for el := range elements { | |
out <- Event{Data: el} | |
} | |
if err := g.Wait(); err != nil { | |
out <- Event(Element){Err: err} | |
} | |
}() | |
return out | |
} | |
type Processor(type Element, OutElement) func(Element) (OutElement, error) | |
func Bounded(type Element, OutElement)( | |
ctx context.Context, events <-chan Event(Element), p Processor(OutElement), total int, | |
) <-chan Event(OutElement) { | |
g, ctx := errgroup.WithContext(ctx) | |
out := make(chan Event(OutElement)) | |
for i := 0; i < total; i++ { | |
g.Go(func() error { | |
for ev := range events { | |
element, err := ev.Data, ev.Err | |
var outElement OutElement | |
if err == nil { | |
outElement, err = p(element) | |
} | |
select { | |
case out <- Event{Data: outElement, Err: err}: | |
case <-ctx.Done(): | |
return ctx.Err() | |
} | |
} | |
return nil | |
}) | |
} | |
go func() { | |
g.Wait() | |
close(out) | |
}() | |
return out | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment