-
-
Save amitsaha/410dd92e23308a2bf3360fe3a4042a8d to your computer and use it in GitHub Desktop.
Golang auto-reconnect rabbitmq consumer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package base | |
import ( | |
"errors" | |
"fmt" | |
"github.com/manucorporat/try" | |
"github.com/simpleton/beego" | |
"github.com/streadway/amqp" | |
"math/rand" | |
"model/helper" | |
"os" | |
"runtime" | |
"time" | |
"sync/atomic" | |
) | |
// Consumer holds all infromation | |
// about the RabbitMQ connection | |
// This setup does limit a consumer | |
// to one exchange. This should not be | |
// an issue. Having to connect to multiple | |
// exchanges means something else is | |
// structured improperly. | |
type Consumer struct { | |
conn *amqp.Connection | |
channel *amqp.Channel | |
done chan error | |
consumerTag string // Name that consumer identifies itself to the server with | |
uri string // uri of the rabbitmq server | |
exchange string // exchange that we will bind to | |
exchangeType string // topic, direct, etc... | |
lastRecoverTime int64 | |
//track service current status | |
currentStatus atomic.Value | |
} | |
const RECOVER_INTERVAL_TIME = 6 * 60 | |
// NewConsumer returns a Consumer struct that has been initialized properly | |
// essentially don't touch conn, channel, or done and you can create Consumer manually | |
func newConsumer(consumerTag, uri, exchange, exchangeType string) *Consumer { | |
name, err := os.Hostname() | |
if err != nil { | |
name = "_sim" | |
} | |
consumer := &Consumer{ | |
consumerTag: fmt.Sprintf("%s%s", consumerTag, name), | |
uri: uri, | |
exchange: exchange, | |
exchangeType: exchangeType, | |
done: make(chan error), | |
lastRecoverTime: time.Now().Unix(), | |
} | |
consumer.currentStatus.Store(true) | |
return consumer | |
} | |
func maxParallelism() int { | |
maxProcs := runtime.GOMAXPROCS(0) | |
numCPU := runtime.NumCPU() | |
if maxProcs < numCPU { | |
return maxProcs | |
} | |
return numCPU | |
} | |
func RunConsumer(consumerTag, exchange, exchangeType, queueName, routingKey string, handler func([]byte) bool) { | |
rabbitUri := fmt.Sprintf("amqp://%s:%s@%s/", | |
beego.AppConfig.String("mqAccount"), | |
beego.AppConfig.String("mqPassword"), | |
beego.AppConfig.String("mqAddress"), | |
) | |
consumer := newConsumer(consumerTag, rabbitUri, exchange, exchangeType) | |
if err := consumer.Connect(); err != nil { | |
helper.FailOnError(err, fmt.Sprintf("[%s]connect error", consumerTag)) | |
} | |
deliveries, err := consumer.AnnounceQueue(queueName, routingKey) | |
helper.FailOnError(err, fmt.Sprintf("[%s]Error when calling AnnounceQueue()", consumerTag)) | |
consumer.Handle(deliveries, handler, maxParallelism(), queueName, routingKey) | |
} | |
// ReConnect is called in places where NotifyClose() channel is called | |
// wait 30 seconds before trying to reconnect. Any shorter amount of time | |
// will likely destroy the error log while waiting for servers to come | |
// back online. This requires two parameters which is just to satisfy | |
// the AccounceQueue call and allows greater flexability | |
func (c *Consumer) ReConnect(queueName, routingKey string, retryTime int) (<-chan amqp.Delivery, error) { | |
c.Close() | |
time.Sleep(time.Duration(15 + rand.Intn(60) + 2*retryTime) * time.Second) | |
beego.Info("Try ReConnect with times:", retryTime) | |
if err := c.Connect(); err != nil { | |
return nil, err | |
} | |
deliveries, err := c.AnnounceQueue(queueName, routingKey) | |
if err != nil { | |
return deliveries, errors.New("Couldn't connect") | |
} | |
return deliveries, nil | |
} | |
// Connect to RabbitMQ server | |
func (c *Consumer) Connect() error { | |
var err error | |
beego.Info("dialing: ", c.uri) | |
c.conn, err = amqp.Dial(c.uri) | |
if err != nil { | |
return fmt.Errorf("Dial: %s", err) | |
} | |
go func() { | |
// Waits here for the channel to be closed | |
beego.Info("closing: ", <-c.conn.NotifyClose(make(chan *amqp.Error))) | |
// Let Handle know it's not time to reconnect | |
c.done <- errors.New("Channel Closed") | |
}() | |
beego.Info("got Connection, getting Channel") | |
c.channel, err = c.conn.Channel() | |
if err != nil { | |
return fmt.Errorf("Channel: %s", err) | |
} | |
beego.Info("got Channel, declaring Exchange ", c.exchange) | |
if err = c.channel.ExchangeDeclare( | |
c.exchange, // name of the exchange | |
c.exchangeType, // type | |
true, // durable | |
false, // delete when complete | |
false, // internal | |
false, // noWait | |
nil, // arguments | |
); err != nil { | |
return fmt.Errorf("Exchange Declare: %s", err) | |
} | |
return nil | |
} | |
// AnnounceQueue sets the queue that will be listened to for this | |
// connection... | |
func (c *Consumer) AnnounceQueue(queueName, routingKey string) (<-chan amqp.Delivery, error) { | |
beego.Info("declared Exchange, declaring Queue:", queueName) | |
queue, err := c.channel.QueueDeclare( | |
queueName, // name of the queue | |
true, // durable | |
false, // delete when usused | |
false, // exclusive | |
false, // noWait | |
nil, // arguments | |
) | |
if err != nil { | |
return nil, fmt.Errorf("Queue Declare: %s", err) | |
} | |
beego.Info(fmt.Sprintf("declared Queue (%q %d messages, %d consumers), binding to Exchange (key %q)", | |
queue.Name, queue.Messages, queue.Consumers, routingKey)) | |
// Qos determines the amount of messages that the queue will pass to you before | |
// it waits for you to ack them. This will slow down queue consumption but | |
// give you more certainty that all messages are being processed. As load increases | |
// I would reccomend upping the about of Threads and Processors the go process | |
// uses before changing this although you will eventually need to reach some | |
// balance between threads, procs, and Qos. | |
err = c.channel.Qos(50, 0, false) | |
if err != nil { | |
return nil, fmt.Errorf("Error setting qos: %s", err) | |
} | |
if err = c.channel.QueueBind( | |
queue.Name, // name of the queue | |
routingKey, // routingKey | |
c.exchange, // sourceExchange | |
false, // noWait | |
nil, // arguments | |
); err != nil { | |
return nil, fmt.Errorf("Queue Bind: %s", err) | |
} | |
beego.Info("Queue bound to Exchange, starting Consume consumer tag:", c.consumerTag) | |
deliveries, err := c.channel.Consume( | |
queue.Name, // name | |
c.consumerTag, // consumerTag, | |
false, // noAck | |
false, // exclusive | |
false, // noLocal | |
false, // noWait | |
nil, // arguments | |
) | |
if err != nil { | |
return nil, fmt.Errorf("Queue Consume: %s", err) | |
} | |
return deliveries, nil | |
} | |
func (c *Consumer) Close() { | |
if c.channel != nil { | |
c.channel.Close() | |
c.channel = nil | |
} | |
if c.conn != nil { | |
c.conn.Close() | |
c.conn = nil | |
} | |
} | |
func (c *Consumer) Handle( | |
deliveries <-chan amqp.Delivery, | |
fn func([]byte) bool, | |
threads int, | |
queue string, | |
routingKey string) { | |
var err error | |
for { | |
beego.Info("Enter for busy loop with thread:", threads) | |
for i := 0; i < threads; i++ { | |
go func() { | |
beego.Info("Enter go with thread with deliveries", deliveries) | |
for msg := range deliveries { | |
beego.Info("Enter deliver") | |
ret := false | |
try.This(func() { | |
body := msg.Body[:] | |
ret = fn(body) | |
}).Finally(func() { | |
if ret == true { | |
msg.Ack(false) | |
currentTime := time.Now().Unix() | |
if currentTime-c.lastRecoverTime > RECOVER_INTERVAL_TIME && !c.currentStatus.Load().(bool) { | |
beego.Info("Try to Recover Unack Messages!") | |
c.currentStatus.Store(true) | |
c.lastRecoverTime = currentTime | |
c.channel.Recover(true) | |
} | |
} else { | |
// this really a litter dangerous. if the worker is panic very quickly, | |
// it will ddos our sentry server......plz, add [retry-ttl] in header. | |
//msg.Nack(false, true) | |
c.currentStatus.Store(false) | |
} | |
}).Catch(func(e try.E) { | |
helper.SentryError(e) | |
}) | |
} | |
}() | |
} | |
// Go into reconnect loop when | |
// c.done is passed non nil values | |
if <-c.done != nil { | |
c.currentStatus.Store(false) | |
retryTime := 1 | |
for { | |
deliveries, err = c.ReConnect(queue, routingKey, retryTime) | |
if err != nil { | |
helper.FailOnError(err, "Reconnecting Error") | |
retryTime += 1 | |
} else { | |
break | |
} | |
} | |
} | |
beego.Info("Reconnected!!!") | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment