Created February 21, 2023 21:22
This gist shows how complicated writing correct concurrent code in Go can be; it implements the same task-queue requirements in both JavaScript and Go.
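Both files implement the same requirements: at most maxAlive workers run at once, the spawner blocks when every slot is occupied, and results come out strictly in enqueue order even though tasks finish out of order. As a reference point for the slot limit only, here is a minimal sketch (mine, not part of the gist) of the buffered-channel-as-semaphore idiom that the Go Spawner further down re-implements by hand with reflect.Select:

package main

import (
    "fmt"
    "sync"
    "time"
)

const maxAlive = 7

func main() {
    slots := make(chan struct{}, maxAlive) // each buffered value is one occupied slot
    var wg sync.WaitGroup
    for i := 0; i < 20; i++ {
        slots <- struct{}{} // blocks exactly when all maxAlive slots are occupied
        wg.Add(1)
        go func(v int) {
            defer wg.Done()
            defer func() { <-slots }() // release the slot when the work is done
            time.Sleep(100 * time.Millisecond)
            fmt.Println("done:", v)
        }(i)
    }
    wg.Wait()
}

The gist's Spawner instead tracks every slot individually (one select case per slot), which is part of what makes the full version below considerably more involved.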
const maxAlive = 7;
let idAcc = 0;
const idGenerator = () => {
  return idAcc++;
}; // ids could be anything unique (e.g. hashes); a plain accumulator is enough here
let alives = []; // workers currently occupying a slot
let nonemptyResolver = { resolve: () => {} };
let nonemptyWaiter; // resolves once `results` becomes non-empty
const results = [];
const resetNonemptyWaiter = () => {
  if (results.length) {
    // results still has entries, no need to wait
    return;
  }
  nonemptyWaiter = new Promise((resolve) => {
    nonemptyResolver = { resolve };
  });
};
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
async function spawnWorker(fn, params, followedTask) {
  // REQUIREMENT: should spawn worker as long as there is a free slot (alives.length < maxAlive)
  // REQUIREMENT: spawner should be blockable if all slots are occupied
  const worker = { id: idGenerator() };
  while (alives.length >= maxAlive) {
    // console.log("==== full, wait... ====");
    await Promise.any(alives.map((v) => v.task)); // block until any running worker frees its slot
  }
  alives.push(worker);
  worker.task = new Promise(async (resolve, reject) => {
    try {
      const holdingResult = await fn(params);
      alives = alives.filter((p) => p.id !== worker.id); // free the slot as soon as fn is done
      const getResult = async () => {
        // hold the result back until the followed task has published its own,
        // so the result queue keeps the enqueue order
        await (
          await followedTask
        )?.result;
        results.push({ workerId: worker.id, result: holdingResult });
        nonemptyResolver.resolve();
      };
      resolve({ workerId: worker.id, result: getResult() });
    } catch (err) {
      reject(err);
    }
  });
  // REQUIREMENT: alive queue size should never grow beyond maxAlive
  console.log(
    `task id=${worker.id} queued, alives = ${alives.length}, results = ${results.length}`
  );
  return worker;
}
async function getNext() {
  await nonemptyWaiter;
  const result = results.shift();
  resetNonemptyWaiter();
  if (result.result === "stub") {
    // defensive: skip the stub used to seed the very first followed task
    return results.shift();
  }
  return result;
}
//// ---- QUEUE TASKS ---- ////
resetNonemptyWaiter();
console.log("- queue first"); // REQUIREMENT: FINFDFO should be the first in result queue
let lastWorker = await spawnWorker(
  async (v) => {
    await sleep(v[0]);
    console.log(`FINFDFO done, T = ${v[0]}, v = ${v[1]}`);
    return "FirstInNotFirstDoneFirstOut";
  },
  [6000, 1],
  Promise.resolve({ workerId: -1, result: "stub" })
);
console.log("+ queued first");
console.log("- queue 5 longs"); // REQUIREMENT: long term tasks should finish before the first
for (let i = 0; i < 5; i++) {
  lastWorker = await spawnWorker(
    async (v) => {
      await sleep(v[0]);
      console.log(`long done, T = ${v[0]}, v = ${v[1]}`);
      return `long_${v[1]}`;
    },
    [5000 + i * 100, i],
    lastWorker.task
  );
}
console.log("+ queued 5 longs");
console.log("- queue 20 shorts");
// REQUIREMENT: short term tasks done quickly and fully fill the alive queue
// REQUIREMENT: all short term tasks should be queued and done successfully while first task is still unfinished
for (let i = 0; i < 20; i++) {
  lastWorker = await spawnWorker(
    async (v) => {
      await sleep(v[0]);
      console.log(`short done, T = ${v[0]}, v = ${v[1]}`);
      return `short_${v[1]}`;
    },
    [100 + i * 2, i],
    lastWorker.task
  );
}
console.log("+ queued 20 shorts");
const concurrentSet1 = new Promise((resolve) => {
  // REQUIREMENT: concurrent sets should be able to be queued while getting results
  setTimeout(async () => {
    console.log("- queue 50 shorts (concurrent set 1):");
    let _lastWorker = lastWorker;
    for (let i = 0; i < 50; i++) {
      await sleep(Math.random() * 200);
      _lastWorker = await spawnWorker(
        async (v) => {
          await sleep(v[0]);
          console.log(`c_short done, T = ${v[0]}, v = ${v[1]}`);
          return `c_short_v${v[1]}`;
        },
        [400 + i * 10, i],
        _lastWorker.task
      );
    }
    console.log("+ queued 50 shorts (concurrent set 1):");
    resolve(_lastWorker.task);
  }, 1);
});
const concurrentSet2 = new Promise((resolve) => {
  // REQUIREMENT: concurrent set2 mixes up the finish order with set1
  setTimeout(async () => {
    console.log("- queue 50 randoms (concurrent set 2):");
    let _lastWorker = lastWorker;
    for (let i = 0; i < 50; i++) {
      await sleep(Math.random() * 200);
      _lastWorker = await spawnWorker(
        async (v) => {
          await sleep(v[0]);
          console.log(`c_random done, T = ${v[0]}, v = ${v[1]}`);
          return `c_random_v${v[1]}`;
        },
        [Math.round(Math.random() * 5000), i], // large random range, and it's concurrent with set1!
        _lastWorker.task
      );
    }
    console.log("+ queued 50 randoms (concurrent set 2):");
    resolve(_lastWorker.task);
  }, 1);
});
setTimeout(async () => {
  const eachLastTasks = Promise.all([concurrentSet1, concurrentSet2]);
  // REQUIREMENT: LINLDLO should be the last result
  // REQUIREMENT: LINLDLO should be able to get done before some c_random tasks
  console.log("- queue LINLDLO (concurrent)");
  await spawnWorker(
    async () => {
      await sleep(100);
      console.log(`LINLDLO done`);
      return "LastInNotLastDoneLastOut";
    },
    null,
    Promise.all((await eachLastTasks).map((v) => v.result))
  );
  console.log("+ queued LINLDLO");
}, 1);
try {
  while (1) {
    // REQUIREMENT: each group of tasks should be strictly in the order of their enqueue time
    // REQUIREMENT: getNext() should be able to get result while concurrently adding new tasks
    const r = await getNext();
    console.log("got:", r);
    if (r.result === "LastInNotLastDoneLastOut") {
      break;
    }
    await sleep(10);
  }
} catch (err) {
  console.log(err);
} finally {
  console.log("finished");
}
// node mess_queue.js | grep ...
// cat mess_queue.js | grep REQUIREMENT
// REQUIREMENT: should spawn worker as long as there is a free slot (alives.length < maxAlive)
// REQUIREMENT: spawner should be blockable if all slots are occupied
// REQUIREMENT: alive queue size should never grow beyond maxAlive
// REQUIREMENT: FINFDFO should be the first in result queue
// REQUIREMENT: long term tasks should finish before the first
// REQUIREMENT: short term tasks done quickly and fully fill the alive queue
// REQUIREMENT: all short term tasks should be queued and done successfully while first task is still unfinished
// REQUIREMENT: concurrent sets should be able to be queued while getting results
// REQUIREMENT: concurrent set2 mixes up the finish order with set1
// REQUIREMENT: LINLDLO should be the last result
// REQUIREMENT: LINLDLO should be able to get done before some c_random tasks
// REQUIREMENT: each group of tasks should be strictly in the order of their enqueue time
// REQUIREMENT: getNext() should be able to get result while concurrently adding new tasks
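The ordering requirements above are what the followedTask argument handles: every task runs as soon as a slot is free, but it only pushes its result after the task queued before it has pushed its own. A stripped-down sketch of just that chaining idea in Go (illustrative only, not the gist's API):

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func main() {
    results := make(chan string, 16)
    prevPublished := make(chan struct{})
    close(prevPublished) // the task "before" the first one has already published

    for i := 0; i < 5; i++ {
        prev := prevPublished
        published := make(chan struct{})
        prevPublished = published
        go func(v int, prev, published chan struct{}) {
            time.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond) // finishes in random order
            <-prev                                                       // ...but waits for the previous task to publish first
            results <- fmt.Sprintf("task_%d", v)
            close(published)
        }(i, prev, published)
    }

    for i := 0; i < 5; i++ {
        fmt.Println("got:", <-results) // always task_0 .. task_4, in enqueue order
    }
}

The Go file that follows wraps the same chain into Follow/PushTo/Execute, with extra locking because several goroutines may await the same task's result.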
package main

import (
    "context"
    "fmt"
    "log"
    "math/rand"
    "reflect"
    "sync"
    "sync/atomic"
    "time"
)

const maxAlive = 7

var idAccumulator atomic.Int32

type resultType = string
type asKey string // dedicated key type for context values

// ResultQueue delivers finished tasks to the consumer through an unbuffered channel.
type ResultQueue[T any] struct {
    ch chan *Task[T]
}

func (q *ResultQueue[T]) Push(v *Task[T]) {
    q.ch <- v
}

func (q *ResultQueue[T]) Pop() *Task[T] {
    return <-q.ch
}

func NewResultQueue[T any]() *ResultQueue[T] {
    return &ResultQueue[T]{
        ch: make(chan *Task[T]),
    }
}
// SizedBucket is an exploratory interface for a bounded container; it is not used below.
type SizedBucket[T any] interface {
    Size() int
    Cap() int
    Put(v T)
    TakeAny() *T
    TakeBy(cmp func(T) bool) *T
}

// Task wraps a function together with the bookkeeping needed to run it once,
// wait for its result, and publish the result after the task it follows.
type Task[T any] struct {
    Id          int
    running     atomic.Bool
    lock        sync.Mutex // must be held while reading or setting result
    fn          func() T
    following   *Task[T]      // the task whose result must be published before this one's
    finishedCh  chan struct{} // signalled when fn has returned (frees the spawner slot)
    result      *T
    resultCh    <-chan T
    resultQueue *ResultQueue[T]
}
func genId() int {
    return int(idAccumulator.Add(1)) - 1
}

func NewTask[T any](fn func() T) *Task[T] {
    newTask := &Task[T]{
        Id:         genId(),
        fn:         fn,
        finishedCh: make(chan struct{}, 1),
    }
    newTask.running.Store(false)
    return newTask
}

func (t *Task[T]) awaitResult() {
    t.lock.Lock()
    defer t.lock.Unlock()
    if t.result != nil {
        return
    }
    r := <-t.resultCh
    t.result = &r
}

func (t *Task[T]) AwaitResult() T {
    t.awaitResult()
    return *t.result
}

func (t *Task[T]) Follow(task *Task[T]) *Task[T] {
    t.following = task
    return t
}

func (t *Task[T]) PushTo(q *ResultQueue[T]) *Task[T] {
    t.resultQueue = q
    return t
}

func (t *Task[T]) Execute() *Task[T] {
    if t.fn == nil {
        return t
    }
    if t.running.CompareAndSwap(false, true) { // run at most once
        resultCh := make(chan T, 1)
        t.resultCh = resultCh
        go func() {
            r := t.fn()
            t.finishedCh <- struct{}{} // the slot is free as soon as fn returns
            close(t.finishedCh)
            if t.following != nil {
                t.following.awaitResult() // keep enqueue order when publishing results
            }
            resultCh <- r
            close(resultCh)
            if t.resultQueue != nil {
                t.resultQueue.Push(t)
            }
        }()
    }
    return t
}
// Spawner limits the number of concurrently running tasks to maxAlive.
type Spawner[T any] struct {
    maxAlive  int
    slots     []*Task[T]
    fd_set    []reflect.SelectCase // one recv case per slot, plus a trailing default case
    finishing <-chan struct{}
}

func NewSpawner[T any](ctx context.Context, maxAlive int) *Spawner[T] {
    sp := &Spawner[T]{
        maxAlive:  maxAlive,
        slots:     make([]*Task[T], 0, maxAlive),
        fd_set:    make([]reflect.SelectCase, 0, maxAlive+1),
        finishing: ctx.Done(),
    }
    for i := 0; i < maxAlive; i++ {
        sp.slots = append(sp.slots, nil)
        // a closed, already-signalled channel marks the slot as free
        tc := make(chan struct{}, 1)
        tc <- struct{}{}
        close(tc)
        sp.fd_set = append(sp.fd_set, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(tc)})
    }
    sp.fd_set = append(sp.fd_set, reflect.SelectCase{Dir: reflect.SelectDefault})
    return sp
}
func (sp *Spawner[T]) Spawn(fn func() T) *Task[T] {
    select {
    case <-sp.finishing:
        log.Println("* finishing, will not spawn new task")
        // construct a task with a nil func of the right type; Execute() will be a no-op
        return NewTask(*reflect.New(reflect.TypeOf(fn)).Interface().(*func() T))
    default:
    }
    // pick any free slot; the trailing default case fires only when every slot is busy
    chosen, _, _ := reflect.Select(sp.fd_set)
    if chosen == len(sp.fd_set)-1 {
        // log.Println("==== FULL, wait... ====")
        // drop the default case and block until some running task finishes
        fd_set := sp.fd_set[:len(sp.fd_set)-1]
        chosen, _, _ = reflect.Select(fd_set)
    }
    newTask := NewTask(fn)
    sp.slots[chosen] = newTask
    sp.fd_set[chosen] = reflect.SelectCase{
        Dir:  reflect.SelectRecv,
        Chan: reflect.ValueOf(newTask.finishedCh),
    }
    // log.Printf("taskId = %d created, alives = %d \n", newTask.Id, len(sp.slots))
    return newTask
}
func routineGetResult(ctx context.Context) {
    results := ctx.Value(asKey("resultQueue")).(*ResultQueue[resultType])
    EndEverything := ctx.Value(asKey("EndEverything")).(context.CancelFunc)
    for {
        select {
        case <-ctx.Done():
            return
        default:
        }
        task := results.Pop()
        r := task.AwaitResult()
        if r == "FINISH" {
            log.Println("* receiving routine got FINISH, gracefully exit with calling EndEverything()")
            EndEverything()
            return
        }
        log.Printf("got: %v\n", r)
    }
}
func routineDoSpawnWorkers(ctx context.Context) {
    queued := ctx.Value(asKey("queued")).(chan struct{})
    resultQueue := ctx.Value(asKey("resultQueue")).(*ResultQueue[resultType])
    spawner := NewSpawner[resultType](ctx, maxAlive)
    ____OKLetsRunTests____(spawner, resultQueue)
    queued <- struct{}{}
    <-ctx.Done() // ensure EndEverything() has been called
    // the following attempts should fail
    for i := 0; i < 5; i++ {
        spawner.Spawn(func() resultType {
            // should see "* finishing, will not spawn new task"
            log.Println("*! Should never see this since EndEverything() has been called")
            return "! Should never see this since EndEverything() has been called"
        }).Execute()
    }
}
func main() {
    ctx := context.Background()
    ctx, EndEverything := context.WithCancel(ctx)
    ctx = context.WithValue(ctx, asKey("EndEverything"), EndEverything)
    resultQueue := NewResultQueue[resultType]()
    ctx = context.WithValue(ctx, asKey("resultQueue"), resultQueue)
    go routineGetResult(ctx)
    signalQueued := make(chan struct{})
    ctx = context.WithValue(ctx, asKey("queued"), signalQueued)
    go routineDoSpawnWorkers(ctx)
    go func() {
        <-signalQueued
        log.Println("* all tests queued, waiting for results")
        // EndEverything()
    }()
    <-ctx.Done()
    log.Println("* main routine end waiting")
    time.Sleep(time.Millisecond) // give the last log lines a moment to flush
}
// ------------------------------------------- //

func ____OKLetsRunTests____(sp *Spawner[resultType], resultQueue *ResultQueue[resultType]) {
    log.Println("okLetsRunTests")
    log.Println("- queue FINFDFO (FirstInNotFirstDoneFirstOut)")
    last := sp.Spawn(func() resultType {
        log.Println("RUN FINFDFO")
        time.Sleep(time.Second * 6)
        log.Printf("FINFDFO done, values = %+v\n", []int{1, 2, 3})
        return "FINFDFO"
    })
    log.Println("+ queued FINFDFO")
    last.PushTo(resultQueue).Execute()

    log.Println("- queue 5 long terms")
    for i := 0; i < 5; i++ {
        _last := last
        t := time.Second*5 + time.Millisecond*time.Duration(10*i)
        v := i
        last = sp.Spawn(func() resultType {
            time.Sleep(t)
            log.Printf("long term task done, T = %v, value = %v\n", t, v)
            return fmt.Sprintf("long_v%d", v)
        })
        last.Follow(_last).PushTo(resultQueue).Execute()
    }
    log.Println("+ queued 5 long terms")

    log.Println("- queue 200 short terms")
    for i := 0; i < 200; i++ {
        _last := last
        t := time.Millisecond*100 + time.Millisecond*time.Duration(rand.Intn(120)-60)
        v := i
        last = sp.Spawn(func() resultType {
            time.Sleep(t)
            log.Printf("short term task done, T = %v, value = %v\n", t, v)
            return fmt.Sprintf("short_v%d", v)
        })
        last.Follow(_last).PushTo(resultQueue).Execute()
    }
    log.Println("+ queued 200 short terms")

    log.Println("- queue FINISH task, routines should end gracefully")
    final := sp.Spawn(func() resultType {
        return "FINISH"
    }).Follow(last).PushTo(resultQueue).Execute()
    log.Println("+ queued FINISH task")
    _ = final.AwaitResult()
}
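The shutdown path in main() combines a sentinel "FINISH" result with a shared cancellable context: the consuming routine cancels the context (EndEverything) once the sentinel arrives, and the spawning routine refuses to create new tasks after that. A reduced sketch of that handshake (my own simplification, reusing the names only for illustration):

package main

import (
    "context"
    "fmt"
)

func main() {
    ctx, endEverything := context.WithCancel(context.Background())
    results := make(chan string)

    // producer: stops enqueuing once the shared context has been cancelled
    go func() {
        for _, r := range []string{"a", "b", "FINISH", "never enqueued"} {
            select {
            case <-ctx.Done():
                return
            case results <- r:
            }
        }
    }()

    // consumer: cancels the shared context when the sentinel arrives
    for r := range results {
        if r == "FINISH" {
            endEverything()
            break
        }
        fmt.Println("got:", r)
    }
    <-ctx.Done()
    fmt.Println("finished")
}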