Created
November 6, 2015 16:43
-
-
Save apg/ade83a5823cdff76cbc3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"sync" | |
) | |
type Event map[string]interface{} | |
type asyncStreamFunc func(done <-chan struct{}, in <-chan Event, out chan<- Event) | |
var multiples = map[interface{}]int{ | |
"giraffe": 1, | |
"lion": 10000, | |
} | |
// Partitions the incoming stream by "field" and passes to streamFunc | |
func by(field string, fn asyncStreamFunc, done <-chan struct{}, in <-chan Event) <-chan Event { | |
var wg sync.WaitGroup | |
defer wg.Wait() | |
retout := make(chan Event) | |
partitioned := make(map[interface{}]chan Event) | |
output := func(in chan Event) { | |
defer wg.Done() | |
defer close(in) | |
fn(done, in, retout) | |
} | |
go func() { | |
defer close(retout) | |
for e := range in { | |
// do we have a partition yet for this? | |
foo := e[field] | |
out, ok := partitioned[foo] | |
if !ok { | |
out = make(chan Event) | |
partitioned[foo] = out | |
wg.Add(1) | |
go output(out) | |
println("spawned an asq") | |
} | |
select { | |
case out <- e: | |
case <-done: | |
} | |
} | |
}() | |
return retout | |
} | |
// Async channels don't close their output channel, as it's shared. | |
func asq(done <-chan struct{}, in <-chan Event, out chan<- Event) { | |
println("Reading on in") | |
for e := range in { | |
if ni, ok := e["value"].(int); ok { | |
ne := copyEvent(e) | |
ne["value"] = ni * multiples[e["type"]] | |
select { | |
case out <- ne: | |
case <-done: | |
return | |
} | |
} | |
} | |
} | |
func gen(n int) <-chan Event { | |
out := make(chan Event, n) | |
for i := 0; i < n; i++ { | |
e := make(Event) | |
e["value"] = i | |
if i%2 == 0 { | |
e["type"] = "giraffe" | |
} else { | |
e["type"] = "lion" | |
} | |
out <- e | |
} | |
close(out) | |
return out | |
} | |
func copyEvent(e Event) Event { | |
e2 := make(Event) | |
for k, v := range e { | |
e2[k] = v | |
} | |
return e2 | |
} | |
func main() { | |
done := make(chan struct{}) | |
defer close(done) | |
gg := gen(1000) | |
out := by("type", asq, done, gg) | |
for n := range out { | |
fmt.Printf("%+v\n", n) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment