Skip to content

Instantly share code, notes, and snippets.

@icholy
Last active December 16, 2015 15:09
Show Gist options
  • Save icholy/5454225 to your computer and use it in GitHub Desktop.
Save icholy/5454225 to your computer and use it in GitHub Desktop.
Go stream chaining with error handling
package main
import (
"errors"
"log"
)
// Item is a single element travelling through a Stream. It carries either a
// string value or an error; the stages in this file treat an Item whose err
// is non-nil as a failure and stop processing values after it.
type Item struct {
// val is the string payload. Stages in this file send "" here when they
// create an error Item themselves.
val string
// err is nil for ordinary values and non-nil when the Item reports a failure.
err error
}
// Stream is one stage of a pipeline of Items. Stages are chained by reading
// from the previous stage's in channel and publishing to a new one.
type Stream struct {
// in is the channel this stage's Items are received from. It is closed by
// whichever goroutine sends on it once there is nothing more to send.
in chan Item
// close carries the stop signal read by the producer goroutine. Map and
// Filter hand the same channel to the Stream they return, so the whole chain
// shares one close channel.
close chan bool
}
// Close asks the producer to stop and discards every Item still in flight so
// that no upstream goroutine stays blocked on a send. It returns once s.in has
// been closed.
//
// The stop signal is offered in a select alongside a receive from s.in rather
// than with a bare send: if the producer has already finished (and therefore
// no longer listens on s.close), a bare send would block forever. Draining in
// the same select lets Close notice that the stream ended on its own.
func (s *Stream) Close() {
	for {
		select {
		case s.close <- true:
			// The producer took the signal and will close its channel; every
			// later stage closes in turn. Drop whatever arrives until then.
			for range s.in {
			}
			return
		case _, ok := <-s.in:
			if !ok {
				// The stream is already finished, so there is nobody left
				// to signal.
				return
			}
			// A leftover Item was discarded; keep offering the signal.
		}
	}
}
// NewWordStream returns a Stream that emits each element of words, in order,
// as an error-free Item. The emitting goroutine stops early if a value arrives
// on the Stream's close channel. In both cases it closes the in channel when
// it is done.
func NewWordStream(words []string) *Stream {
	stream := &Stream{in: make(chan Item), close: make(chan bool)}

	go func() {
		// Runs on normal completion and on an early stop alike.
		defer close(stream.in)

		for i := range words {
			select {
			case stream.in <- Item{val: words[i]}:
				// Delivered; move on to the next word.
			case <-stream.close:
				// Asked to stop: abandon the remaining words.
				return
			}
		}
	}()

	return stream
}
// Map returns a new Stream carrying fn applied to every value of s.
//
// Values are transformed and forwarded until the first failure. A failure is
// either an error Item received from s, which is forwarded unchanged, or an
// error returned by fn, which is forwarded as an Item with an empty value.
// After that one error Item nothing more is emitted, but s keeps being read
// until it is closed so that the upstream sender is never left blocked. The
// returned Stream shares s's close channel.
func (s *Stream) Map(fn func(string) (string, error)) *Stream {
	out := make(chan Item)

	go func() {
		// Phase 1: transform and forward until something fails.
		for item := range s.in {
			result := item
			if item.err == nil {
				if val, err := fn(item.val); err != nil {
					result = Item{err: err}
				} else {
					result = Item{val: val}
				}
			}
			out <- result
			if result.err != nil {
				break
			}
		}

		// Phase 2: discard whatever upstream still sends. This is a no-op
		// when phase 1 ended because s.in was closed.
		for range s.in {
		}
		close(out)
	}()

	return &Stream{in: out, close: s.close}
}
// Filter returns a new Stream containing only the values of s for which fn
// reports true.
//
// Values are tested and forwarded until the first failure. A failure is
// either an error Item received from s, which is forwarded unchanged, or an
// error returned by fn, which is forwarded as an Item with an empty value.
// After that one error Item nothing more is emitted, but s keeps being read
// until it is closed so that the upstream sender is never left blocked. The
// returned Stream shares s's close channel.
func (s *Stream) Filter(fn func(string) (bool, error)) *Stream {
	out := make(chan Item)

	go func() {
		// Phase 1: test and forward until something fails.
		for item := range s.in {
			if item.err == nil {
				keep, err := fn(item.val)
				if err == nil {
					if keep {
						out <- item
					}
					continue
				}
				// The predicate failed: report that instead of the value.
				item = Item{err: err}
			}
			out <- item
			break
		}

		// Phase 2: discard whatever upstream still sends. This is a no-op
		// when phase 1 ended because s.in was closed.
		for range s.in {
		}
		close(out)
	}()

	return &Stream{in: out, close: s.close}
}
// Do consumes s on the calling goroutine, passing every value to fn.
//
// It stops at the first failure, which is either an error Item received from
// the stream or an error returned by fn. On failure the stream is shut down
// with Close and that error is returned. If the stream ends without a
// failure, Do returns nil.
func (s *Stream) Do(fn func(string) error) error {
	for item := range s.in {
		// An error Item fails the run directly; otherwise the callback decides.
		failure := item.err
		if failure == nil {
			failure = fn(item.val)
		}
		if failure != nil {
			// Close only returns once s.in is closed, so there is nothing
			// left to receive after it.
			s.Close()
			return failure
		}
	}
	return nil
}
// main demonstrates the pipeline: a fixed word list is produced, each word is
// wrapped in angle brackets (Map), results of length three or less are dropped
// (Filter), and the survivors are logged (Do). The Map step deliberately fails
// on the word "cool" to show how an error travels down the chain; any error
// returned by Do is reported with log.Fatalf.
func main() {
	// Map step: wraps a word in angle brackets and rejects "cool".
	bracket := func(word string) (string, error) {
		if word != "cool" {
			return "<" + word + ">", nil
		}
		return "", errors.New("not cool")
	}

	// Filter step: keeps strings longer than three bytes.
	longerThan3 := func(word string) (bool, error) {
		return len(word) > 3, nil
	}

	// Terminal step: logs each surviving word.
	printWord := func(word string) error {
		log.Print(word, " ")
		return nil
	}

	// Producer for the demo input.
	source := NewWordStream([]string{"this", "is", "pretty", "cool", "1", "2", "3"})

	// Run the chain and report the first error, if any.
	err := source.Map(bracket).Filter(longerThan3).Do(printWord)
	if err != nil {
		log.Fatalf("Error: %s", err)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment