Last active
March 20, 2023 02:47
-
-
Save valsteen/38e82d7ee5fc5d03822464948f0e46b3 to your computer and use it in GitHub Desktop.
Self-feeding processing queue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
This demonstrates a task scheduling loop where tasks can send back other tasks to the queue. | |
Program input: a nested list whose items are either integers or nested slices:
- when it's an integer, wait for 25 - value seconds | |
- when it's a slice, wait for len(slice) seconds, then put back each item in the queue to be processed next | |
The waiting time simulates some processing, to which a concurrency limit is applied (both kinds of wait share the same
limit). Pushing items back onto the queue must not block.
The following tree is used to validate the solution: | |
[ | |
1, | |
2, | |
[3, [4, 5, [6, 7]], 8], | |
[9, 10], | |
11, | |
12, | |
13, | |
[14, 15, [16, [17, 18, 19, [20, 21, 22]]]] | |
] | |
Expected timings, which I consistently obtain across 3 different implementations:
|-----------------|-----------------------------------| | |
| max concurrency | approximate total time in seconds | | |
|-----------------|-----------------------------------| | |
| 10 | 38 * | | |
| 5 | 65 | | |
| 1 | 319 | | |
| unlimited | 27 | | |
|-----------------|-----------------------------------| | |
* Not always stable: it can also give 42 (yes, 42). This is probably because new items are added to the queue
from two goroutines at the same moment, so the items may interleave in different ways.
Rust version: https://gist.github.com/valsteen/103aac191afa881d88829bb9e3699784 | |
Python version: https://gist.github.com/valsteen/6989796b49be4dc102fed2fb08c05cf3 | |
*/ | |
package main | |
import ( | |
"fmt" | |
"sync" | |
"time" | |
) | |
// taskTree is the validation input. Plain ints are leaf tasks; each []any is a
// list whose elements are put back on the queue once the list is processed.
var taskTree = []any{
	1, 2,
	[]any{
		3,
		[]any{4, 5, []any{6, 7}},
		8,
	},
	[]any{9, 10},
	11, 12, 13,
	[]any{
		14, 15,
		[]any{16, []any{17, 18, 19, []any{20, 21, 22}}},
	},
}
const (
	// MaxConcurrency is the number of values main allows to be processed at
	// the same time; 0 means unlimited.
	MaxConcurrency = 10
)
// process simulates the work for one queued value.
//
// An int n sleeps for 25-n seconds (time.Sleep returns immediately for a
// non-positive duration) and returns nil. A []any sleeps one second per
// element and returns the same slice so the caller can re-queue its elements.
// Any other type panics with "invalid input".
func process(value any) any {
	if n, isInt := value.(int); isInt {
		fmt.Printf(">> processing final value %d ...\n", n)
		time.Sleep(time.Second * time.Duration(25-n))
		fmt.Printf("<< finished processing final value %d ...\n", n)
		return nil
	}

	items, isList := value.([]any)
	if !isList {
		panic("invalid input")
	}
	count := len(items)
	fmt.Printf("@@>> got list of %d, rescheduling them\n", count)
	time.Sleep(time.Second * time.Duration(count))
	fmt.Printf("@@<< finished processing list of %d\n", count)
	return items
}
func work(input chan any, value any, tasks *sync.WaitGroup) { | |
result := process(value) | |
if values, ok := result.([]any); ok { | |
tasks.Add(len(values)) | |
for _, newValue := range values { | |
input <- newValue | |
} | |
} | |
tasks.Done() | |
} | |
func worker(input chan any, tasks *sync.WaitGroup) { | |
for value := range input { | |
work(input, value, tasks) | |
} | |
} | |
func unlimitedWorker(input chan any, tasks *sync.WaitGroup) { | |
for value := range input { | |
go work(input, value, tasks) | |
} | |
} | |
func main() { | |
input := make(chan any, 100) // no unbounded channel in go. | |
start := time.Now() | |
tasks := sync.WaitGroup{} | |
if MaxConcurrency > 0 { | |
for i := 0; i < MaxConcurrency; i++ { | |
go worker(input, &tasks) | |
} | |
} else { | |
go unlimitedWorker(input, &tasks) | |
} | |
tasks.Add(len(taskTree)) | |
for _, value := range taskTree { | |
input <- value | |
} | |
tasks.Wait() | |
close(input) | |
elapsed := time.Now().Sub(start) | |
fmt.Printf("%0.2f", elapsed.Seconds()) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment