Skip to content

Instantly share code, notes, and snippets.

@dvirsky
Forked from jonhoo/README.md
Last active August 29, 2015 14:20
Show Gist options
  • Save dvirsky/bb7493c8f03cd9c1c37e to your computer and use it in GitHub Desktop.
Save dvirsky/bb7493c8f03cd9c1c37e to your computer and use it in GitHub Desktop.

Distributed Read-Write Mutex in Go

The default Go implementation of sync.RWMutex does not scale well to multiple cores, as all readers contend on the same memory location when they all try to atomically increment it. This gist explores an n-way RWMutex, also known as a "big reader" lock, which gives each CPU core its own RWMutex. Readers take only a read lock local to their core, whereas writers must take all locks in order.

Finding the current CPU

To determine which lock to take, readers use the CPUID instruction, which gives the APICID of the currently active CPU without having to issue a system call or modify the runtime. This instruction is supported on both Intel and AMD processors; ARM CPUs should use the CPU ID register instead. For systems with more than 256 processors, x2APIC must be used, and the EDX register after CPUID with EAX=0xb should be used instead. A mapping from APICID to CPU index is constructed (using CPU affinity syscalls) when the program is started, as it is static for the lifetime of a process. Since the CPUID instruction can be fairly expensive, goroutines will also only periodically update their estimate of what core they are running on. More frequent updates lead to less inter-core lock traffic, but also increases the time spent on CPUID instructions relative to the actual locking.

Stale CPU information. The information of which CPU a goroutine is running on might be stale when we take the lock (the goroutine could have been moved to another core), but this will only affect performance, not correctness, as long as the reader remembers which lock it took. Such moves are also unlikely, as the OS kernel tries to keep threads on the same core to improve cache hits.

Performance

There are many parameters that affect the performance characteristics of this scheme. In particular, the frequency of CPUID checking, the number of readers, the ratio of readers to writers, and the time readers hold their locks, are all important. Since only a single writer is active at the time, the duration a writer holds a lock for does not affect the difference in performance between sync.RWMutex and DRWMutex.

Experiments show that DRWMutex performs better the more cores the system has, and in particular when the fraction of writers is <1%, and CPUID is called at most every 10 locks (this changes depending on the duration a lock is held for). Even on few cores, DRWMutex outperforms sync.RWMutex under these conditions, which are common for applications that elect to use sync.RWMutex over sync.Mutex.

The plot below shows mean performance across 10 runs as the number of cores increases using:

drwmutex -i 5000 -p 0.0001 -w 1 -r 100 -c 100

DRWMutex and sync.RWMutex performance comparison

Error bars denote 25th and 75th percentile. Note the drops every 10th core; this is because 10 cores constitute a NUMA node on the machine the benchmarks were run on, so once a NUMA node is added, cross-core traffic becomes more expensive. Performance increases for DRWMutex as more readers can work in parallel compared to sync.RWMutex.

See the go-nuts thread for further discussion.

#include "textflag.h"
// func cpu() uint64
TEXT ·cpu(SB),NOSPLIT,$0-8
MOVL $0x01, AX // version information
MOVL $0x00, BX // any leaf will do
MOVL $0x00, CX // any subleaf will do
// call CPUID
BYTE $0x0f
BYTE $0xa2
SHRQ $24, BX // logical cpu id is put in EBX[31-24]
MOVQ BX, ret+0(FP)
RET
package main
import (
"flag"
"fmt"
"math/rand"
"os"
"runtime"
"runtime/pprof"
"sync"
"syscall"
"time"
"unsafe"
)
func cpu() uint64 // implemented in cpu_amd64.s
var cpus map[uint64]int
// determine mapping from APIC ID to CPU index by pinning the entire process to
// one core at the time, and seeing that its APIC ID is.
func init() {
cpus = make(map[uint64]int)
var aff uint64
syscall.Syscall(syscall.SYS_SCHED_GETAFFINITY, uintptr(0), unsafe.Sizeof(aff), uintptr(unsafe.Pointer(&aff)))
n := 0
start := time.Now()
var mask uint64 = 1
Outer:
for {
for (aff & mask) == 0 {
mask <<= 1
if mask == 0 || mask > aff {
break Outer
}
}
ret, _, err := syscall.Syscall(syscall.SYS_SCHED_SETAFFINITY, uintptr(0), unsafe.Sizeof(mask), uintptr(unsafe.Pointer(&mask)))
if ret != 0 {
panic(err.Error())
}
// what CPU do we have?
<-time.After(1 * time.Millisecond)
c := cpu()
if oldn, ok := cpus[c]; ok {
fmt.Println("cpu", n, "==", oldn, "-- both have CPUID", c)
}
cpus[c] = n
mask <<= 1
n++
}
fmt.Printf("%d/%d cpus found in %v: %v\n", len(cpus), runtime.NumCPU(), time.Now().Sub(start), cpus)
ret, _, err := syscall.Syscall(syscall.SYS_SCHED_SETAFFINITY, uintptr(0), unsafe.Sizeof(aff), uintptr(unsafe.Pointer(&aff)))
if ret != 0 {
panic(err.Error())
}
}
type RWMutex2 []sync.RWMutex
func (mx RWMutex2) Lock() {
for core := range mx {
mx[core].Lock()
}
}
func (mx RWMutex2) Unlock() {
for core := range mx {
mx[core].Unlock()
}
}
func main() {
cpuprofile := flag.Bool("cpuprofile", false, "enable CPU profiling")
locks := flag.Uint64("i", 10000, "Number of iterations to perform")
write := flag.Float64("p", 0.0001, "Probability of write locks")
wwork := flag.Int("w", 1, "Amount of work for each writer")
rwork := flag.Int("r", 100, "Amount of work for each reader")
readers := flag.Int("n", runtime.GOMAXPROCS(0), "Total number of readers")
checkcpu := flag.Uint64("c", 100, "Update CPU estimate every n iterations")
flag.Parse()
var o *os.File
if *cpuprofile {
o, _ := os.Create("rw.out")
pprof.StartCPUProfile(o)
}
readers_per_core := *readers / runtime.GOMAXPROCS(0)
var wg sync.WaitGroup
var mx1 sync.RWMutex
start1 := time.Now()
for n := 0; n < runtime.GOMAXPROCS(0); n++ {
for r := 0; r < readers_per_core; r++ {
wg.Add(1)
go func() {
defer wg.Done()
r := rand.New(rand.NewSource(rand.Int63()))
for n := uint64(0); n < *locks; n++ {
if r.Float64() < *write {
mx1.Lock()
x := 0
for i := 0; i < *wwork; i++ {
x++
}
_ = x
mx1.Unlock()
} else {
mx1.RLock()
x := 0
for i := 0; i < *rwork; i++ {
x++
}
_ = x
mx1.RUnlock()
}
}
}()
}
}
wg.Wait()
end1 := time.Now()
t1 := end1.Sub(start1)
fmt.Println("mx1", runtime.GOMAXPROCS(0), *readers, *locks, *write, *wwork, *rwork, *checkcpu, t1.Seconds(), t1)
if *cpuprofile {
pprof.StopCPUProfile()
o.Close()
o, _ = os.Create("rw2.out")
pprof.StartCPUProfile(o)
}
mx2 := make(RWMutex2, len(cpus))
start2 := time.Now()
for n := 0; n < runtime.GOMAXPROCS(0); n++ {
for r := 0; r < readers_per_core; r++ {
wg.Add(1)
go func() {
defer wg.Done()
c := cpus[cpu()]
r := rand.New(rand.NewSource(rand.Int63()))
for n := uint64(0); n < *locks; n++ {
if *checkcpu != 0 && n%*checkcpu == 0 {
c = cpus[cpu()]
}
if r.Float64() < *write {
mx2.Lock()
x := 0
for i := 0; i < *wwork; i++ {
x++
}
_ = x
mx2.Unlock()
} else {
mx2[c].RLock()
x := 0
for i := 0; i < *rwork; i++ {
x++
}
_ = x
mx2[c].RUnlock()
}
}
}()
}
}
wg.Wait()
end2 := time.Now()
pprof.StopCPUProfile()
o.Close()
t2 := end2.Sub(start2)
fmt.Println("mx2", runtime.GOMAXPROCS(0), *readers, *locks, *write, *wwork, *rwork, *checkcpu, t2.Seconds(), t2)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment