Last active
November 1, 2017 18:18
-
-
Save bruth/6c050b1b2c327ef1da8c788222847f5a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"flag" | |
"fmt" | |
"log" | |
"math/rand" | |
"os" | |
"os/signal" | |
"sync" | |
"sync/atomic" | |
"syscall" | |
"time" | |
"github.com/golang/protobuf/proto" | |
"github.com/nats-io/nats" | |
) | |
func main() { | |
var ( | |
addr string | |
topic string | |
shards int | |
pubs int | |
instances int64 | |
) | |
flag.StringVar(&addr, "addr", "nats://localhost:4222", "NATS address.") | |
flag.StringVar(&topic, "topic", "test.shard", "Sharded topic.") | |
flag.IntVar(&shards, "shards", 1, "Number of shards.") | |
flag.IntVar(&pubs, "pubs", 1, "Number of publishers.") | |
flag.Int64Var(&instances, "instances", 64, "Aggregate instances.") | |
flag.Parse() | |
conn, err := nats.Connect(addr) | |
if err != nil { | |
log.Fatal(err) | |
} | |
defer conn.Close() | |
sub, err := ShardedSubscribe(conn, topic, func(msg *nats.Msg) string { | |
var m Request | |
proto.Unmarshal(msg.Data, &m) | |
return fmt.Sprintf("%s.%d", msg.Subject, m.Instance%int64(shards)) | |
}) | |
if err != nil { | |
log.Fatal(err) | |
} | |
defer sub.Unsubscribe() | |
for i := 0; i < shards; i++ { | |
go func(i int) { | |
state := make(map[int64]int32) | |
conn.Subscribe(fmt.Sprintf("%s.%d", topic, i), func(msg *nats.Msg) { | |
// Random latency. | |
time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond) | |
var m Request | |
proto.Unmarshal(msg.Data, &m) | |
rep := &Reply{ | |
Instance: m.Instance, | |
} | |
v := state[m.Instance] | |
if m.Version == state[m.Instance] { | |
v++ | |
state[m.Instance] = v | |
/* | |
if m.Retries > 0 { | |
log.Printf("%d:%d, %d -> %d [fixed]", i, m.Instance, m.Version, v) | |
} | |
*/ | |
} else { | |
rep.Conflict = true | |
//log.Printf("%d:%d, %d != %d [retry %d]", i, m.Instance, m.Version, v, m.Retries) | |
} | |
rep.Version = v | |
b, _ := proto.Marshal(rep) | |
conn.Publish(msg.Reply, b) | |
}) | |
}(i) | |
} | |
sig := make(chan os.Signal) | |
signal.Notify(sig, os.Interrupt, syscall.SIGTERM) | |
wg := &sync.WaitGroup{} | |
wg.Add(pubs) | |
Sum := func(a ...int32) int32 { | |
var s int32 | |
for _, x := range a { | |
s = s + x | |
} | |
return s | |
} | |
t0 := time.Now() | |
var t1 time.Time | |
reqs := make([]int32, pubs) | |
// Current versions | |
state := make([]int32, int(instances)) | |
var retries int32 | |
for i := 0; i < pubs; i++ { | |
go func(i int) { | |
defer wg.Done() | |
var r Reply | |
var req *Request | |
for { | |
select { | |
case <-sig: | |
return | |
default: | |
//time.Sleep(100 * time.Millisecond) | |
if req == nil { | |
inst := rand.Int63n(instances) | |
req = &Request{ | |
Instance: inst, | |
Version: state[inst], | |
} | |
} else { | |
req.Retries++ | |
req.Version = state[req.Instance] | |
} | |
b, _ := proto.Marshal(req) | |
reply, err := conn.Request(topic, b, 2*time.Second) | |
if err != nil { | |
log.Fatalf("publisher: %s", err) | |
} | |
proto.Unmarshal(reply.Data, &r) | |
state[r.Instance] = r.Version | |
if r.Conflict { | |
retries = atomic.AddInt32(&retries, 1) | |
} else { | |
reqs[i]++ | |
req = nil | |
} | |
} | |
} | |
}(i) | |
} | |
select { | |
case <-sig: | |
case <-time.After(20 * time.Second): | |
} | |
close(sig) | |
t1 = time.Now() | |
wg.Wait() | |
d := t1.Sub(t0) | |
log.Print(d) | |
t := Sum(reqs...) | |
log.Print(t) | |
log.Printf("%f m/s", float64(t)/float64(d/time.Second)) | |
log.Printf("retries: %d", retries) | |
log.Printf("%f r/s", float64(retries)/float64(d/time.Second)) | |
} | |
type Sharder func(msg *nats.Msg) string | |
func ShardedSubscribe(conn *nats.Conn, subject string, sharder Sharder) (*nats.Subscription, error) { | |
var ( | |
err error | |
sub *nats.Subscription | |
) | |
sub, err = conn.Subscribe(subject, func(msg *nats.Msg) { | |
next := sharder(msg) | |
if next == subject { | |
log.Print("sharded subject cannot be the parent subject") | |
log.Print("closing sharded subscription") | |
if err := sub.Unsubscribe(); err != nil { | |
log.Print(err) | |
} | |
return | |
} | |
msg.Subject = next | |
if err := conn.PublishMsg(msg); err != nil { | |
log.Print("sharder:", err) | |
} | |
}) | |
return sub, err | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Code generated by protoc-gen-go. | |
// source: msg.proto | |
// DO NOT EDIT! | |
/* | |
Package main is a generated protocol buffer package. | |
It is generated from these files: | |
msg.proto | |
It has these top-level messages: | |
Reply | |
Request | |
*/ | |
package main | |
import proto "github.com/golang/protobuf/proto" | |
import fmt "fmt" | |
import math "math" | |
// Reference imports to suppress errors if they are not otherwise used. | |
var _ = proto.Marshal | |
var _ = fmt.Errorf | |
var _ = math.Inf | |
// This is a compile-time assertion to ensure that this generated file | |
// is compatible with the proto package it is being compiled against. | |
// A compilation error at this line likely means your copy of the | |
// proto package needs to be updated. | |
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package | |
type Reply struct { | |
Instance int64 `protobuf:"varint,1,opt,name=instance" json:"instance,omitempty"` | |
Version int32 `protobuf:"varint,2,opt,name=version" json:"version,omitempty"` | |
Conflict bool `protobuf:"varint,3,opt,name=conflict" json:"conflict,omitempty"` | |
} | |
func (m *Reply) Reset() { *m = Reply{} } | |
func (m *Reply) String() string { return proto.CompactTextString(m) } | |
func (*Reply) ProtoMessage() {} | |
func (*Reply) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } | |
func (m *Reply) GetInstance() int64 { | |
if m != nil { | |
return m.Instance | |
} | |
return 0 | |
} | |
func (m *Reply) GetVersion() int32 { | |
if m != nil { | |
return m.Version | |
} | |
return 0 | |
} | |
func (m *Reply) GetConflict() bool { | |
if m != nil { | |
return m.Conflict | |
} | |
return false | |
} | |
type Request struct { | |
Instance int64 `protobuf:"varint,1,opt,name=instance" json:"instance,omitempty"` | |
Version int32 `protobuf:"varint,2,opt,name=version" json:"version,omitempty"` | |
Retries int32 `protobuf:"varint,3,opt,name=retries" json:"retries,omitempty"` | |
} | |
func (m *Request) Reset() { *m = Request{} } | |
func (m *Request) String() string { return proto.CompactTextString(m) } | |
func (*Request) ProtoMessage() {} | |
func (*Request) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } | |
func (m *Request) GetInstance() int64 { | |
if m != nil { | |
return m.Instance | |
} | |
return 0 | |
} | |
func (m *Request) GetVersion() int32 { | |
if m != nil { | |
return m.Version | |
} | |
return 0 | |
} | |
func (m *Request) GetRetries() int32 { | |
if m != nil { | |
return m.Retries | |
} | |
return 0 | |
} | |
func init() { | |
proto.RegisterType((*Reply)(nil), "main.Reply") | |
proto.RegisterType((*Request)(nil), "main.Request") | |
} | |
func init() { proto.RegisterFile("msg.proto", fileDescriptor0) } | |
// fileDescriptor0 holds the descriptor data for msg.proto. It is registered
// under that file name in init and returned by the Descriptor methods of
// Reply and Request. The bytes are generated output and must not be edited
// by hand.
var fileDescriptor0 = []byte{ | |
// 144 bytes of a gzipped FileDescriptorProto | |
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xcc, 0x2d, 0x4e, 0xd7, | |
0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0xc9, 0x4d, 0xcc, 0xcc, 0x53, 0x8a, 0xe4, 0x62, 0x0d, | |
0x4a, 0x2d, 0xc8, 0xa9, 0x14, 0x92, 0xe2, 0xe2, 0xc8, 0xcc, 0x2b, 0x2e, 0x49, 0xcc, 0x4b, 0x4e, | |
0x95, 0x60, 0x54, 0x60, 0xd4, 0x60, 0x0e, 0x82, 0xf3, 0x85, 0x24, 0xb8, 0xd8, 0xcb, 0x52, 0x8b, | |
0x8a, 0x33, 0xf3, 0xf3, 0x24, 0x98, 0x14, 0x18, 0x35, 0x58, 0x83, 0x60, 0x5c, 0x90, 0xae, 0xe4, | |
0xfc, 0xbc, 0xb4, 0x9c, 0xcc, 0xe4, 0x12, 0x09, 0x66, 0x05, 0x46, 0x0d, 0x8e, 0x20, 0x38, 0x5f, | |
0x29, 0x92, 0x8b, 0x3d, 0x28, 0xb5, 0xb0, 0x34, 0xb5, 0xb8, 0x84, 0x4c, 0xc3, 0x25, 0xb8, 0xd8, | |
0x8b, 0x52, 0x4b, 0x8a, 0x32, 0x53, 0x8b, 0xc1, 0x66, 0xb3, 0x06, 0xc1, 0xb8, 0x49, 0x6c, 0x60, | |
0x2f, 0x18, 0x03, 0x02, 0x00, 0x00, 0xff, 0xff, 0xab, 0xbc, 0xa8, 0xb6, 0xcf, 0x00, 0x00, 0x00, | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
syntax = "proto3"; | |
package main; | |
// Reply is sent back by a shard for every Request it handles.
message Reply {
  // Instance the request was for.
  int64 instance = 1;

  // Version held by the shard for the instance after handling the request.
  int32 version = 2;

  // True when the request's version did not match the shard's version.
  bool conflict = 3;
}
// Request asks a shard to advance the version of one instance.
message Request {
  // Instance being updated.
  int64 instance = 1;

  // Version the sender last saw for the instance.
  int32 version = 2;

  // Number of times this request has been re-sent.
  int32 retries = 3;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment