Last active
June 7, 2017 07:47
-
-
Save mh-cbon/2062efef48e246592bdb0665f0ab8547 to your computer and use it in GitHub Desktop.
golang, simple stream
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"errors" | |
"fmt" | |
"io" | |
"strings" | |
"sync" | |
"time" | |
) | |
func main() { | |
ds := []string{"a", "b", "c", "d"} | |
pseudoReader := Reader{ds: ds} | |
pseudoWriter := Writer{} | |
writeAnyway := func(s string) error { | |
_, err := pseudoWriter.Write(s) | |
return err | |
} | |
sink := Stream{}. | |
Async(PseudoQuery(2)). | |
Limit(2). | |
Map(strings.ToUpper). | |
Write(writeAnyway) | |
readWriter := Readable(pseudoReader.Read, sink) | |
fmt.Println("rwErr", readWriter()) | |
fmt.Println() | |
} | |
// stringHandler is one unit of work queued for the PseudoQuery worker: the
// value to process and the continuation that forwards it downstream.
type stringHandler struct {
	in   string
	push func(string) error
}

// PseudoQuery simulates a slow asynchronous stage. It starts one worker
// goroutine and returns two functions meant to be passed to Stream.Async:
//
//   - the data handler queues an item for the worker and returns immediately
//     when there is room in the queue. It reports, without waiting, the result
//     of an EARLIER item if one is available, so errors surface one or more
//     calls late. Once the stage has been stopped it returns io.EOF without
//     queueing anything.
//   - the flush handler waits until every queued item has been processed and
//     then stops the worker. It always returns nil and may be called more
//     than once.
//
// n bounds how many items may be outstanding: the queue holds n-1 items in
// addition to the one the worker is processing. Values of n below 1 are
// treated as 1.
//
// The handlers are intended for a single producer goroutine; concurrent calls
// to the data handler and the flush handler are not supported.
func PseudoQuery(n int) (func(in string, push func(string) error) error, func() error) {
	// A negative channel capacity panics, so clamp the queue size.
	queueCap := n - 1
	if queueCap < 0 {
		queueCap = 0
	}
	// The worker must always be able to park one result; with an unbuffered
	// result channel it would block after the first item and never pick up
	// the second one.
	resultCap := queueCap
	if resultCap < 1 {
		resultCap = 1
	}

	var (
		wg       sync.WaitGroup
		stopOnce sync.Once
	)
	eof := make(chan struct{})
	data := make(chan stringHandler, queueCap)
	results := make(chan error, resultCap)

	// stop signals the worker to exit. Closing (instead of sending) never
	// blocks and is safe to repeat from both handlers.
	stop := func() { stopOnce.Do(func() { close(eof) }) }

	// abandon releases the WaitGroup count of items that were queued but will
	// never be processed, so a later flush cannot wait on them forever.
	abandon := func() {
		for {
			select {
			case <-data:
				wg.Done()
			default:
				return
			}
		}
	}

	go func() {
		for {
			select {
			case <-eof:
				abandon()
				return
			case d := <-data:
				time.Sleep(1 * time.Second) // simulated latency
				e := d.push(d.in)
				wg.Done()
				fmt.Println("done " + d.in)
				// Do not get stuck on a full result buffer once nobody is
				// going to read it any more.
				select {
				case results <- e:
				case <-eof:
					abandon()
					return
				}
			}
		}
	}()

	dataHandler := func(in string, push func(string) error) error {
		// After a stop the worker is gone; queueing would block or strand the item.
		select {
		case <-eof:
			return io.EOF
		default:
		}

		wg.Add(1)
		data <- stringHandler{in, push}
		fmt.Println("doing " + in)

		// Non-blocking poll for the result of a previously queued item.
		select {
		case e := <-results:
			if e == io.EOF {
				// Downstream signalled the end of the pipeline.
				stop()
			}
			return e
		default:
		}
		return nil
	}

	flushHandler := func() error {
		wg.Wait()
		stop()
		return nil
	}

	return dataHandler, flushHandler
}
func Readable(reader func() (string, error), sink Stream) (readWriter func() error) { | |
var index = -1 | |
readWriter = func() (readWriteErr error) { | |
var chunk string | |
for { | |
chunk, readWriteErr = reader() | |
if len(chunk) > 0 { | |
readWriteErr = sink.TryPush(chunk) // forced stop on write err ? | |
} | |
if readWriteErr != nil { | |
break | |
} | |
index++ | |
} | |
if x, ok := readWriteErr.(AtIndexError); ok { | |
x.SetIndex(index) | |
readWriteErr = x | |
} | |
sink.Flush() | |
return | |
} | |
return | |
} | |
// Reader hands out the strings of ds one at a time.
type Reader struct {
	ds []string // items to serve, in order
	i  int      // position of the next item to serve
}

// Read returns the next item and a nil error, logging it to standard output.
// Once every item has been served it returns "" and io.EOF on each call.
func (r *Reader) Read() (string, error) {
	if r.i < len(r.ds) {
		next := r.ds[r.i]
		r.i++
		fmt.Printf("Read: %v\n", next)
		return next, nil
	}
	return "", io.EOF
}
// Writer is a stateless sink that prints what it is given.
type Writer struct{}

// Write logs s to standard output and reports its length in bytes. The
// returned error is always nil.
func (w Writer) Write(s string) (int, error) {
	written := len(s)
	fmt.Printf("Write: %v\n", s)
	return written, nil
}
// EarlyEnd is the sentinel error wrapped by a stage that stops a pipeline
// before its input is exhausted.
var EarlyEnd = errors.New("early end")

// AtIndexError decorates an error with the index at which it occurred.
type AtIndexError struct {
	error
	atindex int
}

// SetIndex records the index the error is associated with. It has a pointer
// receiver, so it must be called on an addressable value.
func (e *AtIndexError) SetIndex(i int) { e.atindex = i }

// GetIndex returns the recorded index (0 when none was set).
func (e AtIndexError) GetIndex() int { return e.atindex }

// Error returns the wrapped error's message followed by the index. Formatting
// the wrapped error with %v instead of calling its Error method keeps a zero
// AtIndexError from panicking on the nil embedded error.
func (e AtIndexError) Error() string { return fmt.Sprintf("%v %v", e.error, e.atindex) }

// Unwrap exposes the wrapped error so errors.Is and errors.As can match it,
// for example errors.Is(err, EarlyEnd).
func (e AtIndexError) Unwrap() error { return e.error }
// Stream is a pipeline of string-processing stages. Its builder methods take
// and return Stream by value.
type Stream struct {
	// ds is the built-in input consumed by All.
	ds []string
	// op holds the stages in order. A stage receives the incoming value and a
	// continuation that forwards a value to the next stage.
	op []func(string, func(string) error) error
	// flush holds the flush callbacks run by Flush, in registration order.
	flush []func() error
}
func (s Stream) Async(fn func(in string, push func(string) error) error, flush func() error) Stream { | |
s.op = append(s.op, fn) | |
s.flush = append(s.flush, flush) | |
return s | |
} | |
func (s Stream) Map(fn func(string) string) Stream { | |
s.op = append(s.op, func(in string, push func(string) error) error { | |
out := fn(in) | |
return push(out) | |
}) | |
s.flush = append(s.flush, func() error { return nil }) | |
return s | |
} | |
func (s Stream) Each(fn func(string)) Stream { | |
s.op = append(s.op, func(in string, push func(string) error) error { | |
fn(in) | |
return push(in) | |
}) | |
s.flush = append(s.flush, func() error { return nil }) | |
return s | |
} | |
func (s Stream) Filter(f ...func(string) bool) Stream { | |
s.op = append(s.op, func(in string, push func(string) error) error { | |
ok := false | |
for _, fn := range f { | |
ok = fn(in) | |
if !ok { | |
break | |
} | |
} | |
if ok { | |
return push(in) | |
} | |
return nil | |
}) | |
s.flush = append(s.flush, func() error { return nil }) | |
return s | |
} | |
func (s Stream) Limit(n int) Stream { | |
done := 0 | |
s.op = append(s.op, func(in string, push func(string) error) error { | |
if done < n { | |
done++ | |
return push(in) | |
} | |
return AtIndexError{error: EarlyEnd} | |
}) | |
s.flush = append(s.flush, func() error { return nil }) | |
return s | |
} | |
func (s Stream) TryPush(in string) error { | |
var callop func(e int, in string) error | |
callop = func(e int, in string) (err error) { | |
if len(s.op) == e { | |
return io.EOF // ? | |
} | |
return s.op[e](in, func(out string) error { | |
return callop(e+1, out) | |
}) | |
} | |
return callop(0, in) | |
} | |
func (s Stream) Push(in string) { | |
s.TryPush(in) | |
} | |
func (s Stream) All() (err error) { | |
for _, v := range s.ds { | |
if err := s.TryPush(v); err != nil { | |
break | |
} | |
} | |
if err == nil { | |
err = s.Flush() | |
} | |
return err | |
} | |
func (s Stream) Flush() (err error) { | |
for _, f := range s.flush { | |
// should do multi error here. | |
err = f() | |
} | |
return err | |
} | |
func (s Stream) Write(sink func(string) error) Stream { | |
s.op = append(s.op, func(in string, push func(string) error) (writeErr error) { | |
return sink(in) | |
}) | |
return s | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment