Skip to content

Instantly share code, notes, and snippets.

@osamu
Last active October 7, 2015 11:29
Show Gist options
  • Save osamu/bca3e4128ee4876a0890 to your computer and use it in GitHub Desktop.
Save osamu/bca3e4128ee4876a0890 to your computer and use it in GitHub Desktop.
test nats
package main
import (
"crypto/rand"
"github.com/nats-io/nats"
"time"
"fmt"
"flag"
)
// message is the probe record sent by publisher and received by subscriber.
// Its fields are exported because the connection uses the NATS JSON encoder.
type message struct {
	// Seq is the publisher's running message counter, starting at 0.
	Seq int
	// SrcId and DstId are both set to 0 by the publisher in this file.
	SrcId, DstId int
	// TimeMicro is the send timestamp. Despite its name, the publisher stores
	// time.Now().UnixNano() here and the subscriber treats it as nanoseconds.
	TimeMicro int64
	// Payload is filled with random bytes by the publisher.
	Payload []byte
}
// worker_report is one measurement sample that a subscriber sends to main
// after every 100th received message.
type worker_report struct {
	// receive_rate is the receive throughput in messages per second, computed
	// as 100 divided by the seconds elapsed since the previous report.
	receive_rate float64

	// delay is the difference between the receive time and the send timestamp
	// carried by the message that triggered the report.
	delay time.Duration
}
// stats accumulates simple summary figures for a stream of samples.
type stats struct {
	// min and max are the extreme sample values, avg the mean, and sum the
	// running total of all samples counted in cnt.
	min, max, avg, sum float64

	// cnt is the number of samples added to sum.
	cnt int
}
// statsreport bundles the two aggregates that main maintains over the
// worker_report samples it receives.
type statsreport struct {
	// delay aggregates the reported message delays (main feeds it seconds).
	delay stats

	// rate aggregates the reported receive rates.
	rate stats
}
// main parses the command-line flags and runs the benchmark in one of three
// modes:
//
//   - -only-publish:   run the publisher in the foreground and nothing else;
//   - -only-subscribe: run only the subscribers and the report loop;
//   - neither:         run the publisher in the background plus subscribers.
//
// In the subscribing modes it starts -concurrent subscriber goroutines and
// then loops forever, printing the average, maximum and minimum delay after
// every report it receives. Sum and count are cleared after every fifth
// report; maximum and minimum are kept for the whole run.
func main() {
	server := flag.String("server", nats.DefaultURL, "Nats Server URL")
	only_publish := flag.Bool("only-publish", false, "Client only publish checker message")
	publish_size := flag.Int("publish_size", 10, "Publisher message payload size")
	// The help text used to be a copy of the only-publish one.
	only_subscribe := flag.Bool("only-subscribe", false, "Client only subscribe checker message")
	queue_name := flag.String("queue", "queue", "Target queue name")
	concurrent := flag.Int("concurrent", 10, "subscriber concurrency")
	publish_rate := flag.Int("publish_rate", 10, "publisher message/sec")
	flag.Parse()

	if *only_publish {
		// Publish in the foreground; there is nothing to aggregate in this
		// mode, so leave as soon as the publisher stops.
		publisher(*server, *queue_name, *publish_size, *publish_rate)
		return
	}
	if !*only_subscribe {
		go publisher(*server, *queue_name, *publish_size, *publish_rate)
	}

	reportChan := make(chan worker_report)
	for i := 0; i < *concurrent; i++ {
		fmt.Printf("Start subscriber%d\n", i)
		// Stagger the subscriber start-up by 100ms each.
		time.Sleep(100 * time.Millisecond)
		go subscriber(*server, *queue_name, reportChan)
	}

	// agg is deliberately not called "stats" so that it does not shadow the
	// stats type.
	var agg statsreport
	// seen records whether min/max already hold a real sample. Without it min
	// would stay at its zero value forever, because delays are normally > 0.
	seen := false
	count := 0
	for report := range reportChan {
		delaySec := report.delay.Seconds()

		agg.delay.sum += delaySec
		agg.delay.cnt++
		// Two independent checks: the first sample must seed both extremes.
		if !seen || delaySec > agg.delay.max {
			agg.delay.max = delaySec
		}
		if !seen || delaySec < agg.delay.min {
			agg.delay.min = delaySec
		}
		seen = true

		// The receive rate is accumulated but not printed.
		agg.rate.sum += report.receive_rate
		agg.rate.cnt++

		fmt.Printf("delay avg. %f[ms] max: %f, min: %f\n",
			(agg.delay.sum/float64(agg.delay.cnt))*1000.0,
			agg.delay.max*1000.0, agg.delay.min*1000.0)

		count++
		if count%5 == 0 {
			// Start a new averaging window; max and min are kept.
			agg.delay.cnt = 0
			agg.rate.cnt = 0
			agg.delay.sum = 0.0
			agg.rate.sum = 0.0
		}
	}
}
// publisher connects to the NATS server at the given URL and publishes
// JSON-encoded message values on the subject named by queue, at roughly rate
// messages per second. Each message carries payload_size random bytes and the
// send time as UnixNano in TimeMicro. After every 100 messages the achieved
// publish rate is printed.
//
// The function loops until an error occurs; errors are printed and make it
// return. rate must be positive and payload_size must not be negative.
func publisher(server string, queue string, payload_size int, rate int) {
	// Reject values that would otherwise panic below (division by zero for
	// the interval, negative length for make).
	if rate <= 0 {
		fmt.Printf("Invalid publish rate %d: must be positive\n", rate)
		return
	}
	if payload_size < 0 {
		fmt.Printf("Invalid payload size %d: must not be negative\n", payload_size)
		return
	}

	nc, err := nats.Connect(server)
	if err != nil {
		fmt.Printf("Failed to create default connection: %v\n", err)
		return
	}
	ec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
	if err != nil {
		// No encoded connection exists to close the raw one for us.
		nc.Close()
		fmt.Printf("Failed to create encoded connection: %v\n", err)
		return
	}
	defer ec.Close()

	sendCh := make(chan *message)
	if err := ec.BindSendChan(queue, sendCh); err != nil {
		fmt.Printf("Failed to bind send channel: %v\n", err)
		return
	}

	// Compute the pause as a Duration instead of whole milliseconds, so that
	// rates above 1000/sec do not degrade to "no pause at all".
	interval := time.Second / time.Duration(rate)

	rate_prev_time := time.Now()
	i := 0
	for {
		time.Sleep(interval)

		// A fresh buffer per message: the pointer is handed to the send
		// channel, so the slice must not be reused.
		b := make([]byte, payload_size)
		if _, err := rand.Read(b); err != nil {
			fmt.Printf("Failed to generate payload: %v\n", err)
			return
		}

		checker := &message{Seq: i, SrcId: 0, DstId: 0,
			TimeMicro: time.Now().UnixNano(), Payload: b}
		sendCh <- checker
		i++

		if i%100 == 0 {
			// Use one timestamp as both the end of this window and the start
			// of the next, so that no time is counted twice.
			t := time.Now()
			fmt.Printf("Publish %f /sec\n",
				float64(100)/t.Sub(rate_prev_time).Seconds())
			rate_prev_time = t
		}
	}
}
// subscriber connects to the NATS server at the given URL, receives
// JSON-decoded message values from the subject named by queue, and sends a
// worker_report on reportChan for every 100th message received. The report
// holds the receive rate over the last 100 messages and the delay of the
// 100th message (receive time minus the UnixNano timestamp in TimeMicro).
//
// The function loops forever once set up; set-up errors are printed and make
// it return without sending anything on reportChan.
func subscriber(server string, queue string, reportChan chan worker_report) {
	// Number of received messages per report.
	const reportEvery = 100

	nc, err := nats.Connect(server)
	if err != nil {
		fmt.Printf("Failed to create default connection: %v\n", err)
		return
	}
	ec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
	if err != nil {
		// ec is unusable here, so close the raw connection ourselves.
		nc.Close()
		fmt.Printf("Failed to create encoded connection: %v\n", err)
		return
	}
	defer ec.Close()

	recvCh := make(chan *message)
	if _, err := ec.BindRecvChan(queue, recvCh); err != nil {
		// Without a binding the receive loop below would block forever.
		fmt.Printf("Failed to bind receive channel: %v\n", err)
		return
	}

	rate_prev_time := time.Now()
	for i := 1; ; i++ {
		checker := <-recvCh
		if i%reportEvery != 0 {
			continue
		}

		nanodelay := time.Duration(time.Now().UnixNano() - checker.TimeMicro)
		reportChan <- worker_report{
			receive_rate: float64(reportEvery) / time.Since(rate_prev_time).Seconds(),
			delay:        nanodelay,
		}
		// Restart the rate window only after the report was handed over, so
		// time spent blocked on reportChan is not counted in the next window.
		rate_prev_time = time.Now()
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment