Created
October 12, 2018 11:34
-
-
Save 2garryn/c346847c810aa7a2644471ecccec6c1a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package erabbit | |
import ( | |
"sync" | |
"vkanalyze/libs/config" | |
"vkanalyze/libs/log" | |
"vkanalyze/libs/vkerr" | |
"github.com/streadway/amqp" | |
) | |
var instance *mqPublisher | |
var once sync.Once | |
func GetPublisher() *mqPublisher { | |
once.Do(func() { | |
instance = &mqPublisher{} | |
}) | |
return instance | |
} | |
// message is one publish request handed from Publish to a worker goroutine.
type message struct {
	// Name is passed to the AMQP publish call as the exchange, RoutingKey as
	// the routing key.
	Name, RoutingKey string
	// Data is sent as the body of the published message.
	Data []byte
	// ResponseCh receives the outcome (OK or NOTOK) once the worker has
	// finished handling the request.
	ResponseCh chan<- int
}
type mqPublisher struct { | |
conn *amqp.Connection | |
exchNames []string | |
nWorkers int | |
recv chan message | |
} | |
func (pub *mqPublisher) Init(exchangeNames []string, nw int) { | |
url := config.Get().RabbitURL | |
pub.conn = connect(url) | |
pub.exchNames = exchangeNames | |
pub.nWorkers = nw | |
for i := 0; i < nw; i++ { | |
pub.startChannel() | |
} | |
} | |
func (pub mqPublisher) Publish(name string, rk string, data []byte) error { | |
resChannel := make(chan int) | |
msg := message{ | |
Name: name, | |
RoutingKey: rk, | |
Data: data, | |
ResponseCh: resChannel, | |
} | |
pub.recv <- msg | |
r := <-resChannel | |
if r != OK { | |
return vkerr.E(vkerr.InternalError, "Something goes wrong with rabbit") | |
} | |
return nil | |
} | |
func (pub mqPublisher) startChannel() { | |
go func() { | |
ch := initChannel(pub.conn) | |
for _, name := range pub.exchNames { | |
declareExchange(name, ch) | |
} | |
confirm := ch.NotifyPublish(make(chan amqp.Confirmation)) | |
for t := range pub.recv { | |
pub.publish(ch, confirm, t) | |
} | |
}() | |
} | |
func (pub mqPublisher) publish(ch *amqp.Channel, confirm <-chan amqp.Confirmation, msg message) { | |
err := ch.Publish( | |
msg.Name, // exchange | |
msg.RoutingKey, // routing key | |
true, // mandatory | |
false, // immediate | |
amqp.Publishing{ | |
ContentType: "application/json", | |
Body: msg.Data, | |
}) | |
if err != nil { | |
log.Log.Error("Can't publish to amqp channel ", err, msg) | |
msg.ResponseCh <- NOTOK | |
return | |
} | |
cf := <-confirm | |
if cf.Ack { | |
msg.ResponseCh <- OK | |
} else { | |
msg.ResponseCh <- NOTOK | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment