Last active
December 16, 2015 15:09
-
-
Save icholy/5454225 to your computer and use it in GitHub Desktop.
Go stream chaining with error handling
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"errors" | |
"log" | |
) | |
// Item is one element travelling through a Stream: either a value or the
// error that interrupted the pipeline.
type Item struct {
	val string // payload; only meaningful when err is nil
	err error  // non-nil marks this Item as an error
}

// Stream is a single stage in a chain of Item-processing steps.
type Stream struct {
	in    chan Item // Items arriving from the previous stage (or the producer)
	close chan bool // cancellation signal, handed on unchanged to derived streams
}

// Close cancels the stream and blocks until s.in has been closed and fully
// drained.
//
// The close signal is offered in the same select that drains s.in. A bare
// blocking send on s.close would hang forever if the producer had already
// emitted its last Item, because nothing receives from s.close after that
// point. Here the upstream close of s.in ends the call instead.
func (s *Stream) Close() {
	for {
		select {
		case s.close <- true:
			// The producer accepted the signal and will close its channel;
			// discard whatever is still in flight until that close reaches us.
			for range s.in {
			}
			return
		case _, ok := <-s.in:
			if !ok {
				// Stream already finished: nobody is left to signal.
				return
			}
			// Otherwise an Item was discarded; keep offering the signal.
		}
	}
}
func NewWordStream(words []string) *Stream { | |
s := &Stream{make(chan Item), make(chan bool)} | |
go func() { | |
loop: | |
for _, w := range words { | |
select { | |
// if we get a close signal, break the loop and close the channel | |
case <-s.close: | |
break loop | |
// wrap the word in an Item and send it on the channel | |
case s.in <- Item{w, nil}: | |
continue | |
} | |
} | |
close(s.in) | |
}() | |
return s | |
} | |
func (s *Stream) Map(fn func(string) (string, error)) *Stream { | |
out := make(chan Item) | |
go func() { | |
erroring := false | |
for item := range s.in { | |
// ignore all items while erroring | |
if erroring { | |
continue | |
} | |
// if Item is an error, forward it, and start erroring | |
if item.err != nil { | |
erroring = true | |
out <- item | |
continue | |
} | |
// run the transformation function | |
val, err := fn(item.val) | |
// if the transformation returns an error, forward it and start erroring | |
if err != nil { | |
erroring = true | |
out <- Item{"", err} | |
continue | |
} | |
// everything went ok, forwarding transformed string | |
out <- Item{val, nil} | |
} | |
close(out) | |
}() | |
return &Stream{out, s.close} | |
} | |
func (s *Stream) Filter(fn func(string) (bool, error)) *Stream { | |
out := make(chan Item) | |
go func() { | |
erroring := false | |
for item := range s.in { | |
// ignore all items while erroring | |
if erroring { | |
continue | |
} | |
// if Item is an error, forward it and start erroring | |
if item.err != nil { | |
erroring = true | |
out <- item | |
continue | |
} | |
// run the predicate function | |
ok, err := fn(item.val) | |
// if the predicate returned an error, forward it and start erroring | |
if err != nil { | |
erroring = true | |
out <- Item{"", err} | |
continue | |
} | |
// if the predicate returned true, forward the item | |
if ok { | |
out <- item | |
} | |
} | |
close(out) | |
}() | |
return &Stream{out, s.close} | |
} | |
func (s *Stream) Do(fn func(string) error) error { | |
var err error | |
for item := range s.in { | |
// when an error Item is recieved, save the error and close the stream | |
if item.err != nil { | |
err = item.err | |
s.Close() | |
continue | |
} | |
// otherwise invoke the callback with the item | |
if e := fn(item.val); e != nil { | |
err = e | |
s.Close() | |
continue | |
} | |
} | |
return err | |
} | |
func main() { | |
// stream items | |
words := []string{"this", "is", "pretty", "cool", "1", "2", "3"} | |
// predicate function | |
gt3 := func(s string) (bool, error) { | |
return len(s) > 3, nil | |
} | |
// transformation function | |
wrap := func(s string) (string, error) { | |
if s == "cool" { | |
return "", errors.New("not cool") | |
} | |
return "<" + s + ">", nil | |
} | |
// consumer | |
show := func(w string) error { | |
log.Print(w, " ") | |
return nil | |
} | |
// producer | |
stream := NewWordStream(words) | |
// do it! | |
if err := stream.Map(wrap).Filter(gt3).Do(show); err != nil { | |
log.Fatalf("Error: %s", err) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment