Skip to content

Instantly share code, notes, and snippets.

@streadway
Created February 9, 2014 13:22
Show Gist options
  • Save streadway/8899006 to your computer and use it in GitHub Desktop.
Save streadway/8899006 to your computer and use it in GitHub Desktop.
AMQP stress test #93
package main
import (
"github.com/streadway/amqp"
"log"
"time"
)
// Tunables for the stress run.
const (
// writeURL is the AMQP URL handed to mustDialChannel for the publishing
// connection.
writeURL = "amqp://localhost/"
// updateInterval is the period of the ticker that drives the statistics
// line printed by report.
updateInterval = 1 * time.Second
// publishInterval is the period of the ticker that paces publish: one
// message per tick, i.e. a target of at most 10000 messages per second.
publishInterval = time.Second / 10000
// queue is the name of the queue that publish declares and uses as the
// routing key.
queue = "stress"
// msgSize is the length in bytes of the (zero-filled) message body.
msgSize = 1000
// limit is the number of messages publish sends before stopping; publish
// treats a value <= 0 as "no limit".
limit = 50000
)
// stress bundles the event channels that connect the publishing side of the
// test to the reporting loop. Every receive on one of these channels in
// report bumps the matching counter.
type stress struct {
	// err carries errors raised while setting up or publishing.
	err chan error

	// in is signalled once per successfully published message; out is counted
	// by report as "msgs/sec read" (no sender for out is visible in this file).
	in, out chan struct{}

	// ret is registered with the AMQP channel via NotifyReturn.
	ret chan amqp.Return

	// ack and nack are registered with the AMQP channel via NotifyConfirm and
	// carry the delivery tags of confirmed / negatively confirmed publishes.
	ack, nack chan uint64
}
// counter is a monotonically increasing event count that also remembers its
// value at the last reset, so a rate over the most recent interval can be
// derived from the difference.
type counter struct {
	total int // events seen since the counter was created
	prev  int // value of total at the most recent reset
}

// perSecond returns the average number of events per second recorded since
// the last reset, given that d is the time elapsed since that reset.
//
// A non-positive d yields 0 instead of dividing by zero (which would produce
// +Inf or NaN) or reporting a negative rate.
func (c *counter) perSecond(d time.Duration) float64 {
	if d <= 0 {
		return 0
	}
	return float64(c.total-c.prev) / d.Seconds()
}

// increment records one more event.
func (c *counter) increment() {
	c.total++
}

// reset marks the start of a new measurement interval: events counted so far
// no longer contribute to perSecond, while total keeps accumulating.
func (c *counter) reset() {
	c.prev = c.total
}
// report is the statistics loop of the stress test. It counts every event
// received on the stress channels and, on each tick of interval, logs one
// summary line: publish and read rates over the elapsed tick, cumulative ack,
// nack, return and error totals, and the number of publishes still awaiting a
// confirmation.
//
// The first tick only establishes the reference time for rate calculation; no
// events are received until it arrives. report never returns, so it is meant
// to be run in its own goroutine.
func (s stress) report(interval <-chan time.Time) {
	var in, out, errs, ret, ack, nack counter
	all := []*counter{&in, &out, &errs, &ret, &ack, &nack}

	last := <-interval
	for {
		select {
		case now := <-interval:
			elapsed := now.Sub(last)
			// A publish is resolved by either an ack or a nack, so both are
			// subtracted: a nacked message will never be acked and must not
			// be reported as outstanding forever.
			outstanding := in.total - ack.total - nack.total
			log.Println(in.perSecond(elapsed), "msgs/sec written,",
				out.perSecond(elapsed), "msgs/sec read,",
				ack.total, "acks,",
				nack.total, "nacks,",
				outstanding, "outstanding acks,",
				ret.total, "returns,",
				errs.total, "errors.")
			last = now
			// Start a fresh rate window; totals keep accumulating.
			for _, c := range all {
				c.reset()
			}
		case e := <-s.err:
			log.Print("stress error: ", e)
			errs.increment()
		case <-s.in:
			in.increment()
		case <-s.out:
			out.increment()
		case <-s.ret:
			ret.increment()
		case <-s.ack:
			ack.increment()
		case <-s.nack:
			nack.increment()
		}
	}
}
// publish declares queue on c, switches c into confirm mode with s.ack/s.nack
// as confirmation listeners and s.ret as the return listener, and then sends
// msg to queue once per tick of interval.
//
// After limit messages the function returns; a limit <= 0 means it keeps
// publishing indefinitely. Each successful Publish call is signalled on s.in.
// Any error (declare, confirm or publish) is sent on s.err and ends the
// function immediately.
func (s stress) publish(interval <-chan time.Time, limit int, c *amqp.Channel, queue string, msg amqp.Publishing) {
	// See amqp.Channel.QueueDeclare for the meaning of the positional flags.
	_, declareErr := c.QueueDeclare(queue, false, true, false, false, nil)
	if declareErr != nil {
		s.err <- declareErr
		return
	}

	// Listeners are registered before confirm mode is enabled.
	c.NotifyConfirm(s.ack, s.nack)
	confirmErr := c.Confirm(false)
	if confirmErr != nil {
		s.err <- confirmErr
		return
	}
	c.NotifyReturn(s.ret)

	for sent := 0; ; sent++ {
		// Only a positive limit bounds the run.
		if limit > 0 && sent >= limit {
			return
		}

		// Pace the publisher: one message per tick.
		<-interval

		// Empty exchange name with the queue name as routing key; the two
		// booleans are presumably mandatory=true, immediate=false — confirm
		// against the amqp library version in use.
		publishErr := c.Publish("", queue, true, false, msg)
		if publishErr != nil {
			s.err <- publishErr
			return
		}
		s.in <- struct{}{}
	}
}
// mustDialChannel connects to the AMQP server at url and opens a channel on
// that connection, returning both. If either step fails the error is logged
// with log.Fatal, which terminates the process.
func mustDialChannel(url string) (*amqp.Connection, *amqp.Channel) {
	connection, dialErr := amqp.Dial(url)
	if dialErr != nil {
		log.Fatal("stress connection: ", dialErr)
	}

	channel, openErr := connection.Channel()
	if openErr != nil {
		log.Fatal("stress channel: ", openErr)
	}

	return connection, channel
}
// main wires the stress test together: it creates the event channels, opens
// the publishing connection, starts the reporting and publishing goroutines,
// and then blocks forever so the report keeps printing.
func main() {
	// All event channels are unbuffered.
	var s stress
	s.err = make(chan error)
	s.in = make(chan struct{})
	s.out = make(chan struct{})
	s.ack = make(chan uint64)
	s.nack = make(chan uint64)
	s.ret = make(chan amqp.Return)

	conn, writer := mustDialChannel(writeURL)
	defer conn.Close()

	reportTicks := time.Tick(updateInterval)
	go s.report(reportTicks)

	// Zero-filled body of msgSize bytes.
	payload := amqp.Publishing{
		ContentType:  "text/plain",
		DeliveryMode: 2,
		Body:         make([]byte, msgSize),
	}
	publishTicks := time.Tick(publishInterval)
	go s.publish(publishTicks, limit, writer, queue, payload)

	// Block forever; the goroutines above do all the work.
	select {}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment