Created
October 28, 2019 17:21
-
-
Save vadv/e11c35873bee2378df7778d299be65e2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"encoding/json" | |
"flag" | |
"fmt" | |
"log" | |
"os" | |
"os/signal" | |
"syscall" | |
"time" | |
"github.com/go-redis/redis" | |
) | |
// celeryDeliveryInfo mirrors the "delivery_info" object nested inside the
// "properties" object of a message.
type celeryDeliveryInfo struct {
	RoutingKey string `json:"routing_key"`
}

// celeryProperties mirrors the "properties" object of a message.
type celeryProperties struct {
	DeliveryInfo celeryDeliveryInfo `json:"delivery_info"`
}

// celeryHeaders mirrors the "headers" object of a message.
type celeryHeaders struct {
	Hostname string `json:"hostname"`
}

// celeryHeartBeat is the subset of a published message that is decoded from
// JSON: the sender hostname from the headers and the routing key from the
// delivery info. Any other keys in the payload are ignored by the decoder.
type celeryHeartBeat struct {
	Headers    celeryHeaders    `json:"headers"`
	Properties celeryProperties `json:"properties"`
}
// Command-line flags. They are registered on the default flag set when the
// package is initialised and hold their defaults until flag.Parse runs.
var (
	// redisUrl is the Redis connection URL handed to redis.ParseURL.
	// Shape of the value, as rendered from a deployment template:
	// redis://{{ $worker.liveness.redis.host }}/{{ $worker.liveness.redis.db }}
	redisUrl = flag.String("redis-url", "", "Redis connection url, example: redis://prod-workerredis.oebuve.0001.apse1.cache.amazonaws.com/0")

	// celeryVersion selects which heartbeat channel name is used; it
	// defaults to 3.
	celeryVersion = flag.Uint("celery-version", 3, "Celery version, example: 3 or 4")

	// workerName is the part before "@" in the worker host identifier.
	workerName = flag.String("worker-name", "", "Worker name, example: webapp_user_notification")
)
func getCeleryHeartBeatChannel(options *redis.Options) string { | |
if *celeryVersion == 4 { | |
return fmt.Sprintf(`/%d/.celeryev/worker.heartbeat`, options.DB) | |
} | |
return `celeryev` | |
} | |
func getWorkerHost() string { | |
hostname, err := os.Hostname() | |
if err != nil { | |
log.Printf("[FATAL] get hostname: %s\n", err.Error()) | |
} | |
return fmt.Sprintf("%s@%s", *workerName, hostname) | |
} | |
func main() { | |
if !flag.Parsed() { | |
flag.Parse() | |
} | |
if !(*celeryVersion == 3 || *celeryVersion == 4) { | |
log.Printf("[FATAL] unsupported celery version: %d\n", *celeryVersion) | |
os.Exit(1) | |
} | |
options, err := redis.ParseURL(*redisUrl) | |
if err != nil { | |
log.Printf("[FATAL] parse broker url: %s\n", err.Error()) | |
os.Exit(2) | |
} | |
// connect to redis | |
client := redis.NewClient(options) | |
channelName := getCeleryHeartBeatChannel(options) | |
subChan := client.Subscribe(channelName).Channel() | |
sig := make(chan os.Signal, 1) | |
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) | |
// heartBeatChecker | |
ticker := time.NewTicker(30 * time.Second) | |
// get worker name | |
workerHost := getWorkerHost() | |
for { | |
select { | |
case <-ticker.C: | |
log.Printf("[INFO] tick\n") | |
case <-sig: | |
log.Printf("[INFO] shutdown signal\n") | |
client.Close() | |
os.Exit(0) | |
case data := <-subChan: | |
msg := &celeryHeartBeat{} | |
if err := json.Unmarshal([]byte(data.Payload), msg); err != nil { | |
continue | |
} | |
if workerHost == msg.Headers.Hostname && msg.Properties.DeliveryInfo.RoutingKey == `worker.heartbeat` { | |
log.Printf("heartbeat") | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment