Skip to content

Instantly share code, notes, and snippets.

@Loupax
Forked from valsteen/queue_processing.go
Last active September 15, 2022 10:46
Show Gist options
  • Save Loupax/e69931545cacbdb2c4de918c6b9bf1ed to your computer and use it in GitHub Desktop.
Save Loupax/e69931545cacbdb2c4de918c6b9bf1ed to your computer and use it in GitHub Desktop.
Queue processing
/*
This demonstrates a task scheduling loop where tasks can send back other tasks to the queue.
Program input: a nested list of items that can either be integers, or nested slices:
- when it's an integer, wait for 25 - value seconds
- when it's a slice, wait for len(slice) seconds, then put back each item in the queue to be processed next
waiting time simulates some processing, on which a concurrency limit is applied ( both waiting times share the same
limit ). pushing back to the queue should not be blocking.
The following tree is used to validate the solution:
[
1,
2,
[3, [4, 5, [6, 7]], 8],
[9, 10],
11,
12,
13,
[14, 15, [16, [17, 18, 19, [20, 21, 22]]]]
]
Expected timings below, that I consistently obtain on 3 different implementations
|-----------------|-----------------------------------|
| max concurrency | approximate total time in seconds |
|-----------------|-----------------------------------|
| 10 | 38 |
| 5 | 65 |
| 1 | 319 |
| unlimited | 27 |
|-----------------|-----------------------------------|
Rust version: https://gist.github.com/valsteen/103aac191afa881d88829bb9e3699784
Python version: https://gist.github.com/valsteen/6989796b49be4dc102fed2fb08c05cf3
*/
package main
import (
"fmt"
"sync"
"time"
)
var taskTree = []any{
1,
2,
[]any{3, []any{4, 5, []any{6, 7}}, 8},
[]any{9, 10},
11,
12,
13,
[]any{14, 15, []any{16, []any{17, 18, 19, []any{20, 21, 22}}}},
}
const MaxConcurrency = 10
func orchestrate(input []any, itemQueue chan int, groupQueue chan []any, wg *sync.WaitGroup) {
for _, v := range input {
if num, ok := v.(int); ok {
itemQueue <- num
}
if group, ok := v.([]any); ok {
wg.Add(len(group))
groupQueue <- group
}
// Let's ignore invalid inputs for now
}
}
func main() {
itemQueue := make(chan int, MaxConcurrency)
groupQueue := make(chan []any, MaxConcurrency)
concurrency := make(chan bool, MaxConcurrency)
var wg sync.WaitGroup
orchestrate(taskTree, itemQueue, groupQueue, &wg)
go func() {
for {
concurrency <- true
select {
case num := <-itemQueue:
go func(i int) {
defer func() { <-concurrency }()
fmt.Printf(">> processing final value %d ...\n", i)
time.Sleep(time.Duration(25-i) * time.Second)
fmt.Printf("<< finished processing final value %d ...\n", i)
wg.Done()
}(num)
case grp := <-groupQueue:
go func(g []any) {
defer func() { <-concurrency }()
size := len(g)
fmt.Printf("@@>> got list of {%d}, rescheduling them\n", size)
time.Sleep(time.Duration(size) * time.Second)
fmt.Printf("@@<< finished processing list of {%d}\n", size)
go orchestrate(g, itemQueue, groupQueue, &wg)
}(grp)
}
}
}()
// The wait group will wait until all ints are processed
// There is no need to wait for the channels to close
wg.Wait()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment