Created
September 24, 2023 15:30
-
-
Save danielealbano/3f11afb2f549ee3c90081dcfcc68255f to your computer and use it in GitHub Desktop.
golang - mpmc ringbuffer - spinlock vs mutex
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
go test -bench=. | |
goos: linux | |
goarch: amd64 | |
pkg: pgproxy/internal/helpers | |
cpu: AMD Ryzen 9 3950X 16-Core Processor | |
BenchmarkRingBufferEnqueueDequeueMutex-32 5174512 212.9 ns/op 0 B/op 0 allocs/op | |
BenchmarkRingBufferEnqueueMutex-32 8517314 127.9 ns/op 0 B/op 0 allocs/op | |
BenchmarkRingBufferDequeueMutex-32 16836531 71.17 ns/op 0 B/op 0 allocs/op | |
BenchmarkRingBufferEnqueueDequeueSpinlock-32 3977191 303.1 ns/op 0 B/op 0 allocs/op | |
BenchmarkRingBufferEnqueueSpinlock-32 7919860 146.9 ns/op 0 B/op 0 allocs/op | |
BenchmarkRingBufferDequeueSpinlock-32 9238311 132.6 ns/op 0 B/op 0 allocs/op | |
PASS | |
ok pgproxy/internal/helpers 8.825s |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package helpers | |
import ( | |
"errors" | |
"sync" | |
) | |
// T is the element type stored in a RingBuffer.
type T interface{}

var (
	// errFull is returned by Enqueue when no free slot is available.
	errFull = errors.New("full")
	// errEmpty is returned by Dequeue when there is nothing to read.
	errEmpty = errors.New("empty")
)

// RingBuffer is a fixed-capacity FIFO queue backed by a circular slice.
// Every access to the mutable state (items, head, tail, full) happens while
// holding lock, so the buffer can be shared by several producers and
// consumers.
type RingBuffer struct {
	lock     sync.Locker
	items    []T
	capacity int
	head     int  // index of the next element to dequeue
	tail     int  // index of the next free slot to enqueue into
	full     bool // distinguishes "full" from "empty" when head == tail
}

// NewRingBuffer returns an empty buffer with room for size elements that is
// guarded by lock. A nil lock is replaced by a fresh sync.Mutex. The lock is
// not reentrant-safe: callers must not invoke RingBuffer methods while
// holding it themselves.
func NewRingBuffer(size int, lock sync.Locker) *RingBuffer {
	if lock == nil {
		lock = &sync.Mutex{}
	}
	return &RingBuffer{
		lock:     lock,
		items:    make([]T, size),
		capacity: size,
	}
}

// Capacity returns the maximum number of elements the buffer can hold.
// The capacity is fixed at construction time, so no locking is required.
func (s *RingBuffer) Capacity() int {
	return s.capacity
}

// Len returns the number of elements currently stored in the buffer.
func (s *RingBuffer) Len() int {
	s.lock.Lock()
	defer s.lock.Unlock()

	switch {
	case s.full:
		return s.capacity
	case s.tail >= s.head:
		return s.tail - s.head
	default:
		// tail has wrapped around the end of the slice, head has not yet.
		return s.capacity - s.head + s.tail
	}
}

// IsFull reports whether every slot of the buffer is occupied.
func (s *RingBuffer) IsFull() bool {
	s.lock.Lock()
	defer s.lock.Unlock()
	return s.full
}

// Enqueue appends item at the tail of the buffer. It returns errFull when no
// slot is free, which is always the case for a zero-capacity buffer.
func (s *RingBuffer) Enqueue(item T) error {
	s.lock.Lock()
	defer s.lock.Unlock()

	// The fullness check must happen under the lock: checking first and
	// locking afterwards lets two producers both claim the last free slot.
	if s.full || s.capacity == 0 {
		return errFull
	}
	s.items[s.tail] = item
	s.tail = (s.tail + 1) % s.capacity
	s.full = s.head == s.tail
	return nil
}

// Dequeue removes and returns the element at the head of the buffer. It
// returns a nil element and errEmpty when the buffer holds nothing.
func (s *RingBuffer) Dequeue() (T, error) {
	s.lock.Lock()
	defer s.lock.Unlock()

	// Same reasoning as in Enqueue: the emptiness check and the pop must be
	// one atomic step with respect to other consumers.
	if !s.full && s.head == s.tail {
		return nil, errEmpty
	}
	data := s.items[s.head]
	s.items[s.head] = nil // drop the reference so the value can be collected
	s.full = false
	s.head = (s.head + 1) % s.capacity
	return data, nil
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package helpers | |
import ( | |
"errors" | |
"sync" | |
"testing" | |
) | |
func TestNewRingBuffer(t *testing.T) { | |
q := NewRingBuffer(10, &sync.Mutex{}) | |
if q.IsFull() != false { | |
t.Errorf("IsFull() should return false") | |
} | |
if q.Capacity() != 10 { | |
t.Errorf("Capacity() should return 10") | |
} | |
} | |
func TestRingBufferEnqueue(t *testing.T) { | |
q := NewRingBuffer(10, &sync.Mutex{}) | |
err := q.Enqueue(1) | |
if err != nil { | |
t.Errorf("Enqueue() should not return an error") | |
} | |
if q.IsFull() != false { | |
t.Errorf("IsFull() should return false") | |
} | |
} | |
func TestRingBufferEnqueueFullError(t *testing.T) { | |
q := NewRingBuffer(10, &sync.Mutex{}) | |
for i := 0; i < 10; i++ { | |
err := q.Enqueue(i) | |
if err != nil { | |
t.Errorf("Enqueue() should not return an error") | |
} | |
} | |
if q.IsFull() != true { | |
t.Errorf("IsFull() should return true") | |
} | |
err := q.Enqueue(1) | |
if !errors.Is(err, errFull) { | |
t.Errorf("Enqueue() should return an error") | |
} | |
} | |
func TestRingBufferDequeue(t *testing.T) { | |
q := NewRingBuffer(10, &sync.Mutex{}) | |
err := q.Enqueue(1) | |
if err != nil { | |
t.Errorf("Enqueue() should not return an error") | |
} | |
_, err = q.Dequeue() | |
if err != nil { | |
t.Errorf("Dequeue() should not return an error") | |
} | |
if q.IsFull() != false { | |
t.Errorf("IsFull() should return false") | |
} | |
} | |
func TestRingBufferDequeueEmpty(t *testing.T) { | |
q := NewRingBuffer(10, &sync.Mutex{}) | |
_, err := q.Dequeue() | |
if !errors.Is(err, errEmpty) { | |
t.Errorf("Dequeue() should return an error") | |
} | |
} | |
func TestRingBufferLen(t *testing.T) { | |
q := NewRingBuffer(10, &sync.Mutex{}) | |
err := q.Enqueue(1) | |
if err != nil { | |
t.Errorf("Enqueue() should not return an error") | |
} | |
if q.Len() != 1 { | |
t.Errorf("Len() should return 1") | |
} | |
} | |
func TestRingBufferLenFull(t *testing.T) { | |
q := NewRingBuffer(10, &sync.Mutex{}) | |
for i := 0; i < 10; i++ { | |
err := q.Enqueue(i) | |
if err != nil { | |
t.Errorf("Enqueue() should not return an error") | |
} | |
} | |
if q.Len() != 10 { | |
t.Errorf("Len() should return 10") | |
} | |
} | |
func TestRingBufferLenAfterMultipleCycles(t *testing.T) { | |
q := NewRingBuffer(10, &sync.Mutex{}) | |
for i := 0; i < 10; i++ { | |
err := q.Enqueue(i) | |
if err != nil { | |
t.Errorf("Enqueue() should not return an error") | |
} | |
} | |
for i := 0; i < 5; i++ { | |
_, err := q.Dequeue() | |
if err != nil { | |
t.Errorf("Dequeue() should not return an error") | |
} | |
} | |
for i := 0; i < 5; i++ { | |
err := q.Enqueue(i) | |
if err != nil { | |
t.Errorf("Enqueue() should not return an error") | |
} | |
} | |
for i := 0; i < 10; i++ { | |
_, err := q.Dequeue() | |
if err != nil { | |
t.Errorf("Dequeue() should not return an error") | |
} | |
} | |
for i := 0; i < 5; i++ { | |
err := q.Enqueue(i) | |
if err != nil { | |
t.Errorf("Enqueue() should not return an error") | |
} | |
} | |
if q.Len() != 5 { | |
t.Errorf("Len() should return 5") | |
} | |
} | |
func internalBenchmarkRingBufferEnqueueDequeue(b *testing.B, q *RingBuffer) { | |
b.ReportAllocs() | |
b.ResetTimer() | |
b.RunParallel(func(pb *testing.PB) { | |
for pb.Next() { | |
_ = q.Enqueue(1) | |
_, _ = q.Dequeue() | |
} | |
}) | |
} | |
func internalBenchmarkRingBufferEnqueue(b *testing.B, q *RingBuffer) { | |
b.ReportAllocs() | |
b.ResetTimer() | |
b.RunParallel(func(pb *testing.PB) { | |
for pb.Next() { | |
_ = q.Enqueue(1) | |
} | |
}) | |
} | |
func internalBenchmarkRingBufferDequeue(b *testing.B, q *RingBuffer) { | |
for i := 0; i < b.N; i++ { | |
_ = q.Enqueue(i) | |
} | |
b.ReportAllocs() | |
b.ResetTimer() | |
b.RunParallel(func(pb *testing.PB) { | |
for pb.Next() { | |
_, _ = q.Dequeue() | |
} | |
}) | |
} | |
func BenchmarkRingBufferEnqueueDequeueMutex(b *testing.B) { | |
q := NewRingBuffer(b.N, &sync.Mutex{}) | |
internalBenchmarkRingBufferEnqueueDequeue(b, q) | |
} | |
func BenchmarkRingBufferEnqueueMutex(b *testing.B) { | |
q := NewRingBuffer(b.N, &sync.Mutex{}) | |
internalBenchmarkRingBufferEnqueue(b, q) | |
} | |
func BenchmarkRingBufferDequeueMutex(b *testing.B) { | |
q := NewRingBuffer(b.N, &sync.Mutex{}) | |
internalBenchmarkRingBufferDequeue(b, q) | |
} | |
func BenchmarkRingBufferEnqueueDequeueSpinlock(b *testing.B) { | |
q := NewRingBuffer(b.N, &SpinLock{}) | |
internalBenchmarkRingBufferEnqueueDequeue(b, q) | |
} | |
func BenchmarkRingBufferEnqueueSpinlock(b *testing.B) { | |
q := NewRingBuffer(b.N, &SpinLock{}) | |
internalBenchmarkRingBufferEnqueue(b, q) | |
} | |
func BenchmarkRingBufferDequeueSpinlock(b *testing.B) { | |
q := NewRingBuffer(b.N, &SpinLock{}) | |
internalBenchmarkRingBufferDequeue(b, q) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package helpers | |
import ( | |
"sync/atomic" | |
"time" | |
) | |
// waitSpins is the number of busy-loop iterations performed between two
// acquisition attempts. The Go runtime's doSpin uses 30, but the difference
// is not relevant here.
var waitSpins = 20

// SpinLock is a busy-waiting mutual-exclusion lock. It provides Lock and
// Unlock and can therefore be used wherever a sync.Locker is expected.
// The zero value is an unlocked SpinLock.
type SpinLock struct {
	// v is 0 when the lock is free and 1 when it is held. It must only be
	// accessed through sync/atomic.
	v uint32
}

// Lock acquires the lock, spinning until it becomes available.
func (l *SpinLock) Lock() {
	for !l.TryLock() {
		l.spin()
	}
}

// LockWithTimeout tries to acquire the lock, spinning for at most duration.
// It returns true when the lock was acquired and false when the timeout
// elapsed first.
func (l *SpinLock) LockWithTimeout(duration time.Duration) bool {
	start := time.Now()
	for !l.TryLock() {
		if time.Since(start) > duration {
			return false
		}
		l.spin()
	}
	return true
}

// Unlock releases the lock. The store is atomic so that it pairs with the
// atomic operations in TryLock and publishes the writes made inside the
// critical section to the next goroutine that acquires the lock.
func (l *SpinLock) Unlock() {
	atomic.StoreUint32(&l.v, 0)
}

// TryLock attempts to acquire the lock without waiting and reports whether
// it succeeded.
func (l *SpinLock) TryLock() bool {
	// Test before test-and-set: skip the compare-and-swap when the lock is
	// visibly held. The load is atomic to avoid racing with Unlock.
	if atomic.LoadUint32(&l.v) == 1 {
		return false
	}
	return atomic.CompareAndSwapUint32(&l.v, 0, 1)
}

// IsLocked reports whether the lock is currently held by anyone.
func (l *SpinLock) IsLocked() bool {
	return atomic.LoadUint32(&l.v) == 1
}

// spin burns waitSpins loop iterations before the caller retries.
func (l *SpinLock) spin() {
	for spin := 0; spin < waitSpins; spin++ {
		// spin
	}
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package helpers | |
import ( | |
"testing" | |
"time" | |
) | |
func TestSpinLock(t *testing.T) { | |
l := &SpinLock{} | |
l.Lock() | |
if !l.IsLocked() { | |
t.Errorf("IsLocked() should return true") | |
} | |
l.Unlock() | |
if l.IsLocked() { | |
t.Errorf("IsLocked() should return false") | |
} | |
} | |
func TestSpinLockTryLock(t *testing.T) { | |
l := &SpinLock{} | |
if !l.TryLock() { | |
t.Errorf("TryLock() should return true") | |
} | |
if l.TryLock() { | |
t.Errorf("TryLock() should return false") | |
} | |
l.Unlock() | |
} | |
func TestSpinLockTryLock2(t *testing.T) { | |
l := &SpinLock{} | |
l.Lock() | |
if !l.IsLocked() { | |
t.Errorf("IsLocked() should return true") | |
} | |
if l.TryLock() { | |
t.Errorf("TryLock() should return false") | |
} | |
l.Unlock() | |
} | |
func TestSpinLockUnlock(t *testing.T) { | |
l := &SpinLock{} | |
l.Lock() | |
if !l.IsLocked() { | |
t.Errorf("IsLocked() should return true") | |
} | |
l.Unlock() | |
if l.IsLocked() { | |
t.Errorf("IsLocked() should return false") | |
} | |
} | |
func TestLockWithTimeout(t *testing.T) { | |
l := &SpinLock{} | |
l.Lock() | |
if !l.IsLocked() { | |
t.Errorf("IsLocked() should return true") | |
} | |
start := time.Now() | |
if l.LockWithTimeout(time.Millisecond * 5) { | |
t.Errorf("LockWithTimeout() should return false") | |
} | |
end := time.Now() | |
if end.Sub(start) < time.Millisecond*5 { | |
t.Errorf("LockWithTimeout() should wait at least 5ms") | |
} | |
// Allow and extra ms for padding and time calculation rounding | |
if end.Sub(start) > time.Millisecond*6 { | |
t.Errorf("LockWithTimeout() should wait at most 6ms") | |
} | |
l.Unlock() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment