Created
January 20, 2020 04:26
-
-
Save dselans/77dc432c7d57aa94bc7c0ca67070f539 to your computer and use it in GitHub Desktop.
RabbitMQ header exchange publisher and consumer example in Go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Quick example of using a headers exchange with the streadway/amqp library.
//
// NOTE: I couldn't find any examples of this anywhere, so maybe this will help
// someone else.
//
// Run the consumer in one terminal: $ go run main.go
// Run the publisher in another terminal: $ go run main.go -action publisher
//
// The consumer should then receive the message.
//
// To see that header routing is working as expected, specify a different header
// in the Publish call -- the message should no longer be received by the consumer.
//
package main
import (
	"flag"
	"fmt"
	"log"

	"github.com/pkg/errors"
	"github.com/streadway/amqp"
)
// Accepted values for the -action flag.
const (
	// ConsumerAction selects consumer mode: declare and bind the queue, then
	// print every message delivered to it.
	ConsumerAction = "consumer"
	// PublisherAction selects publisher mode: publish one test message and exit.
	PublisherAction = "publisher"
)
var ( | |
action = flag.String("action", ConsumerAction, "publisher or consumer") | |
rabbitURL = flag.String("url", "amqp://localhost", "rabbitmq url") | |
exchangeName = flag.String("headers", "headers", "name of the exchange") | |
queueName = flag.String("queue", "header-queue", "name of the queue (only needed for consumer)") | |
) | |
// Options carries the settings connect needs to reach the broker and set up
// the exchange (and, for consumers, the queue and its binding).
//
// The field order is relied upon by positional struct literals; do not reorder.
type Options struct {
	// URL is the AMQP URL handed to amqp.Dial.
	URL string
	// ExchangeName is the name of the exchange to declare.
	ExchangeName string
	// ExchangeType is the exchange kind passed to ExchangeDeclare (e.g. "headers").
	ExchangeType string
	// QueueName is the queue to declare and bind; only used in consumer mode.
	QueueName string
	// RoutingKey is the binding key passed to QueueBind.
	RoutingKey string
}
func main() { | |
flag.Parse() | |
ch, err := connect(*action, &Options{ | |
*rabbitURL, | |
*exchangeName, | |
"headers", | |
*queueName, | |
"", | |
}) | |
if err != nil { | |
log.Fatalf("unable to connect to rabbitmq: ") | |
} | |
switch *action { | |
case PublisherAction: | |
if err := ch.Publish( | |
*exchangeName, | |
"", // routing key is not used w/ headers exchange | |
false, | |
false, | |
amqp.Publishing{ | |
Headers: amqp.Table{ | |
"type": "foo", | |
}, | |
Body: []byte("test message contents"), | |
}, | |
); err != nil { | |
log.Fatalf("unable to publish message: %s", err) | |
} | |
fmt.Println("Published message") | |
case ConsumerAction: | |
rabbitChan, err := ch.Consume( | |
"transform", | |
"consume/main.go", | |
true, | |
false, | |
false, | |
false, | |
nil, | |
) | |
if err != nil { | |
log.Fatalf("unable to start consumption of messages: %s", err) | |
} | |
fmt.Println("Running in consumer mode") | |
for { | |
msg := <-rabbitChan | |
fmt.Printf("Received a message w/ body: %+v Headers: %+v\n", string(msg.Body), msg.Headers) | |
} | |
default: | |
log.Fatalf("unrecognized action '%s'", *action) | |
} | |
} | |
func connect(action string, opts *Options) (*amqp.Channel, error) { | |
ac, err := amqp.Dial(opts.URL) | |
if err != nil { | |
return nil, err | |
} | |
ch, err := ac.Channel() | |
if err != nil { | |
return nil, errors.Wrap(err, "Channel instantiation failure") | |
} | |
if err := ch.ExchangeDeclare( | |
opts.ExchangeName, | |
opts.ExchangeType, | |
true, | |
false, | |
false, | |
false, | |
nil, | |
); err != nil { | |
return nil, errors.Wrap(err, "unable to declare exchange") | |
} | |
if action == ConsumerAction { | |
if _, err = ch.QueueDeclare( | |
opts.QueueName, | |
true, | |
false, | |
false, | |
false, | |
nil, | |
); err != nil { | |
return nil, err | |
} | |
if err := ch.QueueBind( | |
opts.QueueName, | |
opts.RoutingKey, | |
opts.ExchangeName, | |
false, | |
amqp.Table{ | |
"x-match": "all", | |
"type": "foo", | |
}, | |
); err != nil { | |
return nil, errors.Wrap(err, "unable to bind queue") | |
} | |
} | |
return ch, nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment