-
-
Save skovtunenko/da852a8794348c298613e48ac48b4d04 to your computer and use it in GitHub Desktop.
Parallel processing with ordered output in Go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
Parallel processing with ordered output in Go
(you can use this pattern by importing https://github.com/MarianoGappa/parseq)

This example implementation is useful when the following 3 conditions are true:
 1) the rate of input is higher than the rate of output on the system (i.e. it queues up)
 2) the processing of input can be parallelised, and overall throughput increases by doing so
 3) the order of output of the system needs to respect order of input

- if 1 is false, KISS!
- if 2 is false, KISS! Often, processing can be parallelised, but the processing time is
  shorter than the overhead of parallelising and maintaining order
- if 3 is false, just use a buffered channel for the input that routes to several
  goroutines, and have each one output to an output channel.

The use case that motivated this implementation was:
 1) a Kafka consumer outputs JSON to a channel that must be unmarshalled to a
    struct
 2) order of output matters
 3) speed is paramount; there's plenty of CPU room for parallelism

Without any explicit goroutines, every unmarshal operation blocks the next
channel read.

Run the example with the 3 default parameters; see how, even though it takes 5
seconds to process a request, after the initial 5 seconds output happens
every second, in order.
*/
package main | |
import ( | |
"fmt" | |
"sync" | |
"time" | |
) | |
// Package-level bookkeeping shared between readRequests and orderResults.
var (
	// unresolved lists, in arrival order, the order numbers of requests that
	// have been handed to the workers but whose results have not yet been
	// emitted on the ordered output channel.
	unresolved []int64

	// l is locked by readRequests and orderResults while they modify
	// unresolved.
	l sync.Mutex
)
func main() { | |
parallelism := 5 | |
processDuration := 5 * time.Second | |
requestEvery := 1 * time.Second | |
rqs := make(chan request, parallelism) | |
work := make(chan request, parallelism) | |
rts := make(chan int64, parallelism) | |
orts := make(chan int64) | |
go makeRequests(rqs, requestEvery) | |
go readRequests(rqs, work) | |
go orderResults(rts, orts) | |
for i := 0; i < parallelism; i++ { | |
go processRequests(work, rts, processDuration) | |
} | |
for r := range orts { | |
fmt.Print(r, "-") | |
} | |
} | |
func makeRequests(rqs chan request, requestEvery time.Duration) { | |
order := int64(0) | |
for { | |
order++ | |
rqs <- request{order} | |
time.Sleep(requestEvery) | |
} | |
} | |
func readRequests(rqs chan request, work chan request) { | |
for r := range rqs { | |
l.Lock() | |
unresolved = append(unresolved, r.order) | |
l.Unlock() | |
work <- r | |
} | |
} | |
func processRequests(work chan request, rts chan int64, processDuration time.Duration) { | |
for r := range work { | |
rts <- r.process(processDuration) | |
} | |
} | |
func orderResults(rts chan int64, orts chan int64) { | |
rtBuf := make(map[int64]int64) | |
for rt := range rts { | |
rtBuf[rt] = rt | |
loop: | |
if len(unresolved) > 0 { | |
u := unresolved[0] | |
if rtBuf[u] != 0 { | |
l.Lock() | |
unresolved = unresolved[1:] | |
l.Unlock() | |
orts <- rtBuf[u] | |
delete(rtBuf, u) | |
goto loop | |
} | |
} | |
} | |
} | |
// request is a unit of work flowing through the pipeline.
type request struct {
	// order is the request's sequence number; it is also the value that
	// process returns as the request's result.
	order int64
}

// process simulates the work for a request: it blocks for processDuration
// and then returns the request's order number.
func (rq request) process(processDuration time.Duration) int64 {
	seq := rq.order
	time.Sleep(processDuration)
	return seq
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment