Last active
March 22, 2020 22:11
-
-
Save ilyabrin/c83713f04a267f50c60caa0c28505a16 to your computer and use it in GitHub Desktop.
RabbitMQ consumer / producer example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// RabbitMQ: consumer + producer (in one file, yes) | |
// docker run --detach --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management | |
// TODO: docker-compose.yml | |
package main | |
import ( | |
"fmt" | |
"os" | |
"github.com/streadway/amqp" | |
) | |
func main() { | |
// Get the connection string from the environment variable | |
url := os.Getenv("AMQP_URL") | |
//If it doesnt exist, use the default connection string | |
if url == "" { | |
url = "amqp://guest:guest@localhost:5672" | |
} | |
// Connect to the rabbitMQ instance | |
connection, err := amqp.Dial(url) | |
if err != nil { | |
panic("could not establish connection with RabbitMQ:" + err.Error()) | |
} | |
// Create a channel from the connection. We'll use channels to access the data in the queue rather than the | |
// connection itself | |
channel, err := connection.Channel() | |
if err != nil { | |
panic("could not open RabbitMQ channel:" + err.Error()) | |
} | |
// We create an exahange that will bind to the queue to send and receive messages | |
err = channel.ExchangeDeclare("events", "topic", true, false, false, false, nil) | |
if err != nil { | |
panic(err) | |
} | |
// We create a message to be sent to the queue. | |
// It has to be an instance of the aqmp publishing struct | |
message := amqp.Publishing{ | |
Body: []byte("Hello World"), | |
} | |
// We publish the message to the exahange we created earlier | |
err = channel.Publish("events", "random-key", false, false, message) | |
if err != nil { | |
panic("error publishing a message to the queue:" + err.Error()) | |
} | |
// We create a queue named Test | |
_, err = channel.QueueDeclare("test", true, false, false, false, nil) | |
if err != nil { | |
panic("error declaring the queue: " + err.Error()) | |
} | |
// We bind the queue to the exchange to send and receive data from the queue | |
err = channel.QueueBind("test", "#", "events", false, nil) | |
if err != nil { | |
panic("error binding to the queue: " + err.Error()) | |
} | |
// We consume data from the queue named Test using the channel we created in go. | |
msgs, err := channel.Consume("test", "", false, false, false, false, nil) | |
if err != nil { | |
panic("error consuming the queue: " + err.Error()) | |
} | |
// We loop through the messages in the queue and print them in the console. | |
// The msgs will be a go channel, not an amqp channel | |
for msg := range msgs { | |
fmt.Println("message received: " + string(msg.Body)) | |
msg.Ack(false) | |
} | |
// We close the connection after the operation has completed. | |
defer connection.Close() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment