Last active
December 1, 2020 08:55
-
-
Save ildarusmanov/d2132a915506ca23bde46d5efeb1a2c0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// From sources for https://watermill.io/docs/getting-started/ | |
package main | |
import ( | |
"context" | |
"log" | |
"time" | |
"github.com/ThreeDotsLabs/watermill" | |
"github.com/ThreeDotsLabs/watermill-amqp/pkg/amqp" | |
"github.com/ThreeDotsLabs/watermill/message" | |
) | |
var amqpURI = "amqp://guest:guest@rabbitmq:5672/" | |
func main() { | |
time.Sleep(30 * time.Second) | |
amqpConfig := amqp.NewNonDurablePubSubConfig(amqpURI, amqp.GenerateQueueNameTopicNameWithSuffix("q1")) | |
subscriber, err := amqp.NewSubscriber( | |
// This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-two-go.html | |
// It works as a simple queue. | |
// | |
// If you want to implement a Pub/Sub style service instead, check | |
// https://watermill.io/pubsubs/amqp/#amqp-consumer-groups | |
amqpConfig, | |
watermill.NewStdLogger(false, false), | |
) | |
if err != nil { | |
panic(err) | |
} | |
amqpConfig2 := amqp.NewNonDurablePubSubConfig(amqpURI, amqp.GenerateQueueNameTopicNameWithSuffix("q2")) | |
subscriber2, err := amqp.NewSubscriber( | |
// This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-two-go.html | |
// It works as a simple queue. | |
// | |
// If you want to implement a Pub/Sub style service instead, check | |
// https://watermill.io/pubsubs/amqp/#amqp-consumer-groups | |
amqpConfig2, | |
watermill.NewStdLogger(false, false), | |
) | |
if err != nil { | |
panic(err) | |
} | |
messages, err := subscriber.Subscribe(context.Background(), "example.topic1") | |
if err != nil { | |
panic(err) | |
} | |
messages2, err := subscriber2.Subscribe(context.Background(), "example.topic1") | |
if err != nil { | |
panic(err) | |
} | |
go newProcess("sub1")(messages) | |
go newProcess("sub2")(messages2) | |
amqpConfig3 := amqp.NewNonDurablePubSubConfig(amqpURI, amqp.GenerateQueueNameTopicName) | |
publisher, err := amqp.NewPublisher(amqpConfig3, watermill.NewStdLogger(false, false)) | |
if err != nil { | |
panic(err) | |
} | |
publishMessages(publisher) | |
} | |
func publishMessages(publisher message.Publisher) { | |
for { | |
msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!!!")) | |
if err := publisher.Publish("example.topic1", msg); err != nil { | |
panic(err) | |
} | |
time.Sleep(time.Second) | |
} | |
} | |
func newProcess(id string) func(messages <-chan *message.Message) { | |
return func(messages <-chan *message.Message) { | |
for msg := range messages { | |
log.Printf("%s received message: %s, payload: %s", id, msg.UUID, string(msg.Payload)) | |
// we need to Acknowledge that we received and processed the message, | |
// otherwise, it will be resent over and over again. | |
msg.Ack() | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment