Created
February 1, 2021 07:23
-
-
Save omerkaya1/549ff0130554bd8f29e7f08b255635e5 to your computer and use it in GitHub Desktop.
An interface and an implementation of the message queue service
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package mq | |
import ( | |
"context" | |
"time" | |
"github.com/pkg/errors" | |
"github.com/streadway/amqp" | |
) | |
type ( | |
// Service is the general interface that all MQ service implementations must satisfy | |
// In case we ever decide to use another MQ server | |
Service interface { | |
Start(ctx context.Context, errChan chan error) error | |
Publish(body []byte, appID, queueName, msgType string) error | |
Consume() <-chan Message | |
} | |
// RabbitMQ is an object that is used to communicate with the RabbitMQ server | |
// TODO(o.kaya): according to the documentation, we should separate connections for publishing and consuming | |
// messages in order to avoid TCP push-backs. Let's keep that in mind until the issue becomes noticeable. | |
RabbitMQ struct { | |
conn *amqp.Connection | |
cfg config | |
channel *amqp.Channel | |
closure chan *amqp.Error | |
blocking chan amqp.Blocking | |
inputMessages chan amqp.Delivery | |
outputMessages chan Message | |
queue chan Message | |
isBlocked bool | |
} | |
// config is the interface used to configure the MQ service | |
config interface { | |
GetConnectionURI() string | |
GetServiceNames() []string | |
} | |
// Message is the structure that gets sent and received by the MQ service to and from the MQ server | |
// It abstracts the main info on the message in the way that is convenient for consumption by the MQ service | |
Message struct { | |
Body []byte | |
AppID string | |
Type string | |
QueueName string // May be redundant, if we end up using the AppID as a queue name | |
} | |
) | |
const ( | |
maxBufferSize = 10 // Mainly used for buffered channels | |
maxRetry = 10 // The number of retries the service will perform to re-connect | |
retryTimeout = time.Second * maxRetry // To avoid false positives, the recommended retry interval is 5 seconds | |
) | |
// ErrFatalConnFailure is an error that signals the complete loss of connection between the MQ Service and the MQ server | |
var ErrFatalConnFailure = errors.New("fatal MQ connection failure") | |
// NewRabbitMQ returns a new instance of RabbitMQ to the caller | |
// It only initialises the connection to the MQ server and a channel of messages | |
func NewRabbitMQ(cfg config) *RabbitMQ { | |
return &RabbitMQ{ | |
cfg: cfg, | |
inputMessages: make(chan amqp.Delivery), | |
outputMessages: make(chan Message, maxBufferSize), | |
queue: make(chan Message, maxBufferSize), | |
} | |
} | |
// Start initialises the connection to the MQ server and keeps it alive throughout the whole lifecycle of the programme | |
// It also re-establishes the connection if it receives the notification that the connection is closed | |
func (rmq *RabbitMQ) Start(ctx context.Context, errChan chan error) error { | |
// Deploy the main work cycle | |
go func(errChannel chan error) { | |
// Ensure that we release all resources when done, which in our case, may happen if: | |
// - we receive the context cancellation; | |
// - there was a critical connection failure. | |
defer func() { | |
close(rmq.inputMessages) | |
close(rmq.outputMessages) | |
close(rmq.queue) | |
var err error | |
// Close the channel | |
if rmq.channel != nil { | |
err = rmq.channel.Close() | |
if err != nil { | |
errChan <- errors.Wrap(err, "failed to close the MQ channel") | |
} | |
} | |
// Close the connection | |
if rmq.conn != nil { | |
err = rmq.conn.Close() | |
if err != nil { | |
errChan <- errors.Wrap(err, "failed to close the MQ connection") | |
} | |
} | |
}() | |
for { | |
select { | |
// Occurs once, when the programme receives interrupt from the top level | |
// We assume that the closing is intentional or enforced thus releasing the associated resources and exiting | |
case <-ctx.Done(): | |
return | |
// The closure channel will be re-initialised every time the connection gets lost for some reason | |
case err := <-rmq.closure: | |
if err != nil { | |
errChannel <- errors.Wrap(err, "mq service: closure signal received") | |
} | |
// Block the message queue and try to reconnect | |
rmq.isBlocked = true | |
for i := 1; i <= maxRetry; i++ { | |
if err := rmq.connect(); err != nil { | |
errChannel <- errors.Wrapf(err, "mq service: %d connection attempt failed", i) | |
time.Sleep(retryTimeout) | |
continue | |
} | |
rmq.isBlocked = false | |
} | |
if rmq.isBlocked { | |
errChannel <- ErrFatalConnFailure | |
return | |
} | |
// This signal indicates that the connection is blocked; we cannot push messages to the queue and we need | |
// to pause the execution until the connection is ready to consume messages again | |
case block := <-rmq.blocking: | |
if block.Active { | |
errChannel <- errors.Errorf("mq service: received blocked notification: %s", block.Reason) | |
rmq.isBlocked = true | |
} else { | |
rmq.isBlocked = false | |
} | |
// Send a message to the MQ server | |
case msg := <-rmq.queue: | |
err := rmq.channel.Publish("", msg.QueueName, false, false, amqp.Publishing{ | |
ContentType: "application/json", | |
Timestamp: time.Now(), | |
AppId: msg.AppID, | |
Body: msg.Body, | |
}) | |
if err != nil { | |
errChannel <- errors.Wrap(err, "mq service: failed to deliver the message") | |
} | |
// Receive messages and push them to the output channel | |
case msg := <-rmq.inputMessages: | |
rmq.outputMessages <- Message{ | |
Body: msg.Body, | |
AppID: msg.AppId, | |
Type: msg.Type, | |
} | |
// This case provides a non-blocking operation for the loop | |
default: | |
} | |
} | |
}(errChan) | |
return rmq.connect() | |
} | |
// Publish enqueues the message for sending to the MQ server | |
// It either performs a non-blocking send to the internal message queue or reports an error if either the connection is | |
// blocked or the queue is not ready to receive messages. | |
func (rmq RabbitMQ) Publish(body []byte, appID, qn, t string) error { | |
if rmq.isBlocked { | |
return errors.New("mq service: failed to push a message to the queue: connection is blocked") | |
} | |
select { | |
case rmq.queue <- Message{body, qn, appID, t}: | |
return nil | |
default: | |
return errors.New("mq service: failed to push a message to the queue: channel is not ready to receive messages") | |
} | |
} | |
// Consume returns a generalised channel of Message items to its caller | |
// All messages sent to SYR combined in a single message channel | |
func (rmq RabbitMQ) Consume() <-chan Message { | |
return rmq.outputMessages | |
} | |
// connect establishes a connection to the RabbitMQ server | |
func (rmq *RabbitMQ) connect() error { | |
var err error | |
rmq.conn, err = amqp.Dial(rmq.cfg.GetConnectionURI()) | |
if err != nil { | |
return errors.Wrap(err, "mq service: failed to establish the connection") | |
} | |
// Initialise a general RabbitMQ server channel to fetch messages | |
rmq.channel, err = rmq.conn.Channel() | |
if err != nil { | |
return errors.Wrap(err, "mq service: failed to initialise the message channel") | |
} | |
return rmq.configure() | |
} | |
// configure should be called whenever the service is started/re-started | |
// It initialises notification channels, registers message queues and message channels | |
func (rmq *RabbitMQ) configure() error { | |
// Register the closure notification channel | |
rmq.closure = rmq.conn.NotifyClose(make(chan *amqp.Error)) | |
// Register the blocking channel that will report the blocking conditions on the RabbitMQ server | |
rmq.blocking = rmq.conn.NotifyBlocked(make(chan amqp.Blocking)) | |
// Define queues | |
if err := rmq.configureQueues(); err != nil { | |
return err | |
} | |
// Register consumers | |
return rmq.configureConsumers() | |
} | |
// configureQueues declares message queues that will be transmitted and consumed from | |
// This part is purely for configuration purposes and should not be changed without | |
func (rmq *RabbitMQ) configureQueues() error { | |
var queues = rmq.cfg.GetServiceNames() | |
var err error | |
for i := range queues { | |
if _, err = rmq.channel.QueueDeclare(queues[i], true, false, false, false, nil); err != nil { | |
return errors.Wrapf(err, "mq service: failed to register '%s' queue", queues[i]) | |
} | |
} | |
return nil | |
} | |
// configureConsumers deploys a pool of workers (MQ consumers) and aggregates their output into a single channel | |
// The workers stop once the | |
func (rmq *RabbitMQ) configureConsumers() error { | |
var queues = rmq.cfg.GetServiceNames() | |
for i := range queues { | |
msgs, err := rmq.channel.Consume(queues[i], "", true, false, false, false, nil) | |
if err != nil { | |
return errors.Wrapf(err, "mq service: failed to register '%s' consumer", queues[i]) | |
} | |
// Deploy a routine that will merge all its messages into one channel | |
go func(messages <-chan amqp.Delivery) { | |
for msg := range messages { | |
rmq.inputMessages <- msg | |
} | |
}(msgs) | |
} | |
return nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment