Created
August 8, 2016 20:19
-
-
Save adambom/a3828487044243301b9ee084419ccfdd to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Package msgme implements a lightweight library for publishing messages to and
// consuming messages from an AMQP-compatible message queue.
package msgme | |
import ( | |
"github.com/streadway/amqp" | |
) | |
type Broker struct { | |
Key string | |
Queue string | |
Topic string | |
URL string | |
conn *amqp.Connection | |
channel *amqp.Channel | |
} | |
// Close tears down the connection to the amqp host. | |
func (b *Broker) Close() { | |
b.channel.Close() | |
b.conn.Close() | |
} | |
// Emit will publish a message to the broker's routing key. | |
func (b *Broker) Emit(msg string) error { | |
return b.channel.Publish(b.Topic, b.Key, false, false, amqp.Publishing{ | |
ContentType: "text/plain", | |
Body: []byte(msg), | |
}) | |
} | |
/* | |
New initializes and returns a pointer to Broker. It will connect to the amqp | |
host at the URL specified, declare a topic exchange with the specified topic | |
name, and declare a queue, binding it to the topic. | |
To bind to multiple routing keys, use * to designate wildcards. | |
*/ | |
func New(url, queue, topic, key string) (*Broker, error) { | |
b := &Broker{ | |
Key: key, | |
Queue: queue, | |
Topic: topic, | |
URL: url, | |
} | |
conn, err := amqp.Dial(b.URL) | |
if err != nil { | |
return nil, err | |
} | |
b.conn = conn | |
ch, err := conn.Channel() | |
if err != nil { | |
return nil, err | |
} | |
b.channel = ch | |
err = ch.ExchangeDeclare(b.Topic, "topic", true, false, false, false, nil) | |
if err != nil { | |
return nil, err | |
} | |
_, err = ch.QueueDeclare(b.Queue, false, false, false, false, nil) | |
if err != nil { | |
return nil, err | |
} | |
err = b.channel.QueueBind(b.Queue, b.Key, b.Topic, false, nil) | |
return b, err | |
} | |
// Recv subscribes the caller to the broker's routing key. | |
func (b *Broker) Recv() (<-chan amqp.Delivery, error) { | |
return b.channel.Consume(b.Queue, "", true, false, false, false, nil) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment