Created
May 2, 2014 09:51
-
-
Save stengaard/a06750896617782db80e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// An example of manual GC work in Go. Disconnects it's | |
// receiver before GC to avoid outstanding requests. | |
// | |
// This implementation runs GC every 10 seconds - which is | |
// slightly idiotic. I bet you could come up with a better | |
// strategy. | |
// | |
// This is meant to illustrate how to sidestep the Go GC in | |
// situations where the GC pause bites you in the in ass. | |
package main | |
import ( | |
"fmt" | |
"math/rand" | |
"runtime" | |
"runtime/debug" | |
"time" | |
) | |
// receiver handles incoming request | |
// and is able to coordinate with the gc to | |
// not have any outstanding requests during gc | |
// | |
// new requests come in through in. interface is used a | |
// placeholder. these are the ones we are trying to pause. | |
// | |
// sandman signals that a pause is eminent. also used | |
// to signal back that we have made our preparations | |
// and that we are tristated | |
func receiver(in chan interface{}, sandman chan chan struct{}) { | |
reqC := in | |
// outstanding requests | |
q := map[*request]struct{}{} | |
// shared reply channel | |
replyC := make(chan *reply, 0) | |
var gcDone chan struct{} | |
for { | |
select { | |
case reqData := <-reqC: | |
r := &request{ | |
reqData: reqData, | |
replyC: replyC, | |
} | |
go r.handle() | |
q[r] = struct{}{} | |
case reply := <-replyC: | |
delete(q, reply.req) | |
fmt.Printf("looked up %s (queuelen: %d)\n", reply.req.reqData, len(q)) | |
// no more running requests and awaiting gc | |
// we should signal the pause requestor | |
if len(q) == 0 && gcDone != nil { | |
sandman <- make(chan struct{}, 0) | |
} | |
case gcDone = <-sandman: | |
fmt.Println("preparing to pause") | |
reqC = nil // disable input - now wait for q to be empty | |
// in an amqp setting you should disable flow instead. | |
case <-gcDone: // gc does send here before gc has been done | |
reqC = in // start accepting requests again | |
gcDone = nil | |
} | |
} | |
} | |
// run the gc manager | |
// GC every 10 seconds. a proper implementation could do | |
// something clever with runtime.MemStats or some such to | |
// determine when to GC. | |
// | |
// for a proper hi-mem instance gc should not be done until | |
// we have used a few GB of memory | |
// | |
// p is used to signal whenever a gc is necessary. | |
// protocol is: a signalling channel is sent on | |
// p, when clients are ready they send back something | |
// on p. Next gc is done. Finally we signal on the | |
// original signalling channel to let clients start | |
// work again. | |
func gcManager(p chan chan struct{}) { | |
tick := time.Tick(time.Second) | |
i := 1 | |
for { | |
select { | |
case <-tick: | |
i++ | |
if i%10 == 0 { | |
done := make(chan struct{}, 0) | |
// signal receiver | |
p <- done | |
// wait for ready signal | |
<-p | |
// enable gc | |
debug.SetGCPercent(100) | |
fmt.Println("GC") | |
// gc | |
runtime.GC() | |
// disable gc | |
debug.SetGCPercent(-1) | |
// signal restart to receiver | |
done <- struct{}{} | |
} | |
} | |
} | |
} | |
type request struct { | |
replyC chan *reply | |
reqData interface{} | |
} | |
// simulation of were the actual work happens. | |
func (r *request) handle() { | |
// arbitrary time to do $work | |
time.Sleep(400 * time.Millisecond) | |
// and an arbitrary response | |
r.reply("done") | |
} | |
// reply to requst with replyData | |
func (r *request) reply(replyData interface{}) { | |
r.replyC <- &reply{ | |
req: r, | |
payload: replyData, | |
} | |
} | |
type reply struct { | |
req *request | |
payload interface{} | |
} | |
func main() { | |
// disable GC | |
debug.SetGCPercent(-1) | |
// request flow channel | |
in := make(chan interface{}, 0) | |
// gc event channel | |
p := make(chan chan struct{}, 0) | |
go gcManager(p) | |
// litter so the gc has something to do | |
go malmskov() | |
// request generator | |
names := []string{"alpha", "delta", "theta", "kappa", "sigma"} | |
for _, name := range names { | |
go reqGenerator(name, in) | |
} | |
// run request handler | |
receiver(in, p) | |
} | |
// generates incoming requests | |
func reqGenerator(id string, out chan interface{}) { | |
for i := 0; true; i++ { | |
// slightly faster than handle time - so we are sure to have | |
// a queue build up in receiver | |
w := time.Duration(100+rand.Int31n(200)) * time.Millisecond | |
<-time.After(w) | |
out <- fmt.Sprintf("%s-%d", id, i) | |
} | |
} | |
// makes garbage - so MemStats gets more interesting | |
func malmskov() { | |
tick := time.Tick(250 * time.Millisecond) | |
for i := 1; true; i++ { | |
t := <-tick | |
buf := make([]byte, 256*1024) | |
nop(buf) | |
if i%20 == 0 { | |
s := &runtime.MemStats{} | |
runtime.ReadMemStats(s) | |
fmt.Printf("%s (%d)\n", t, i) | |
printMemStats(s) | |
} | |
} | |
} | |
func nop(_ []byte) {} | |
func printMemStats(m *runtime.MemStats) { | |
t := []struct { | |
name string | |
val uint64 | |
}{ | |
{"LastGC", m.LastGC}, | |
{"NextGC", m.NextGC}, | |
{"PauseNs", m.PauseNs[(m.NumGC+255)%256]}, | |
{"Allocs", m.Alloc}, | |
{"Mallocs", m.Mallocs}, | |
{"NumGcs", uint64(m.NumGC)}, | |
} | |
for _, e := range t { | |
fmt.Printf("%-9s: %d\n", e.name, e.val) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment