Skip to content

Instantly share code, notes, and snippets.

▶ go test -bench=BenchmarkCond ./ring_test.go
goos: darwin
goarch: arm64
BenchmarkCond/Chan-size-64-parallelism-01-10 11056395 110.1 ns/op
BenchmarkCond/Ring-size-64-parallelism-01-10 11023176 110.8 ns/op
BenchmarkCond/Chan-size-64-parallelism-08-10 4022212 305.0 ns/op
BenchmarkCond/Ring-size-64-parallelism-08-10 5524983 187.2 ns/op
BenchmarkCond/Chan-size-64-parallelism-64-10 3942190 304.1 ns/op
BenchmarkCond/Ring-size-64-parallelism-64-10 6217990 207.5 ns/op
PASS
func (r *ring) EnqueueRequest(req request) chan response {
s := &r.slots[atomic.AddUint64(&r.write, 1)&mask]
s.cond.L.Lock()
for s.mark != 0 {
s.cond.Wait()
}
s.req = req
s.mark = 1
s.cond.L.Unlock()
return s.ch
type slot struct {
cond sync.Cond
mark uint32
req request
ch chan response
}
func newRing() *ring {
r := &ring{}
for i := range r.slots {
package busy
import (
"runtime"
"sync/atomic"
"testing"
)
type Queue interface {
EnqueueRequest()
▶ go test -bench=BenchmarkBusy ./ring_test.go
goos: darwin
goarch: arm64
BenchmarkBusy/Chan-size-64-parallelism-01-10 10699581 110.8 ns/op
BenchmarkBusy/Ring-size-64-parallelism-01-10 12498996 92.87 ns/op
BenchmarkBusy/Chan-size-64-parallelism-08-10 4005717 304.6 ns/op
BenchmarkBusy/Ring-size-64-parallelism-08-10 3665968 324.0 ns/op
BenchmarkBusy/Chan-size-64-parallelism-64-10 3974786 303.0 ns/op
BenchmarkBusy/Ring-size-64-parallelism-64-10 1000000 1421 ns/op
PASS
@rueian
rueian / reply.go
Last active February 6, 2022 11:34
func (r *ring) ReplyToNextRequest(resp response) {
r.read2++
s := &r.slots[r.read2&1023]
if atomic.LoadUint32(&s.mark) != 3 {
panic("out-of-band response should not be passed in")
}
s.ch <- resp // this should be a non-buffered channel
atomic.StoreUint32(&s.mark, 0)
}
▶ go test -bench=BenchmarkPipeliningQueue -benchmem -benchtime 2s .
goos: darwin
goarch: arm64
pkg: rueidis-benchmark
BenchmarkPipeliningQueue
BenchmarkPipeliningQueue/Channel
BenchmarkPipeliningQueue/Channel-10 5798116 406.1 ns/op 176 B/op 2 allocs/op
BenchmarkPipeliningQueue/Lockless
BenchmarkPipeliningQueue/Lockless-10 32529488 70.92 ns/op 0 B/op 0 allocs/op
PASS
func writing(outgoing *bufio.Writer, queue *ring) (err error) {
var req request
var ok bool
for err == nil {
if req, ok = queue.NextRequestToSend(); ok {
_, err = outgoing.Write(req.body)
} else {
err = outgoing.Flush()
runtime.Gosched()
}
func (r *ring) NextRequestToSend() (req request, ok bool) {
s := &r.slots[(r.read1+1)&1023]
if ok = atomic.LoadUint32(&s.mark) == 2; ok {
req = s.req
r.read1++
atomic.StoreUint32(&s.mark, 3)
}
return
}
func (r *ring) EnqueueRequest(req request) response {
s := &r.slots[atomic.AddUint64(&r.write, 1)&1023]
for atomic.CompareAndSwapUint32(&s.mark, 0, 1) {
runtime.Gosched()
}
s.req = req
atomic.StoreUint32(&s.mark, 2)
return <-s.ch
}