-
-
Save ejemba/e4aa92520e302c8c503d8701547585d4 to your computer and use it in GitHub Desktop.
chaining channels in go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"unicode/utf8" | |
) | |
type Item string | |
type Stream chan Item | |
type Acc string | |
func NewWordStream(words []Item) Stream { | |
stream := make(Stream) | |
go func() { | |
for _, w := range words { | |
stream <- w | |
} | |
close(stream) | |
}() | |
return stream | |
} | |
func (in Stream) Map(fn func(Item) Item) Stream { | |
out := make(Stream) | |
go func() { | |
for x := range in { | |
out <- fn(x) | |
} | |
close(out) | |
}() | |
return out | |
} | |
func (in Stream) Filter(fn func(Item) bool) Stream { | |
out := make(Stream) | |
go func() { | |
for x := range in { | |
if fn(x) { | |
out <- x | |
} | |
} | |
close(out) | |
}() | |
return out | |
} | |
func (in Stream) Reduce(fn func(Acc, Item) (Acc, Acc)) chan Acc { | |
out := make(chan Acc) | |
go func() { | |
var val, acc Acc | |
for x := range in { | |
acc, val = fn(acc, x) | |
if val != "" { | |
out <- val | |
} | |
} | |
close(out) | |
}() | |
return out | |
} | |
func (in Stream) Sniff(fn func(Item)) Stream { | |
out := make(Stream) | |
go func() { | |
for x := range in { | |
fn(x) | |
out <- x | |
} | |
close(out) | |
}() | |
return out | |
} | |
func (in Stream) Split() (Stream, Stream) { | |
out1, out2 := make(Stream), make(Stream) | |
go func() { | |
for x := range in { | |
out1 <- x | |
out2 <- x | |
} | |
close(out1) | |
close(out2) | |
}() | |
return out1, out2 | |
} | |
func (in Stream) ReduceAll(fn func(Acc, Item) Acc) Acc { | |
var acc Acc | |
for x := range in { | |
acc = fn(acc, x) | |
} | |
return acc | |
} | |
func (in Stream) Collect() []Item { | |
a := make([]Item, 0, 5) | |
for x := range in { | |
a = append(a, x) | |
} | |
return a | |
} | |
func (in Stream) Do(fn func(Item)) { | |
for x := range in { | |
fn(x) | |
} | |
} | |
func main() { | |
reverse := func(word Item) Item { | |
s := string(word) | |
o := make([]rune, utf8.RuneCountInString(s)) | |
i := len(o) | |
for _, c := range s { | |
i-- | |
o[i] = c | |
} | |
return Item(o) | |
} | |
longerThan := func(n int) func(Item) bool { | |
return func(w Item) bool { | |
return len(w) > n | |
} | |
} | |
not := func(s string) func(Item) bool { | |
item := Item(s) | |
return func(w Item) bool { | |
return item != w | |
} | |
} | |
words := []Item{"this", "is", "pretty", "cool", "1", "2", "3"} | |
NewWordStream(words).Map(reverse).Filter(longerThan(2)).Map(reverse).Filter(not("this")).Do(func(w Item) { | |
println(w) | |
}) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"errors" | |
"log" | |
) | |
type Stream struct { | |
in chan string | |
close chan bool | |
err chan error | |
} | |
func (s *Stream) Close() { s.close <- true } | |
func (s *Stream) Fail(err error) { s.Close(); s.Drain(); s.err <- err } | |
func (s *Stream) Drain() { | |
for _ = range s.in { | |
} | |
} | |
func NewWordStream(words []string) *Stream { | |
s := &Stream{make(chan string), make(chan bool), make(chan error)} | |
go func() { | |
loop: | |
for _, w := range words { | |
select { | |
case <-s.close: | |
break loop | |
case s.in <- w: | |
continue | |
} | |
} | |
close(s.in) | |
}() | |
return s | |
} | |
func (s *Stream) Map(fn func(string) (string, error)) *Stream { | |
out := make(chan string) | |
go func() { | |
for x := range s.in { | |
x, err := fn(x) | |
if err != nil { | |
s.Fail(err) | |
} else { | |
out <- x | |
} | |
} | |
close(out) | |
}() | |
return &Stream{out, s.close, s.err} | |
} | |
func (s *Stream) Filter(fn func(string) (bool, error)) *Stream { | |
out := make(chan string) | |
go func() { | |
for x := range s.in { | |
ok, err := fn(x) | |
if err != nil { | |
s.Fail(err) | |
} else if ok { | |
out <- x | |
} | |
} | |
close(out) | |
}() | |
return &Stream{out, s.close, s.err} | |
} | |
func (s *Stream) Do(fn func(string)) error { | |
var err error | |
loop: | |
for { | |
select { | |
case x, ok := <-s.in: | |
if !ok { | |
break loop | |
} | |
fn(x) | |
case err = <-s.err: | |
} | |
} | |
return err | |
} | |
func main() { | |
words := []string{"this", "is", "pretty", "cool", "1", "2", "3"} | |
longerThan3 := func(s string) (bool, error) { return len(s) > 3, nil } | |
wrap := func(s string) (string, error) { | |
if s == "cool" { | |
return "", errors.New("not cool") | |
} | |
return "<" + s + ">", nil | |
} | |
err := NewWordStream(words).Map(wrap).Filter(longerThan3).Do(func(w string) { log.Print(w, " ") }) | |
if err != nil { | |
log.Fatalf("Error: %s", err) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment