Created
September 23, 2020 18:26
-
-
Save matheusd/88f172df827629b189854be1c4570bfa to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"sync" | |
) | |
func Highway(asyncProcessor EventProcessor, numWorkers int, inputC chan Event) chan HydratedEvent { | |
outputC := make(chan HydratedEvent, 0) | |
type worker struct { | |
in chan Event | |
out chan HydratedEvent | |
} | |
nextWorker := make(chan *worker, numWorkers) | |
nextResponse := make(chan *worker, numWorkers) | |
quit := make(chan struct{}) | |
var wg sync.WaitGroup | |
for i := 0; i < numWorkers; i++ { | |
w := &worker{ | |
in: make(chan Event, 1), | |
out: make(chan HydratedEvent, 1), | |
} | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
for { | |
var e Event | |
select { | |
case <-quit: | |
return | |
case e = <-w.in: | |
} | |
he := asyncProcessor(e) | |
w.out <- he | |
} | |
}() | |
nextWorker <- w | |
} | |
go func() { | |
defer close(quit) | |
defer close(nextResponse) | |
for in := range inputC { | |
w := <-nextWorker | |
w.in <- in | |
nextResponse <- w | |
nextWorker <- w | |
} | |
}() | |
go func() { | |
defer func() { | |
wg.Wait() | |
close(outputC) | |
}() | |
for w := range nextResponse { | |
he := <-w.out | |
outputC <- he | |
} | |
}() | |
return outputC | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"sync" | |
) | |
func MyWay(asyncProcessor EventProcessor, numWorkers int, inputC chan Event) chan HydratedEvent { | |
outputC := make(chan HydratedEvent, 0) | |
workers := make([]chan Event, numWorkers) | |
responses := make([]chan HydratedEvent, numWorkers) | |
quit := make(chan struct{}) | |
var wg sync.WaitGroup | |
for i := range workers { | |
workers[i] = make(chan Event, 1) | |
responses[i] = make(chan HydratedEvent, 1) | |
wg.Add(1) | |
go func(w chan Event, r chan HydratedEvent) { | |
defer wg.Done() | |
for { | |
var e Event | |
select { | |
case <-quit: | |
return | |
case e = <-w: | |
} | |
he := asyncProcessor(e) | |
select { | |
case <-quit: | |
return | |
case r <- he: | |
} | |
} | |
}(workers[i], responses[i]) | |
} | |
go func() { | |
defer close(quit) | |
var i int | |
for in := range inputC { | |
workers[i] <- in | |
i = (i + 1) % numWorkers | |
} | |
}() | |
go func() { | |
defer func() { | |
wg.Wait() | |
close(outputC) | |
}() | |
var i int | |
for { | |
var he HydratedEvent | |
select { | |
case he = <-responses[i]: | |
case <-quit: | |
return | |
} | |
select { | |
case outputC <- he: | |
case <-quit: | |
return | |
} | |
i = (i + 1) % numWorkers | |
} | |
}() | |
return outputC | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment