Created
March 21, 2021 14:30
-
-
Save chenlujjj/f2cc6b75e5276e41bf82b5d561fcf28f to your computer and use it in GitHub Desktop.
example code from "Concurrency is not Parallelism" talk by Rob Pike
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"container/heap" | |
"fmt" | |
"math/rand" | |
"time" | |
) | |
const nRequester = 100 | |
const nWorker = 10 | |
// Simulation of some work: just sleep for a while and report how long. | |
func op() int { | |
n := rand.Int63n(int64(time.Second)) | |
time.Sleep(time.Duration(nWorker * n)) | |
return int(n) | |
} | |
type Request struct { | |
fn func() int // The operation to perform. | |
c chan int // The channel to return the result. | |
} | |
func requester(work chan Request) { | |
c := make(chan int) | |
for { | |
time.Sleep(time.Duration(rand.Int63n(int64(nWorker * 2 * time.Second)))) | |
work <- Request{op, c} | |
// c 的作用是让Request模拟同步请求(即响应返回后再发出下一次请求) | |
<-c | |
} | |
} | |
type Worker struct { | |
i int // index in the heap | |
requests chan Request // work to do (buffered channel) | |
pending int // count of pending tasks | |
} | |
func (w *Worker) work(done chan *Worker) { | |
for { | |
req := <-w.requests | |
req.c <- req.fn() | |
done <- w | |
} | |
} | |
// Pool implements heap.Interface. | |
type Pool []*Worker | |
func (p Pool) Len() int { return len(p) } | |
// 优先分配给pending数量少的Worker | |
func (p Pool) Less(i, j int) bool { | |
return p[i].pending < p[j].pending | |
} | |
func (p *Pool) Swap(i, j int) { | |
a := *p | |
a[i], a[j] = a[j], a[i] | |
a[i].i = i | |
a[j].i = j | |
} | |
func (p *Pool) Push(x interface{}) { | |
a := *p | |
n := len(a) | |
a = a[0 : n+1] | |
w := x.(*Worker) | |
a[n] = w | |
w.i = n | |
*p = a | |
} | |
func (p *Pool) Pop() interface{} { | |
a := *p | |
*p = a[0 : len(a)-1] | |
w := a[len(a)-1] | |
w.i = -1 // for safety | |
return w | |
} | |
type Balancer struct { | |
pool Pool | |
// done 用来控制最多有多少个worker同时工作,即done的buffer size | |
done chan *Worker | |
// i 表示在round-robin模式下,轮到的worker的index | |
i int | |
// balancer works in round-robin mode | |
// 分析和实验都可得出,在round-robin工作模式下,pool中worker.pending的标准差要大于按照最小负载分配的工作模式 | |
rr bool | |
} | |
func NewBalancer() *Balancer { | |
done := make(chan *Worker, nWorker) | |
b := &Balancer{make(Pool, 0, nWorker), done, 0, false} | |
for i := 0; i < nWorker; i++ { | |
w := &Worker{requests: make(chan Request, nRequester)} | |
heap.Push(&b.pool, w) | |
go w.work(b.done) | |
} | |
return b | |
} | |
func (b *Balancer) balance(work chan Request) { | |
for { | |
select { | |
case req := <-work: | |
b.dispatch(req) | |
case w := <-b.done: | |
b.completed(w) | |
} | |
b.print() | |
} | |
} | |
// 打印出pool内Worker的pending数量,以及其平均值和标准差 | |
func (b *Balancer) print() { | |
sum := 0 | |
sumsq := 0 | |
for _, w := range b.pool { | |
fmt.Printf("%d ", w.pending) | |
sum += w.pending | |
sumsq += w.pending * w.pending | |
} | |
avg := float64(sum) / float64(len(b.pool)) | |
// 计算标准差 | |
variance := float64(sumsq)/float64(len(b.pool)) - avg*avg | |
fmt.Printf(" %.2f %.2f\n", avg, variance) | |
} | |
func (b *Balancer) dispatch(req Request) { | |
// if语句块内的相当于是 round robin 形式的load balance | |
if b.rr { | |
w := b.pool[b.i] | |
w.requests <- req | |
w.pending++ | |
b.i++ | |
if b.i >= len(b.pool) { | |
b.i = 0 | |
} | |
return | |
} | |
// Grab the least loaded worker | |
w := heap.Pop(&b.pool).(*Worker) | |
w.requests <- req | |
w.pending++ | |
// fmt.Printf("started %p; now %d\n", w, w.pending) | |
heap.Push(&b.pool, w) | |
} | |
func (b *Balancer) completed(w *Worker) { | |
if b.rr { | |
w.pending-- | |
return | |
} | |
w.pending-- | |
// fmt.Printf("finished %p; now %d\n", w, w.pending) | |
heap.Remove(&b.pool, w.i) | |
heap.Push(&b.pool, w) | |
} | |
func main() { | |
work := make(chan Request) | |
for i := 0; i < nRequester; i++ { | |
go requester(work) | |
} | |
NewBalancer().balance(work) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment