Created
June 11, 2014 11:50
-
-
Save tedkulp/d7e647ba4daaa872b21b to your computer and use it in GitHub Desktop.
AMQP echo server in Go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"github.com/streadway/amqp" | |
"encoding/json" | |
"log" | |
"os" | |
"fmt" | |
"time" | |
) | |
type Message struct { | |
Message string `json:"message"` | |
} | |
func failOnError(err error, msg string) { | |
if err != nil { | |
log.Fatalf("%s: %s", msg, err) | |
panic(fmt.Sprintf("%s: %s", msg, err)) | |
} | |
} | |
func publish(ch *amqp.Channel, d amqp.Delivery, body []byte) error { | |
return ch.Publish( | |
d.Exchange, // exchange | |
d.ReplyTo, // routing key | |
false, // mandatory | |
false, // immediate | |
amqp.Publishing{ | |
CorrelationId: d.CorrelationId, | |
ContentType: "application/json", | |
Body: body, // []byte(body), | |
}) | |
} | |
func main() { | |
conn, err := amqp.Dial("amqp://admin:[email protected]:5672/") | |
failOnError(err, "Failed to connect to RabbitMQ") | |
defer conn.Close() | |
ch, err := conn.Channel() | |
failOnError(err, "Failed to open a channel") | |
defer ch.Close() | |
err = ch.ExchangeDeclare( | |
"rpc_exchange", // name | |
"topic", // kind | |
false, // durable | |
false, // delete when unused | |
false, // exclusive | |
false, // noWait | |
nil, // arguments | |
) | |
failOnError(err, "Failed to declare an exchange") | |
q, err := ch.QueueDeclare( | |
"echo.*", // name | |
false, // durable | |
true, // delete when unused | |
false, // exclusive | |
false, // noWait | |
nil, // arguments | |
) | |
failOnError(err, "Failed to declare a queue") | |
ch.Qos(3, 0, false) | |
ch.QueueBind("echo.*", "echo.*", "rpc_exchange", false, nil) | |
msgs, err := ch.Consume(q.Name, "", false, false, false, false, nil) | |
failOnError(err, "Failed to register a consumer") | |
done := make(chan bool) | |
go func() { | |
var m Message | |
for d := range msgs { | |
time.Sleep(time.Millisecond) | |
switch d.RoutingKey { | |
case "echo.go": | |
log.Printf("Received a message: %s", d.Body) | |
log.Printf("ReplyTo: %s", d.ReplyTo) | |
log.Printf("CorrelationId: %s", d.CorrelationId) | |
log.Printf("RoutingKey: %s", d.RoutingKey) | |
log.Printf("Exchange: %s", d.Exchange) | |
log.Printf("ContentType: %s", d.ContentType) | |
log.Printf("ContentEncoding: %s", d.ContentEncoding) | |
d.Ack(false) | |
failOnError(json.Unmarshal(d.Body, &m), "Failed to unmarshal message") | |
f := map[string]interface{}{ | |
"echo": true, | |
"message": m.Message, | |
} | |
body, err := json.Marshal(f) | |
failOnError(err, "Failed to marshal message") | |
failOnError(publish(ch, d, body), "Failed to publish a message") | |
log.Printf(" [x] Sent %s", string(body)) | |
default: | |
d.Ack(false) | |
} | |
// done <- true | |
time.Sleep(time.Millisecond) | |
} | |
}() | |
log.Printf(" [*] Waiting for messages. To exit press CTRL+C") | |
<-done // Never gets called, but will hang indefinitely for the goroutine to run | |
log.Printf("Done") | |
os.Exit(0) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment