Created
October 15, 2022 20:47
-
-
Save valsteen/28c0e870f6f5639a902bbf5e4a5273b2 to your computer and use it in GitHub Desktop.
Queue processing, using errgroup
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
This demonstrates a task scheduling loop where tasks can send other tasks back to the queue.
Program input: a nested list whose items are either integers or nested slices:
- when an item is an integer, wait for 25 - value seconds
- when an item is a slice, wait for len(slice) seconds, then put each of its items back in the queue to be processed next
The waiting time simulates some processing, to which a concurrency limit is applied (both kinds of wait share the same
limit). Pushing back to the queue should not block.
The following tree is used to validate the solution:
[
	1,
	2,
	[3, [4, 5, [6, 7]], 8],
	[9, 10],
	11,
	12,
	13,
	[14, 15, [16, [17, 18, 19, [20, 21, 22]]]]
]
Expected timings below, which I consistently obtain with 3 different implementations:
|-----------------|-----------------------------------|
| max concurrency | approximate total time in seconds |
|-----------------|-----------------------------------|
| 10              | 41 *                              |
| 5               | 65                                |
| 1               | 319                               |
| unlimited       | 27                                |
|-----------------|-----------------------------------|
* It's not possible to consistently get the same result as the other implementations: errgroup uses a channel as a
semaphore, and one random waiting consumer will start once another is finished,
while the other implementations consume tasks in order.
Rust version: https://gist.github.com/valsteen/103aac191afa881d88829bb9e3699784
Python version: https://gist.github.com/valsteen/6989796b49be4dc102fed2fb08c05cf3
*/
package main | |
import ( | |
"fmt" | |
"runtime" | |
"sync" | |
"time" | |
"golang.org/x/sync/errgroup" | |
) | |
// taskTree is the nested input fed to the scheduler. An int is a leaf task;
// a []any is a list whose items are put back in the queue once the list itself
// has been processed.
var taskTree = []any{
	1,
	2,
	[]any{3, []any{4, 5, []any{6, 7}}, 8},
	[]any{9, 10},
	11,
	12,
	13,
	[]any{14, 15, []any{16, []any{17, 18, 19, []any{20, 21, 22}}}},
}
// MaxConcurrency is the limit handed to the errgroup in main, i.e. how many
// tasks may be processed at the same time. -1 means unlimited.
const MaxConcurrency = 10
// process simulates the work attached to a single queue item.
//
// For an int it sleeps for 25-value seconds and returns nil (values of 25 or
// more do not sleep, since time.Sleep returns immediately for a non-positive
// duration). For a []any it sleeps for len(slice) seconds and returns the slice
// unchanged so the caller can reschedule its items. Any other type panics with
// "invalid input".
func process(value any) any {
	switch actual := value.(type) {
	case int:
		fmt.Printf(">> processing final value %d ...\n", actual)
		time.Sleep(time.Duration(25-actual) * time.Second)
		fmt.Printf("<< finished processing final value %d ...\n", actual)
		return nil
	case []any:
		fmt.Printf("@@>> got list of %d, rescheduling them\n", len(actual))
		time.Sleep(time.Duration(len(actual)) * time.Second)
		fmt.Printf("@@<< finished processing list of %d\n", len(actual))
		return actual
	default:
		panic("invalid input")
	}
}
func work(group *errgroup.Group, value any, tasks *sync.WaitGroup) func() error { | |
return func() error { | |
result := process(value) | |
if values, ok := result.([]any); ok { | |
tasks.Add(len(values)) | |
schedule(group, values, tasks) | |
} | |
tasks.Done() | |
return nil | |
} | |
} | |
func schedule(group *errgroup.Group, values []any, tasks *sync.WaitGroup) { | |
for _, value := range values { | |
// as per problem statement, the concurrency limit does not apply to putting new tasks in the queue | |
go func(value any) { | |
group.Go(work(group, value, tasks)) | |
}(value) | |
} | |
} | |
func main() { | |
runtime.GOMAXPROCS(1) | |
input := make(chan any, 1000) // no unbounded channel in go. | |
start := time.Now() | |
group := errgroup.Group{} | |
group.SetLimit(MaxConcurrency) | |
tasks := sync.WaitGroup{} | |
tasks.Add(len(taskTree)) | |
schedule(&group, taskTree, &tasks) | |
tasks.Wait() | |
close(input) | |
elapsed := time.Now().Sub(start) | |
fmt.Printf("%0.2f", elapsed.Seconds()) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment