Skip to content

Instantly share code, notes, and snippets.

@ilyabrin
Last active March 22, 2020 22:11
Show Gist options
  • Save ilyabrin/c83713f04a267f50c60caa0c28505a16 to your computer and use it in GitHub Desktop.
Save ilyabrin/c83713f04a267f50c60caa0c28505a16 to your computer and use it in GitHub Desktop.
RabbitMQ consumer / producer example
// RabbitMQ: consumer + producer (in one file, yes)
// docker run --detach --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
// TODO: docker-compose.yml
package main
import (
"fmt"
"os"
"github.com/streadway/amqp"
)
func main() {
// Get the connection string from the environment variable
url := os.Getenv("AMQP_URL")
//If it doesnt exist, use the default connection string
if url == "" {
url = "amqp://guest:guest@localhost:5672"
}
// Connect to the rabbitMQ instance
connection, err := amqp.Dial(url)
if err != nil {
panic("could not establish connection with RabbitMQ:" + err.Error())
}
// Create a channel from the connection. We'll use channels to access the data in the queue rather than the
// connection itself
channel, err := connection.Channel()
if err != nil {
panic("could not open RabbitMQ channel:" + err.Error())
}
// We create an exahange that will bind to the queue to send and receive messages
err = channel.ExchangeDeclare("events", "topic", true, false, false, false, nil)
if err != nil {
panic(err)
}
// We create a message to be sent to the queue.
// It has to be an instance of the aqmp publishing struct
message := amqp.Publishing{
Body: []byte("Hello World"),
}
// We publish the message to the exahange we created earlier
err = channel.Publish("events", "random-key", false, false, message)
if err != nil {
panic("error publishing a message to the queue:" + err.Error())
}
// We create a queue named Test
_, err = channel.QueueDeclare("test", true, false, false, false, nil)
if err != nil {
panic("error declaring the queue: " + err.Error())
}
// We bind the queue to the exchange to send and receive data from the queue
err = channel.QueueBind("test", "#", "events", false, nil)
if err != nil {
panic("error binding to the queue: " + err.Error())
}
// We consume data from the queue named Test using the channel we created in go.
msgs, err := channel.Consume("test", "", false, false, false, false, nil)
if err != nil {
panic("error consuming the queue: " + err.Error())
}
// We loop through the messages in the queue and print them in the console.
// The msgs will be a go channel, not an amqp channel
for msg := range msgs {
fmt.Println("message received: " + string(msg.Body))
msg.Ack(false)
}
// We close the connection after the operation has completed.
defer connection.Close()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment