Skip to content

Instantly share code, notes, and snippets.

@inesusvet
Created February 23, 2018 15:27
Show Gist options
  • Save inesusvet/0ef50c8dd02910a24f43862c55a2eac7 to your computer and use it in GitHub Desktop.
Save inesusvet/0ef50c8dd02910a24f43862c55a2eac7 to your computer and use it in GitHub Desktop.
Simple AMQP producer script that is tolerant of connection errors
package main
import (
"encoding/json"
"flag"
"fmt"
linuxproc "github.com/c9s/goprocinfo/linux"
"github.com/streadway/amqp"
"log"
"os"
"strconv"
"time"
)
// Configuration holds the settings that ReadConfig decodes from the JSON
// configuration file.
type Configuration struct {
	// Hostname, Username and Password are substituted into the AMQP
	// connection URI.
	Hostname, Username, Password string
	// QueueName is the name of the queue that is declared and published to.
	QueueName string
	// Vhost is appended to the AMQP connection URI as its path.
	Vhost string
}
// ReadConfig loads the Configuration stored as JSON in the file at filename.
//
// It does not return an error: if the file cannot be opened or its contents
// cannot be decoded, the failure is logged and the process exits through
// log.Fatalf.
func ReadConfig(filename string) Configuration {
	file, err := os.Open(filename)
	if err != nil {
		// Fatalf instead of Fatal: Fatal formats like fmt.Print, which adds
		// no space between a string operand and the error, so the two would
		// run together in the log line.
		log.Fatalf("Can't read config from file: %v", err)
	}
	defer file.Close()

	var cfg Configuration
	if err := json.NewDecoder(file).Decode(&cfg); err != nil {
		log.Fatalf("Can't decode config as JSON: %v", err)
	}
	return cfg
}
// main reads the configuration file named by the -config flag and then, in an
// endless loop, publishes the Processes value from /proc/stat to the
// configured queue. A failed attempt is reported on stdout and retried after a
// short delay; the program only exits when /proc/stat cannot be read.
func main() {
	configPath := flag.String("config", "config.json", "Specify the configuration file")
	flag.Parse()
	cfg := ReadConfig(*configPath)

	// The configuration values are inserted verbatim, so they must already be
	// valid inside a URI.
	connectionString := fmt.Sprintf(
		"amqp://%s:%s@%s:5672/%s",
		cfg.Username,
		cfg.Password,
		cfg.Hostname,
		cfg.Vhost,
	)

	const (
		// retryDelay is the pause after a failed attempt.
		retryDelay = 1000 * time.Millisecond
		// successDelay is the pause after a message has been published.
		successDelay = 10000 * time.Millisecond
	)

	for {
		// Each attempt runs in its own function call so that its deferred
		// Close calls fire when the attempt ends. main never returns, so
		// defers placed directly in this loop would never run and every
		// iteration would leak a connection and a channel.
		count, err := publishProcessCount(connectionString, cfg.QueueName)
		if err != nil {
			fmt.Println(err)
			time.Sleep(retryDelay)
			continue
		}
		fmt.Println("Sent process count:", count)
		time.Sleep(successDelay)
	}
}

// publishProcessCount performs one publishing attempt: it dials the broker at
// connectionString, opens a channel, declares the queue named queueName, reads
// /proc/stat and publishes the Processes value as a text/plain message routed
// to that queue through the default exchange.
//
// It returns the published value as a decimal string. On a broker-side
// failure it returns an error describing the step that failed; the wording is
// capitalised on purpose so the text printed by main matches the program's
// established output. The connection and channel are closed before returning.
// If /proc/stat cannot be read the process exits through log.Fatalf.
func publishProcessCount(connectionString, queueName string) (string, error) {
	conn, err := amqp.Dial(connectionString)
	if err != nil {
		return "", fmt.Errorf("Failed to connect to RabbitMQ: %v", err)
	}
	// Deferred only after the error check, so Close is never scheduled on a
	// nil connection.
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		return "", fmt.Errorf("Failed to open a channel: %v", err)
	}
	defer ch.Close()

	queue, err := ch.QueueDeclare(
		queueName, // name
		false,     // durable
		false,     // delete when unused
		false,     // exclusive
		false,     // no-wait
		nil,       // arguments
	)
	if err != nil {
		return "", fmt.Errorf("Failed to declare a queue: %v", err)
	}

	stat, err := linuxproc.ReadStat("/proc/stat")
	if err != nil {
		// The only way to stop running. log.Fatalf exits without running the
		// deferred Close calls; the process is terminating anyway.
		log.Fatalf("stat read fail: %v", err)
	}
	count := strconv.FormatUint(stat.Processes, 10)

	err = ch.Publish(
		"",         // exchange
		queue.Name, // routing key
		false,      // mandatory
		false,      // immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(count),
		},
	)
	if err != nil {
		return "", fmt.Errorf("Failed to publish a message: %v", err)
	}
	return count, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment