Created
April 19, 2012 14:13
-
-
Save angeldm/2421216 to your computer and use it in GitHub Desktop.
Rob Pike balancer for go
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"container/heap" | |
"fmt" | |
"math/rand" | |
"time" | |
) | |
// Sizing of the simulation.
const (
	// nRequester is the number of requester goroutines started by main.
	nRequester = 100
	// nWorker is the number of workers the balancer creates; it also
	// scales the simulated sleep durations in op and requester.
	nWorker = 10
)
// Simulation of some work: just sleep for a while and report how long. | |
func op() int { | |
n := rand.Int63n(int64(time.Second)) | |
time.Sleep(time.Duration(nWorker * n)) | |
return int(n) | |
} | |
// Request is one unit of work handed to the balancer.
type Request struct {
	fn func() int // operation the worker runs
	c  chan int   // channel on which the worker sends fn's result
}
func requester(work chan Request) { | |
c := make(chan int) | |
for { | |
time.Sleep(time.Duration(rand.Int63n(int64(nWorker * 2 * time.Second)))) | |
work <- Request{op, c} | |
<-c | |
} | |
} | |
type Worker struct { | |
i int | |
requests chan Request | |
pending int | |
} | |
func (w *Worker) work(done chan *Worker) { | |
for { | |
req := <-w.requests | |
req.c <- req.fn() | |
done <- w | |
} | |
} | |
type Pool []*Worker | |
func (p Pool) Len() int { return len(p) } | |
func (p Pool) Less(i, j int) bool { | |
return p[i].pending < p[j].pending | |
} | |
func (p *Pool) Swap(i, j int) { | |
a := *p | |
a[i], a[j] = a[j], a[i] | |
a[i].i = i | |
a[j].i = j | |
} | |
func (p *Pool) Push(x interface{}) { | |
a := *p | |
n := len(a) | |
a = a[0 : n+1] | |
w := x.(*Worker) | |
a[n] = w | |
w.i = n | |
*p = a | |
} | |
func (p *Pool) Pop() interface{} { | |
a := *p | |
*p = a[0 : len(a)-1] | |
w := a[len(a)-1] | |
w.i = -1 // for safety | |
return w | |
} | |
type Balancer struct { | |
pool Pool | |
done chan *Worker | |
i int | |
} | |
func NewBalancer() *Balancer { | |
done := make(chan *Worker, nWorker) | |
b := &Balancer{make(Pool, 0, nWorker), done, 0} | |
for i := 0; i < nWorker; i++ { | |
w := &Worker{requests: make(chan Request, nRequester)} | |
heap.Push(&b.pool, w) | |
go w.work(b.done) | |
} | |
return b | |
} | |
func (b *Balancer) balance(work chan Request) { | |
for { | |
select { | |
case req := <-work: | |
b.dispatch(req) | |
case w := <-b.done: | |
b.completed(w) | |
} | |
b.print() | |
} | |
} | |
func (b *Balancer) print() { | |
sum := 0 | |
sumsq := 0 | |
for _, w := range b.pool { | |
fmt.Printf("%d ", w.pending) | |
sum += w.pending | |
sumsq += w.pending * w.pending | |
} | |
avg := float64(sum) / float64(len(b.pool)) | |
variance := float64(sumsq)/float64(len(b.pool)) - avg*avg | |
fmt.Printf(" %.2f %.2f\n", avg, variance) | |
} | |
func (b *Balancer) dispatch(req Request) { | |
if false { | |
w := b.pool[b.i] | |
w.requests <- req | |
w.pending++ | |
b.i++ | |
if b.i >= len(b.pool) { | |
b.i = 0 | |
} | |
return | |
} | |
w := heap.Pop(&b.pool).(*Worker) | |
w.requests <- req | |
w.pending++ | |
// fmt.Printf("started %p; now %d\n", w, w.pending) | |
heap.Push(&b.pool, w) | |
} | |
func (b *Balancer) completed(w *Worker) { | |
if false { | |
w.pending-- | |
return | |
} | |
w.pending-- | |
// fmt.Printf("finished %p; now %d\n", w, w.pending) | |
heap.Remove(&b.pool, w.i) | |
heap.Push(&b.pool, w) | |
} | |
func main() { | |
work := make(chan Request) | |
for i := 0; i < nRequester; i++ { | |
go requester(work) | |
} | |
NewBalancer().balance(work) | |
} |
What is the purpose of the `if false` blocks at lines 124 and 143?
What is the purpose of the `if false` blocks at lines 124 and 143?
They appear to be a disabled round-robin implementation, kept as an alternative to the heap-based least-pending strategy.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
What is the significance of this line?