Created
May 11, 2021 17:25
-
-
Save Tinitto/d2c24b14486618398c9b4d4dbccb4a86 to your computer and use it in GitHub Desktop.
How to Connect to an AMQP 1.0 Topic not Queue in Golang
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* In an attempt to use the https://github.com/Azure/go-amqp/ to connect to an AMQP topic, I was using the sample code | |
* on the README but kept getting an error like: | |
* "Failed creating receiver link *Error{Condition: amqp:unauthorized-access, Description: User qbtzf4r2sqkimdl is not authorized to read from: queue://topic..." | |
* To connect to an AMQP 1.0 topic, the LinkSourceAddress or the LinkTargetAddres has to be of the form "topic://..." | |
*/ | |
package main | |
import ( | |
"context" | |
"fmt" | |
"log" | |
"time" | |
"github.com/Azure/go-amqp" | |
) | |
func main() { | |
// Create client | |
client, err := amqp.Dial("amqps://my-namespace.servicebus.windows.net", | |
amqp.ConnSASLPlain("access-key-name", "access-key"), | |
) | |
if err != nil { | |
log.Fatal("Dialing AMQP server:", err) | |
} | |
defer client.Close() | |
// Open a session | |
session, err := client.NewSession() | |
if err != nil { | |
log.Fatal("Creating AMQP session:", err) | |
} | |
ctx := context.Background() | |
// Send a message | |
{ | |
// Create a sender | |
sender, err := session.NewSender( | |
amqp.LinkTargetAddress("topic://topic-name"), | |
) | |
if err != nil { | |
log.Fatal("Creating sender link:", err) | |
} | |
ctx, cancel := context.WithTimeout(ctx, 5*time.Second) | |
// Send message | |
err = sender.Send(ctx, amqp.NewMessage([]byte("Hello!"))) | |
if err != nil { | |
log.Fatal("Sending message:", err) | |
} | |
sender.Close(ctx) | |
cancel() | |
} | |
// Continuously read messages | |
{ | |
// Create a receiver | |
receiver, err := session.NewReceiver( | |
amqp.LinkSourceAddress("topic://topic-name"), | |
amqp.LinkCredit(10), | |
) | |
if err != nil { | |
log.Fatal("Creating receiver link:", err) | |
} | |
defer func() { | |
ctx, cancel := context.WithTimeout(ctx, 1*time.Second) | |
receiver.Close(ctx) | |
cancel() | |
}() | |
for { | |
// Receive next message | |
msg, err := receiver.Receive(ctx) | |
if err != nil { | |
log.Fatal("Reading message from AMQP:", err) | |
} | |
// Accept message | |
msg.Accept(context.Background()) | |
fmt.Printf("Message received: %s\n", msg.GetData()) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment