Benchmarking Reflect Channel Ops
// This is a test where I was trying to isolate how much we lost to `reflect`
// with channel operations in a tight loop.
//
// PREAMBLE
// We do a lot of processing of stream data where ordering with respect to a
// shard indicator like user ID matters. We've done this a few times by
// implementing fanout based on hashing into a slice/array of channels, and it's
// lightning fast, but it ends up leaving a lot of boilerplate in your code.
//
// So I started writing a library that would do the non-critical boilerplate
// ops with 'reflect', with the assumption that the tight loop of the
// hash-and-forward operation would be suited to a little code-gen for the
// forwarding of 'your type'.
//
// But after benchmarking, I'm noticing that the reflect method is not THAT
// much slower. In a real app the difference would easily be orders of magnitude
// less significant than the 'real' work happening: memory allocation, network,
// etc.
//
// ABOUT THIS TEST
// To test this, I've ripped out the bare minimum needed to support a benchmark.
// I am pre-computing the hash value to take that out of the critical path, with
// the hope that we're mostly testing channel ops here.
//
// MY RESULTS
// Testing on an i7 MacBook Pro (2017) with 2 cores, 4 hyperthreads:
//
// $ CONSUMER_GOROUTINES=10 go test -bench . -v
// goos: darwin
// goarch: amd64
// BenchmarkFanout_Typed_Simple_depth10-4      5000000    310 ns/op
// BenchmarkFanout_Reflect_Simple_depth10-4    3000000    466 ns/op
// BenchmarkFanout_Typed_Select_depth10-4      3000000    462 ns/op
// BenchmarkFanout_Reflect_Select_depth10-4    2000000    738 ns/op
// BenchmarkFanout_Typed_Simple_depthN-4      10000000    223 ns/op
// BenchmarkFanout_Reflect_Simple_depthN-4     5000000    336 ns/op
// BenchmarkFanout_Typed_Select_depthN-4       5000000    322 ns/op
// BenchmarkFanout_Reflect_Select_depthN-4     2000000    617 ns/op
//
// Changing CONSUMER_GOROUTINES doesn't really change the outcome much on my machine.
package fanout

import (
	"context"
	"os"
	"reflect"
	"strconv"
	"sync"
	"testing"
)

// Following are the four implementations we want to test.
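//
// handlerFunc is the signature shared by all four, so the benchmark harness
// can drive them interchangeably.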
type handlerFunc func(ctx context.Context, input chan *Thing, outputs []chan *Thing)

// IMPL 1: Typed fanout, no select.
func fanoutTyped_Simple(ctx context.Context, input chan *Thing, outputs []chan *Thing) {
	defer closeAll(outputs)
	len64 := uint64(len(outputs))
	for v := range input {
		outputs[hash(v)%len64] <- v
	}
}

// IMPL 2: Reflect fanout, no select.
func fanoutReflect_Simple(ctx context.Context, input chan *Thing, outputs []chan *Thing) {
	defer closeAll(outputs)
	reflect_Simple_Impl(reflect.ValueOf(input), reflect.ValueOf(outputs))
}
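
// reflect_Simple_Impl is the reflect-based version of the typed receive loop:
// Recv pulls values until the input channel is closed, and Send forwards each
// value to the output channel chosen by its hash.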
func reflect_Simple_Impl(input reflect.Value, outputs reflect.Value) {
	len64 := uint64(outputs.Len())
	for {
		v, ok := input.Recv()
		if !ok {
			break
		}
		idx := hash(v.Interface()) % len64
		outputs.Index(int(idx)).Send(v)
	}
}

// IMPL 3: Typed fanout, with select. More realistic for production
// as you would use something akin to context.Context or a close channel
// to do clean teardown without having to consume through the buffer backlog.
func fanoutTyped_Select(ctx context.Context, input chan *Thing, outputs []chan *Thing) {
	defer closeAll(outputs)
	len64 := uint64(len(outputs))
	done := ctx.Done() // probably not a big optimization, but matches the reflect version
	for {
		select {
		case v, ok := <-input:
			if !ok {
				return
			}
			outputs[hash(v)%len64] <- v
		case <-done:
			return
		}
	}
}

// IMPL 4: Reflect, with select
func fanoutReflect_Select(ctx context.Context, input chan *Thing, outputs []chan *Thing) {
	defer closeAll(outputs)
	reflect_Select_Impl(ctx, reflect.ValueOf(input), reflect.ValueOf(outputs)) //TODO
}
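
// reflect_Select_Impl mirrors the typed select loop using reflect.Select: one
// SelectCase receives from the input channel and the other watches ctx.Done(),
// so the loop can tear down without draining the remaining buffered values.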
func reflect_Select_Impl(ctx context.Context, input, outputs reflect.Value) {
	const selRecv = 0
	const selDone = 1
	selCases := []reflect.SelectCase{
		{Dir: reflect.SelectRecv, Chan: input},
		{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ctx.Done())},
	}
	uLen := uint64(outputs.Len())
	for {
		idx, recv, ok := reflect.Select(selCases)
		switch idx {
		case selRecv:
			if !ok {
				return
			}
			// Forward to the downstream channels
			hIndex := hash(recv.Interface()) % uLen
			outputs.Index(int(hIndex)).Send(recv)
		case selDone:
			return
		}
	}
}
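
// harness sets up one benchmark run: it builds an input channel carrying n
// values, starts $CONSUMER_GOROUTINES consumer goroutines (each reading from
// its own buffered channel), resets the timer, and then runs the fanout
// handler until every consumer has drained its channel.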
func harness(b *testing.B, n, depth int, handler handlerFunc) {
	goroutines, _ := strconv.Atoi(os.Getenv("CONSUMER_GOROUTINES"))
	downstreamDepth, _ := strconv.Atoi(os.Getenv("DOWNSTREAM_DEPTH"))
	if goroutines == 0 {
		b.Log("Make sure to set $CONSUMER_GOROUTINES to the goroutine count you want.")
		b.Log("Set $DOWNSTREAM_DEPTH (default 5) to the per-consumer queue depth")
		b.FailNow()
	}
	if downstreamDepth == 0 {
		downstreamDepth = 5
	}
	ctx, cancel := context.WithCancel(context.Background())
	input := produce(n, depth)
	outputs := make([]chan *Thing, goroutines)
	var wg sync.WaitGroup
	wg.Add(goroutines)
	for i := 0; i < goroutines; i++ {
		outputs[i] = make(chan *Thing, downstreamDepth)
		go consumer(&wg, outputs[i])
	}
	// Everything should be running, but blocked until we start the fanout handler
	b.ResetTimer()
	go handler(ctx, input, outputs)
	wg.Wait() // All consumers being finished indicates we got all values
	cancel()
}

// Input channel depth 10
func BenchmarkFanout_Typed_Simple_depth10(b *testing.B) {
	harness(b, b.N, 10, fanoutTyped_Simple)
}

func BenchmarkFanout_Reflect_Simple_depth10(b *testing.B) {
	harness(b, b.N, 10, fanoutReflect_Simple)
}

func BenchmarkFanout_Typed_Select_depth10(b *testing.B) {
	harness(b, b.N, 10, fanoutTyped_Select)
}

func BenchmarkFanout_Reflect_Select_depth10(b *testing.B) {
	harness(b, b.N, 10, fanoutReflect_Select)
}

// Input channel pre-filled
func BenchmarkFanout_Typed_Simple_depthN(b *testing.B) {
	harness(b, b.N, b.N, fanoutTyped_Simple)
}

func BenchmarkFanout_Reflect_Simple_depthN(b *testing.B) {
	harness(b, b.N, b.N, fanoutReflect_Simple)
}

func BenchmarkFanout_Typed_Select_depthN(b *testing.B) {
	harness(b, b.N, b.N, fanoutTyped_Select)
}

func BenchmarkFanout_Reflect_Select_depthN(b *testing.B) {
	harness(b, b.N, b.N, fanoutReflect_Select)
}
Supporting code (second file in the gist): the Thing type plus the producer, consumer, and hash helpers.
package fanout

import (
	"encoding/binary"
	"hash/fnv"
	"sync"
)
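
// Thing is the value being fanned out. Hash is precomputed by the producer so
// the benchmark loop pays for channel operations rather than hashing.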
type Thing struct {
	Index int
	Hash  uint64
}

func makeThings(n int) []*Thing {
	output := make([]*Thing, n)
	h := fnv.New64()
	for i := 0; i < n; i++ {
		// We're doing the hashing in the producer because we want to isolate it
		// from our benchmark if possible.
		binary.Write(h, binary.BigEndian, int64(i))
		output[i] = &Thing{
			Index: i,
			Hash:  h.Sum64(),
		}
	}
	return output
}
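
// produce returns a channel of n Things with buffer size depth. When the
// buffer can hold everything, it is filled synchronously up front; otherwise a
// separate goroutine streams the values in.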
func produce(n int, depth int) chan *Thing {
	c := make(chan *Thing, depth)
	inputs := makeThings(n)
	fill := func() {
		for _, v := range inputs {
			c <- v
		}
		close(c)
	}
	if depth >= n {
		// We can pre-fill the channel when depth >= n.
		// This keeps the context switching of a separate producer goroutine
		// out of the measurement.
		fill()
	} else {
		go fill()
	}
	return c
}
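
// consumer drains one output channel until it is closed, doing a token amount
// of work per value.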
func consumer(wg *sync.WaitGroup, c <-chan *Thing) {
	defer wg.Done()
	for x := range c {
		x.Index *= 2 // do some real work so compiler doesn't compile us out
	}
}

func hash(v interface{}) uint64 {
	return v.(*Thing).Hash // Cheating, but we want to isolate from the hash algo
}

func closeAll(channels []chan *Thing) {
	for _, c := range channels {
		close(c)
	}
}