-
-
Save Loupax/e69931545cacbdb2c4de918c6b9bf1ed to your computer and use it in GitHub Desktop.
Queue processing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
This demonstrates a task scheduling loop where tasks can send back other tasks to the queue. | |
Program input: a nested list of items that can either be integers, or nested slices: | |
- when it's an integer, wait for 25 - value seconds | |
- when it's a slice, wait for len(slice) seconds, then put back each item in the queue to be processed next | |
waiting time simulates some processing, on which a concurrency limit is applied ( both waiting times share the same | |
limit ). pushing back to the queue should not be blocking. | |
The following tree is used to validate the solution: | |
[ | |
1, | |
2, | |
[3, [4, 5, [6, 7]], 8], | |
[9, 10], | |
11, | |
12, | |
13, | |
[14, 15, [16, [17, 18, 19, [20, 21, 22]]]] | |
] | |
Expected timings below, that I consistently obtain on 3 different implementations | |
|-----------------|-----------------------------------| | |
| max concurrency | approximate total time in seconds | | |
|-----------------|-----------------------------------| | |
| 10 | 38 | | |
| 5 | 65 | | |
| 1 | 319 | | |
| unlimited | 27 | | |
|-----------------|-----------------------------------| | |
Rust version: https://gist.github.com/valsteen/103aac191afa881d88829bb9e3699784 | |
Python version: https://gist.github.com/valsteen/6989796b49be4dc102fed2fb08c05cf3 | |
*/ | |
package main | |
import ( | |
"fmt" | |
"sync" | |
"time" | |
) | |
var taskTree = []any{ | |
1, | |
2, | |
[]any{3, []any{4, 5, []any{6, 7}}, 8}, | |
[]any{9, 10}, | |
11, | |
12, | |
13, | |
[]any{14, 15, []any{16, []any{17, 18, 19, []any{20, 21, 22}}}}, | |
} | |
const MaxConcurrency = 10 | |
func orchestrate(input []any, itemQueue chan int, groupQueue chan []any, wg *sync.WaitGroup) { | |
for _, v := range input { | |
if num, ok := v.(int); ok { | |
itemQueue <- num | |
} | |
if group, ok := v.([]any); ok { | |
wg.Add(len(group)) | |
groupQueue <- group | |
} | |
// Let's ignore invalid inputs for now | |
} | |
} | |
func main() { | |
itemQueue := make(chan int, MaxConcurrency) | |
groupQueue := make(chan []any, MaxConcurrency) | |
concurrency := make(chan bool, MaxConcurrency) | |
var wg sync.WaitGroup | |
orchestrate(taskTree, itemQueue, groupQueue, &wg) | |
go func() { | |
for { | |
concurrency <- true | |
select { | |
case num := <-itemQueue: | |
go func(i int) { | |
defer func() { <-concurrency }() | |
fmt.Printf(">> processing final value %d ...\n", i) | |
time.Sleep(time.Duration(25-i) * time.Second) | |
fmt.Printf("<< finished processing final value %d ...\n", i) | |
wg.Done() | |
}(num) | |
case grp := <-groupQueue: | |
go func(g []any) { | |
defer func() { <-concurrency }() | |
size := len(g) | |
fmt.Printf("@@>> got list of {%d}, rescheduling them\n", size) | |
time.Sleep(time.Duration(size) * time.Second) | |
fmt.Printf("@@<< finished processing list of {%d}\n", size) | |
go orchestrate(g, itemQueue, groupQueue, &wg) | |
}(grp) | |
} | |
} | |
}() | |
// The wait group will wait until all ints are processed | |
// There is no need to wait for the channels to close | |
wg.Wait() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment