Created
February 23, 2018 15:27
-
-
Save inesusvet/0ef50c8dd02910a24f43862c55a2eac7 to your computer and use it in GitHub Desktop.
Simple amqp producer script which is tolerant to connection errors
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"encoding/json" | |
"flag" | |
"fmt" | |
linuxproc "github.com/c9s/goprocinfo/linux" | |
"github.com/streadway/amqp" | |
"log" | |
"os" | |
"strconv" | |
"time" | |
) | |
type Configuration struct { | |
Hostname string | |
Username string | |
Password string | |
QueueName string | |
Vhost string | |
} | |
func ReadConfig(filename string) Configuration { | |
file, err := os.Open(filename) | |
if err != nil { | |
log.Fatal("Can't read config from file:", err) | |
} | |
defer file.Close() | |
decoder := json.NewDecoder(file) | |
Config := Configuration{} | |
err = decoder.Decode(&Config) | |
if err != nil { | |
log.Fatal("Can't decode config as JSON:", err) | |
} | |
return Config | |
} | |
func main() { | |
c := flag.String("config", "config.json", "Specify the configuration file") | |
flag.Parse() | |
Config := ReadConfig(*c) | |
ConnectionString := fmt.Sprintf( | |
"amqp://%s:%s@%s:5672/%s", | |
Config.Username, | |
Config.Password, | |
Config.Hostname, | |
Config.Vhost, | |
) | |
RETRY_SLEEP_DELAY := 1000 * time.Millisecond | |
SUCCESS_SLEEP_DELAY := 10000 * time.Millisecond | |
for { | |
ConnectionHandler, err := amqp.Dial(ConnectionString) | |
defer ConnectionHandler.Close() | |
if err != nil { | |
fmt.Println("Failed to connect to RabbitMQ:", err) | |
time.Sleep(RETRY_SLEEP_DELAY) | |
continue | |
} | |
ChannelHandler, err := ConnectionHandler.Channel() | |
defer ChannelHandler.Close() | |
if err != nil { | |
fmt.Println("Failed to open a channel:", err) | |
time.Sleep(RETRY_SLEEP_DELAY) | |
continue | |
} | |
QueueHandler, err := ChannelHandler.QueueDeclare( | |
Config.QueueName, // name | |
false, // durable | |
false, // delete when unused | |
false, // exclusive | |
false, // no-wait | |
nil, // arguments | |
) | |
if err != nil { | |
fmt.Println("Failed to declare a queue:", err) | |
time.Sleep(RETRY_SLEEP_DELAY) | |
continue | |
} | |
stat, err := linuxproc.ReadStat("/proc/stat") | |
if err != nil { | |
log.Fatal("stat read fail") // The only way to stop running | |
} | |
proc_num_str := strconv.FormatUint(stat.Processes, 10) | |
err = ChannelHandler.Publish( | |
"", // exchange | |
QueueHandler.Name, // routing key | |
false, // mandatory | |
false, // immediate | |
amqp.Publishing{ | |
ContentType: "text/plain", | |
Body: []byte(proc_num_str)}) | |
if err != nil { | |
fmt.Println("Failed to publish a message:", err) | |
time.Sleep(RETRY_SLEEP_DELAY) | |
continue | |
} | |
fmt.Println("Sent process count:", proc_num_str) | |
time.Sleep(SUCCESS_SLEEP_DELAY) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment