Last active
January 27, 2025 11:32
-
-
Save spksoft/59d1cb62c664d818a2500dab1e873761 to your computer and use it in GitHub Desktop.
Example Golang worker with RabbitMQ graceful shutdown
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"flag" | |
"log" | |
"os" | |
"os/signal" | |
"syscall" | |
"time" | |
"github.com/streadway/amqp" | |
) | |
// newConsumer is a function create a rabbitMQ consumer | |
func newConsumer(connectionString, queueName, consumerName string, fetchCount int) (connection *amqp.Connection, channel *amqp.Channel, msgs <-chan amqp.Delivery) { | |
connection, _ = amqp.Dial(connectionString) | |
channel, _ = connection.Channel() | |
channel.Qos(fetchCount, 0, false) | |
msgs, _ = channel.Consume( | |
queueName, // queue | |
consumerName, // consumer | |
false, // auto-ack | |
false, // exclusive | |
false, // no-local | |
false, // no-wait | |
nil, // args | |
) | |
return | |
} | |
func worker(msgs <-chan amqp.Delivery, done chan bool) { | |
for m := range msgs { | |
body := string(m.Body) | |
log.Printf("Processing data %+v\n", body) | |
time.Sleep(5 * time.Second) | |
log.Printf("Processing data %+v done\n", body) | |
m.Ack(false) | |
log.Printf("Data %+v acked\n", body) | |
} | |
done <- true | |
} | |
func main() { | |
defer log.Println("Program stopped successful") | |
url := flag.String("url", "", "eg. amqp://root:toor@localhost:5672/") | |
queue := flag.String("queue", "", "eg. test-queue") | |
name := flag.String("name", "", "eg. consumer-1") | |
fetchSize := flag.Int("size", 1, "eg. 20") | |
flag.Parse() | |
log.Printf("Connecting to %s queue %s fetch-size %d\n", *url, *queue, *fetchSize) | |
connection, channel, msgs := newConsumer(*url, *queue, *name, *fetchSize) | |
log.Printf("Consumer %s is subscribing queue %s\n", *name, *queue) | |
defer connection.Close() | |
defer channel.Close() | |
defer log.Println("Closing qeueu channel and connection") | |
done := make(chan bool) | |
go worker(msgs, done) | |
exit := make(chan os.Signal, 1) | |
signal.Notify(exit, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) | |
// Wait for OS exit signal | |
<-exit | |
log.Println("Got exit signal") | |
// Stop recieving message from queue | |
channel.Cancel(*name, false) | |
log.Println("Stopped receiving message from queue") | |
// Wait for worker procrss recieved message | |
log.Println("Wait for worker procrss recieved message") | |
<-done | |
log.Println("Woker done") | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment