Last active
October 7, 2015 11:29
-
-
Save osamu/bca3e4128ee4876a0890 to your computer and use it in GitHub Desktop.
test nats
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"crypto/rand" | |
"github.com/nats-io/nats" | |
"time" | |
"fmt" | |
"flag" | |
) | |
// message is the checker payload exchanged between publisher and subscriber.
// All fields are exported so that the JSON encoder can serialize them.
type message struct {
	// Seq is the publisher-side sequence number of the message.
	Seq int

	// SrcId and DstId are endpoint identifiers; the publisher in this file
	// always sends 0 for both.
	SrcId, DstId int

	// TimeMicro is the send timestamp. NOTE(review): despite the name, the
	// publisher fills it with UnixNano() and the subscriber reads it back as
	// nanoseconds.
	TimeMicro int64

	// Payload is the opaque message body.
	Payload []byte
}
// worker_report is one measurement sample that a subscriber sends to main
// over the report channel.
type worker_report struct {
	// receive_rate is the observed receive throughput in messages per second.
	receive_rate float64

	// delay is the difference between the receive time and the timestamp
	// carried in the sampled message.
	delay time.Duration
}
// stats accumulates simple summary statistics for one measured quantity.
type stats struct {
	// min and max are the extreme values seen, avg is the mean, and sum is
	// the running total of the samples.
	min, max, avg, sum float64

	// cnt is the number of samples added to sum.
	cnt int
}
type statsreport struct { | |
delay stats | |
rate stats | |
} | |
func main() { | |
server := flag.String("server", nats.DefaultURL, "Nats Server URL") | |
only_publish := flag.Bool("only-publish", false, "Client only publish checker message") | |
publish_size := flag.Int("publish_size", 10, "Publisher message payload size") | |
only_subscribe := flag.Bool("only-subscribe", false, "Client only publish checker message") | |
queue_name := flag.String("queue", "queue", "Target queue name") | |
concurrent := flag.Int("concurrent", 10, "subscriber concurrency") | |
publish_rate := flag.Int("publish_rate", 10, "publisher message/sec") | |
flag.Parse() | |
if *only_publish { | |
publisher(*server, *queue_name, *publish_size, *publish_rate) | |
} else if !*only_subscribe { | |
go publisher(*server, *queue_name, *publish_size, *publish_rate) | |
} | |
reportChan := make(chan worker_report) | |
if !*only_publish { | |
for i := 0; i < *concurrent; i++ { | |
fmt.Printf("Start subscriber%d\n",i) | |
time.Sleep( time.Duration(100)* time.Millisecond) | |
go subscriber(*server, *queue_name, reportChan); | |
} | |
var stats statsreport | |
var count int | |
for { | |
report:= <- reportChan | |
stats.delay.sum += report.delay.Seconds() | |
stats.delay.cnt += 1 | |
if report.delay.Seconds() > stats.delay.max { | |
stats.delay.max = report.delay.Seconds() | |
}else if report.delay.Seconds() < stats.delay.min { | |
stats.delay.min = report.delay.Seconds() | |
} | |
stats.rate.sum += report.receive_rate | |
stats.rate.cnt += 1 | |
fmt.Printf("delay avg. %f[ms] max: %f, min: %f\n", | |
(stats.delay.sum/float64(stats.delay.cnt))*1000.0, | |
stats.delay.max*1000.0, stats.delay.min*1000.0) | |
count++ | |
if count % 5 == 0 { | |
stats.delay.cnt = 0 | |
stats.rate.cnt = 0 | |
stats.delay.sum = 0.0 | |
stats.rate.sum = 0.0 | |
} | |
} | |
} | |
} | |
func publisher(server string, queue string, payload_size int, rate int) { | |
nc, _ := nats.Connect(server); | |
ec, _ := nats.NewEncodedConn(nc, nats.JSON_ENCODER) | |
defer ec.Close() | |
sendCh := make(chan *message) | |
ec.BindSendChan(queue, sendCh) | |
rate_prev_time := time.Now() | |
i := 0 | |
for { | |
time.Sleep( time.Duration(1000/rate)* time.Millisecond) | |
now := time.Now() | |
c := payload_size | |
b := make([]byte, c) | |
rand.Read(b) | |
checker := &message{Seq: i, SrcId:0, DstId:0, | |
TimeMicro: now.UnixNano(), Payload: b} | |
sendCh <- checker | |
i++ | |
if i % 100 == 0 { | |
fmt.Printf("Publish %f /sec\n", | |
float64(100)/float64(time.Since(rate_prev_time).Seconds())) | |
rate_prev_time = now | |
} | |
} | |
} | |
func subscriber(server string, queue string, reportChan chan worker_report) { | |
nc, err := nats.Connect(server) | |
if err != nil { | |
fmt.Printf("Failed to create default connection: %v\n", err) | |
return | |
} | |
ec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER) | |
defer ec.Close() | |
rate_prev_time := time.Now() | |
recvCh := make(chan *message) | |
ec.BindRecvChan(queue, recvCh) | |
var nanodelay time.Duration | |
i := 1 | |
for { | |
checker := <- recvCh | |
if i % 100 == 0 { | |
now := time.Now().UnixNano() | |
nanodelay = time.Duration(now - checker.TimeMicro) | |
report := worker_report{ receive_rate: float64(100)/float64(time.Since(rate_prev_time).Seconds()), delay: nanodelay } | |
reportChan <- report | |
// reset counter | |
rate_prev_time = time.Now() | |
} | |
i++ | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment