Created
July 26, 2018 12:00
-
-
Save anthonynsimon/1af4e19f10a3d1f4742e15b2b79aacd3 to your computer and use it in GitHub Desktop.
Go streams
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bufio" | |
"fmt" | |
"io" | |
"net" | |
"strings" | |
) | |
func main() { | |
source := TCPSocketSource(":8080", '\n') | |
transform := StringTransform(strings.ToUpper) | |
// partition := Partition(func(input interface{}) interface{} { return strings.Contains(input.(string), "BOB") }) | |
sink := StdoutSink() | |
Pipe(source).To(transform) | |
Pipe(transform).To(sink) | |
// Pipe(source) | |
// .To(PartitionBy("id")) | |
// .To(Transform(strings.ToUpper)) | |
// .To(MergeOrderedBy(length) | |
// .To(StdoutSink) | |
// Pipe(source).To(PartitionBy("id")).To(Transform(strings.ToUpper)).To(MergeOrderedBy(length).To(StdoutSink) | |
// ---> Transform --\ | |
// Source ---> PartitionBy ---> Transform --- MergeOrderedBy(Length) ---> Sink | |
// ---> Transform --/ | |
select {} | |
} | |
type Source interface { | |
Output() <-chan interface{} | |
} | |
type PartitionedSource interface { | |
Output() map[interface{}]chan interface{} | |
} | |
type PartitionedSink interface { | |
Input(map[interface{}]chan interface{}) | |
} | |
type Flow interface { | |
Sink | |
Source | |
} | |
type PartitionedFlow interface { | |
PartitionedSource | |
PartitionedSink | |
} | |
type JoinerFlow interface { | |
PartitionedSink | |
Source | |
} | |
type SplitterFlow interface { | |
Sink | |
PartitionedSource | |
} | |
type Sink interface { | |
Input(<-chan interface{}) | |
} | |
type Pipable interface { | |
To(Sink) | |
} | |
type partitioned struct { | |
input <-chan interface{} | |
partitioner func(interface{}) interface{} | |
} | |
func (p *partitioned) Input(input <-chan interface{}) { | |
p.input = input | |
} | |
func (p *partitioned) Output() map[interface{}]chan interface{} { | |
partitions := make(map[interface{}]chan interface{}) | |
go func() { | |
for { | |
select { | |
case message := <-p.input: | |
partitionKey := p.partitioner(message) | |
parition, ok := partitions[partitionKey] | |
if !ok { | |
partitions[partitionKey] = make(chan interface{}) | |
} | |
parition <- message | |
} | |
} | |
}() | |
return partitions | |
} | |
func Partition(partitioner func(interface{}) interface{}) SplitterFlow { | |
return &partitioned{input: nil, partitioner: partitioner} | |
} | |
type piper struct { | |
source Source | |
} | |
func Pipe(source Source) Pipable { | |
return &piper{source} | |
} | |
func (p *piper) To(sink Sink) { | |
go func() { | |
fmt.Printf("Creating pipeline from %#v to %#v\n", p.source, sink) | |
sink.Input(p.source.Output()) | |
}() | |
} | |
type tcpSocketSource struct { | |
addr string | |
delimiter byte | |
} | |
func (t *tcpSocketSource) Output() <-chan interface{} { | |
inner := make(chan interface{}) | |
go func() { | |
listener, err := net.Listen("tcp", t.addr) | |
if err != nil { | |
panic(err) | |
} | |
for { | |
conn, err := listener.Accept() | |
if err != nil { | |
fmt.Println(fmt.Errorf("Error on TCP socket connect: %s", err)) | |
} | |
go func() { | |
for { | |
message, err := bufio.NewReader(conn).ReadString(t.delimiter) | |
if err != nil { | |
if err != io.EOF { | |
fmt.Println(fmt.Errorf("Error on TCP socket read: %s", err)) | |
} | |
} else { | |
inner <- strings.Replace(message, string(t.delimiter), "", -1) | |
} | |
} | |
}() | |
} | |
}() | |
return inner | |
} | |
func TCPSocketSource(addr string, delimiter byte) Source { | |
return &tcpSocketSource{addr, delimiter} | |
} | |
type stringTransform struct { | |
fn func(string) string | |
inner chan interface{} | |
} | |
func (t *stringTransform) Input(input <-chan interface{}) { | |
for { | |
select { | |
case message := <-input: | |
parsed, ok := message.(string) | |
if !ok { | |
panic("wrong message type, expected string") | |
} | |
t.inner <- t.fn(parsed) | |
} | |
} | |
} | |
func (t *stringTransform) Output() <-chan interface{} { | |
return t.inner | |
} | |
func StringTransform(fn func(string) string) Flow { | |
return &stringTransform{fn, make(chan interface{})} | |
} | |
type stdoutSink struct{} | |
func (t *stdoutSink) Input(input <-chan interface{}) { | |
for { | |
select { | |
case message := <-input: | |
fmt.Println(message) | |
} | |
} | |
} | |
func StdoutSink() Sink { | |
return &stdoutSink{} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment