-
-
Save qloog/a5ad99a38be575ffeaa9a6a239419fd3 to your computer and use it in GitHub Desktop.
rabbitmq producer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package rabbitmq | |
import ( | |
"log" | |
"time" | |
"github.com/pborman/uuid" | |
"github.com/streadway/amqp" | |
) | |
type Producer struct { | |
conn *amqp.Connection | |
channel *amqp.Channel | |
connNotify chan *amqp.Error | |
channelNotify chan *amqp.Error | |
quit chan struct{} | |
addr string | |
exchange string | |
routingKey string | |
} | |
func NewProducer(addr, exchange string) *Producer { | |
p := &Producer{ | |
addr: addr, | |
exchange: exchange, | |
routingKey: "", | |
quit: make(chan struct{}), | |
} | |
return p | |
} | |
func (p *Producer) Start() error { | |
if err := p.Run(); err != nil { | |
return err | |
} | |
go p.ReConnect() | |
return nil | |
} | |
func (p *Producer) Stop() { | |
close(p.quit) | |
if !p.conn.IsClosed() { | |
if err := p.conn.Close(); err != nil { | |
log.Println("rabbitmq producer - connection close failed: ", err) | |
} | |
} | |
} | |
func (p *Producer) Run() error { | |
var err error | |
if p.conn, err = amqp.Dial(p.addr); err != nil { | |
return err | |
} | |
if p.channel, err = p.conn.Channel(); err != nil { | |
p.conn.Close() | |
return err | |
} | |
p.connNotify = p.conn.NotifyClose(make(chan *amqp.Error)) | |
p.channelNotify = p.channel.NotifyClose(make(chan *amqp.Error)) | |
return err | |
} | |
func (p *Producer) ReConnect() { | |
for { | |
select { | |
case err := <-p.connNotify: | |
if err != nil { | |
log.Println("rabbitmq producer - connection NotifyClose: ", err) | |
} | |
case err := <-p.channelNotify: | |
if err != nil { | |
log.Println("rabbitmq producer - channel NotifyClose: ", err) | |
} | |
case <-p.quit: | |
return | |
} | |
// backstop | |
if !p.conn.IsClosed() { | |
if err := p.conn.Close(); err != nil { | |
log.Println("rabbitmq producer - connection close failed: ", err) | |
} | |
} | |
// IMPORTANT: 必须清空 Notify,否则死连接不会释放 | |
for err := range p.channelNotify { | |
log.Println(err) | |
} | |
for err := range p.connNotify { | |
log.Println(err) | |
} | |
quit: | |
for { | |
select { | |
case <-p.quit: | |
return | |
default: | |
log.Println("rabbitmq producer - reconnect") | |
if err := p.Run(); err != nil { | |
log.Println("rabbitmq producer - failCheck: ", err) | |
// sleep 5s reconnect | |
time.Sleep(time.Second * 5) | |
continue | |
} | |
break quit | |
} | |
} | |
} | |
} | |
func (p *Producer) Publish(msg []byte) error { | |
return p.channel.Publish( | |
p.exchange, // exchange | |
p.routingKey, // routing key | |
false, // mandatory | |
false, // immediate | |
amqp.Publishing{ | |
ContentType: "text/plain", | |
MessageId: uuid.New(), | |
Type: "", | |
Body: msg, | |
}) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment