Skip to content

Instantly share code, notes, and snippets.

@2garryn
Created October 12, 2018 11:34
Show Gist options
  • Save 2garryn/c346847c810aa7a2644471ecccec6c1a to your computer and use it in GitHub Desktop.
Save 2garryn/c346847c810aa7a2644471ecccec6c1a to your computer and use it in GitHub Desktop.
package erabbit
import (
"sync"
"vkanalyze/libs/config"
"vkanalyze/libs/log"
"vkanalyze/libs/vkerr"
"github.com/streadway/amqp"
)
// Package-level state backing the lazily created publisher returned by
// GetPublisher.
var (
	// instance is the shared publisher; it stays nil until GetPublisher runs.
	instance *mqPublisher
	// once guards the single construction of instance.
	once sync.Once
)
// GetPublisher returns the process-wide publisher, constructing it on the
// first call. The returned value is a zero-value mqPublisher until Init has
// been called on it.
func GetPublisher() *mqPublisher {
	create := func() {
		instance = new(mqPublisher)
	}
	once.Do(create)
	return instance
}
// message is a single publish request handed from Publish to a worker
// goroutine.
type message struct {
	// Name is the exchange the payload is published to.
	Name string
	// RoutingKey is the routing key used for the publishing.
	RoutingKey string
	// Data is the raw message body.
	Data []byte
	// ResponseCh receives the outcome of the publish (OK or NOTOK).
	ResponseCh chan<- int
}
// mqPublisher fans publish requests out to a set of worker goroutines that
// share one AMQP connection.
type mqPublisher struct {
	// conn is the connection assigned in Init; each worker passes it to
	// initChannel.
	conn *amqp.Connection
	// exchNames lists the exchanges every worker declares on start-up.
	exchNames []string
	// nWorkers is the worker count requested in Init.
	nWorkers int
	// recv carries messages from Publish to the workers.
	recv chan message
}
// Init prepares the publisher for use: it obtains a connection for the
// RabbitURL found in the configuration, stores the exchange names and worker
// count, and starts nw worker goroutines.
//
// exchangeNames are the exchanges each worker declares before it starts
// consuming publish requests; nw is the number of workers to start.
func (pub *mqPublisher) Init(exchangeNames []string, nw int) {
	pub.conn = connect(config.Get().RabbitURL)
	pub.exchNames = exchangeNames
	pub.nWorkers = nw

	// The publisher is created as a zero value, so recv would otherwise be
	// nil: sends to and receives from a nil channel block forever, which
	// would hang every worker and every Publish call. It has to be set
	// before startChannel runs, because startChannel has a value receiver
	// and works on a copy of the struct taken at call time.
	if pub.recv == nil {
		pub.recv = make(chan message)
	}

	for i := 0; i < nw; i++ {
		pub.startChannel()
	}
}
// Publish hands data to a worker for publishing to exchange name with routing
// key rk, and blocks until the worker reports the outcome.
//
// It returns nil when the worker answers OK, and an InternalError otherwise.
// An InternalError is also returned when the publisher has no request channel
// yet (it was never initialised), because sending on a nil channel would
// block the caller forever.
func (pub mqPublisher) Publish(name string, rk string, data []byte) error {
	if pub.recv == nil {
		return vkerr.E(vkerr.InternalError, "Rabbit publisher is not initialized")
	}

	resChannel := make(chan int)
	pub.recv <- message{
		Name:       name,
		RoutingKey: rk,
		Data:       data,
		ResponseCh: resChannel,
	}

	// The worker sends exactly one status per message.
	if r := <-resChannel; r != OK {
		return vkerr.E(vkerr.InternalError, "Something goes wrong with rabbit")
	}
	return nil
}
// startChannel launches one worker goroutine. The worker opens its own AMQP
// channel via initChannel, declares every exchange in pub.exchNames, and then
// publishes each message it receives from pub.recv, one at a time.
//
// The goroutine runs until pub.recv is closed; nothing visible in this file
// closes it.
func (pub mqPublisher) startChannel() {
	go func() {
		ch := initChannel(pub.conn)
		for _, name := range pub.exchNames {
			declareExchange(name, ch)
		}

		// The amqp library requires the confirmation listener to have a
		// capacity of at least the number of outstanding publishings. A
		// worker waits for each confirmation before publishing again, so one
		// slot is enough and keeps the library's delivery of the
		// confirmation from blocking on this goroutine.
		// NOTE(review): confirmations only arrive if initChannel puts the
		// channel into confirm mode — confirm against its implementation.
		confirm := ch.NotifyPublish(make(chan amqp.Confirmation, 1))

		for t := range pub.recv {
			pub.publish(ch, confirm, t)
		}
	}()
}
// publish sends one message on ch and reports the outcome on msg.ResponseCh.
//
// NOTOK is reported when ch.Publish returns an error (which is also logged)
// or when the confirmation read from confirm is not an ack; OK is reported
// when the confirmation is an ack.
func (pub mqPublisher) publish(ch *amqp.Channel, confirm <-chan amqp.Confirmation, msg message) {
	const (
		mandatory = true
		immediate = false
	)

	payload := amqp.Publishing{
		ContentType: "application/json",
		Body:        msg.Data,
	}

	if err := ch.Publish(msg.Name, msg.RoutingKey, mandatory, immediate, payload); err != nil {
		log.Log.Error("Can't publish to amqp channel ", err, msg)
		msg.ResponseCh <- NOTOK
		return
	}

	// Wait for the confirmation that belongs to the publishing above.
	result := <-confirm
	if !result.Ack {
		msg.ResponseCh <- NOTOK
		return
	}
	msg.ResponseCh <- OK
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment