systemd ExecStop script for Nomad
#!/usr/bin/env python
# -*- encoding: utf-8 -*-

## in order to cleanly shut down a node with running jobs, the node needs to be
## drained, and then we need to wait for allocations to be migrated away. in
## this script, we:
## * set up a watch for node-update evals for the current node
## * wait for allocations currently running to complete
## * wait for allocations from the watched evals to start running
##
## if we end up with a blocked eval as a result of draining the node, just give
## up; haven't found a way to identify that eval.
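##
## concretely, the HTTP calls made below all go to the local agent API
## (http://localhost:4646/v1/ by default); paths are relative to that prefix:
##   GET  agent/self                        -- discover our own node ID
##   GET  node/<node-id>                    -- check whether drain is already enabled
##   GET  node/<node-id>/allocations        -- allocations currently on this node
##   PUT  node/<node-id>/drain?enable=true  -- enable drain mode
##   GET  evaluations?index=N&wait=15s      -- blocking query for new evals
##   GET  evaluation/<eval-id>              -- fetch / watch a single eval
##   GET  evaluation/<eval-id>/allocations  -- allocations created by an eval
##   GET  allocation/<alloc-id>             -- fetch / watch a single alloc
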
import os
import logging
import logging.handlers

rootLogger = logging.getLogger()

syslog = logging.handlers.SysLogHandler(address="/dev/log")
syslog.setFormatter(logging.Formatter(os.path.basename(__file__) + "[%(process)d]: %(name)s - %(message)s"))
rootLogger.addHandler(syslog)

if os.isatty(2):
    ## log to stderr for diagnostic purposes when running manually
    stream = logging.StreamHandler()
    stream.setFormatter(logging.Formatter("%(asctime)s [%(levelname)7s] %(name)s - %(message)s"))
    rootLogger.addHandler(stream)

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG) # INFO

## less verbosity from Nomad
for l in ["Nomad", "AllocWatcher", "EvalWatcher"]:
    logging.getLogger(".".join([__name__, l])).setLevel(logging.INFO)

import sys
import errno
import time
import types

import requests
logging.getLogger("requests.packages.urllib3.connectionpool").setLevel(logging.WARNING)

from urlparse import urljoin

import threading
import collections
import signal

## Nomad API client with a generator for watches
class Nomad(object):
    def __init__(self, endpoint="http://localhost:4646/v1/"):
        super(Nomad, self).__init__()

        self.logger = logging.getLogger(self.__module__ + "." + self.__class__.__name__)

        self.endpoint = endpoint
        self.session = requests.Session()

    def __makeRequest(self, method, path, return_index=False, **kwargs):
        req = requests.Request(
            method,
            urljoin(self.endpoint, path),
            **kwargs
        )

        prepared = self.session.prepare_request(req)

        ## allow for retries in case of a loss of leader
        for retry_count in range(3):
            self.logger.debug(
                "REQUEST %s %s headers=%r body=%r",
                prepared.method,
                prepared.url,
                prepared.headers,
                prepared.body,
            )

            resp = self.session.send(prepared)

            self.logger.debug(
                "RESPONSE %s %s -> %d: headers=%r body=%s",
                method,
                prepared.url,
                resp.status_code,
                resp.headers,
                resp.text,
            )

            if resp.status_code == 500:
                self.logger.warn("got 500 for %s; retrying", prepared.url)

                if retry_count > 0:
                    time.sleep((retry_count + 1) * 5)
            else:
                break

        resp.raise_for_status()

        resp_body = None
        if resp.text:
            resp_body = resp.json()

        if return_index is True:
            wait_index = resp.headers.get("X-Nomad-Index")
            if wait_index is not None:
                wait_index = int(wait_index)

            return wait_index, resp_body
        else:
            return resp_body

    def get(self, path, **kwargs):
        return self.__makeRequest("GET", path, **kwargs)

    def put(self, path, **kwargs):
        return self.__makeRequest("PUT", path, **kwargs)

    def watch(self, path, params={}, return_index=False, cancel_event=None):
        """generator that returns the response only when the index changes"""

        ## path -- nomad endpoint to watch for changes
        ## params -- optional dict of params to include in the request
        ## return_index -- if True, yield values are (index, response)
        ## cancel_event -- optional threading.Event used to terminate watch loop

        ## as of 0.5.6, if the cluster has no evals, /v1/evaluations will have
        ## an index of 0 and the blocking query will return immediately.
        ## initial wait_index of -1 will allow the first pass through the loop
        ## to be detected and the (empty) result yielded.
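        ##
        ## an illustrative blocking query (index and wait are supplied by the
        ## caller via params):
        ##   GET /v1/evaluations?index=42&wait=15s
        ## the request returns when the data changes past index 42 or the wait
        ## time elapses; the index to wait on next comes back in the
        ## X-Nomad-Index response header.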
        wait_index = -1
        params = params.copy()

        cancelled = False ## default to looping forever
        if cancel_event is not None:
            cancelled = cancel_event.is_set()

        while not cancelled:
            params["index"] = max(wait_index, 0)

            last_wait_index = wait_index
            wait_index, resp = self.__makeRequest("GET", path, params=params, return_index=True)

            if wait_index > last_wait_index:
                if return_index is True:
                    yield wait_index, resp
                else:
                    yield resp

            if cancel_event is not None:
                cancelled = cancel_event.is_set()
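
## example use of the watch generator (illustrative; this mirrors how the
## watcher threads below consume it):
##   nomad = Nomad()
##   for alloc in nomad.watch("allocation/%s" % alloc_id):
##       print alloc["ClientStatus"]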

class AllocWatcher(threading.Thread):
    """thread that watches an allocation until it reaches a desired status"""

    def __init__(self, nomad, alloc, desired_status):
        super(AllocWatcher, self).__init__()

        self.logger = logging.getLogger(self.__module__ + "." + self.__class__.__name__)

        self.nomad = nomad
        self.alloc = alloc
        self.desired_status = desired_status

    def run(self):
        # allocClientStatusPending = "pending"
        # allocClientStatusRunning = "running"
        allocClientStatusComplete = "complete"
        allocClientStatusFailed = "failed"
        allocClientStatusLost = "lost"

        terminal_statii = (
            allocClientStatusComplete,
            allocClientStatusFailed,
            allocClientStatusLost,
        )

        alloc_id = self.alloc["ID"]

        self.logger.debug("watching alloc %s until %s", alloc_id, self.desired_status)

        ## watch the allocation until it enters the desired status
        last_status = None
        for alloc in self.nomad.watch("allocation/%s" % alloc_id):
            self.alloc = alloc

            status = alloc["ClientStatus"]

            if last_status is not None and last_status != status:
                self.logger.info("alloc %(ID)s %(Name)s is now %(ClientStatus)s", alloc)

            last_status = status

            if status == self.desired_status:
                break
            elif status in terminal_statii:
                self.logger.error("alloc %(ID)s %(Name)s is now terminal with status %(ClientStatus)s", alloc)
                break

        self.terminal_status = last_status

    def __str__(self):
        alloc = self.alloc.copy()
        alloc["ID"] = alloc["ID"].split("-")[0]
        alloc["NodeID"] = alloc["NodeID"].split("-")[0]
        alloc["_desiredStatus"] = self.desired_status

        return "<Alloc %(ID)s (%(Name)s) on %(NodeID)s is %(ClientStatus)s -> %(_desiredStatus)s>" % alloc

class EvalWatcher(threading.Thread):
    """thread that watches an evaluation until it reaches completion, tracking descendants"""

    def __init__(self, nomad, eval_id_or_dict):
        super(EvalWatcher, self).__init__()

        self.logger = logging.getLogger(self.__module__ + "." + self.__class__.__name__)

        self.nomad = nomad

        if isinstance(eval_id_or_dict, types.DictType):
            self.evaluation = eval_id_or_dict
        else:
            self.evaluation = nomad.get("evaluation/" + eval_id_or_dict)

    def run(self):
        # evalStatusBlocked = "blocked"
        # evalStatusPending = "pending"
        evalStatusComplete = "complete"
        evalStatusFailed = "failed"
        evalStatusCancelled = "canceled"

        eval_id = self.evaluation["ID"]
        terminal_statii = (evalStatusComplete, evalStatusFailed, evalStatusCancelled)

        self.logger.debug("watching eval %s until %s", eval_id, evalStatusComplete)

        last_status = None

        ## watch the evaluation until it achieves the desired status
        for evaluation in self.nomad.watch("evaluation/%s" % eval_id):
            self.evaluation = evaluation

            status = evaluation["Status"]

            if last_status is not None and last_status != status:
                self.logger.info("eval %(ID)s for %(JobID)s for %(TriggeredBy)s is now %(Status)s", evaluation)

            last_status = status

            if status == evalStatusComplete:
                break
            elif status in terminal_statii:
                self.logger.error("eval %(ID)s for %(JobID)s for %(TriggeredBy)s is now terminal with status %(Status)s", evaluation)
                break

        self.terminal_status = last_status

    def __str__(self):
        evaluation = self.evaluation.copy()
        evaluation["ID"] = evaluation["ID"].split("-")[0]

        evaluation["on"] = ""
        if evaluation.get("NodeID"):
            evaluation["on"] = " on %s" % evaluation["NodeID"].split("-")[0]

        return "<Eval %(ID)s for %(TriggeredBy)s%(on)s for %(JobID)s: %(Status)s>" % evaluation

class PendingWatches(object):
    def __init__(self):
        super(PendingWatches, self).__init__()

        self.watches = collections.OrderedDict()
        self.lock = threading.Lock()

    def add(self, _type, _id, watch):
        key = (_type, _id)
        added = False

        with self.lock:
            if key not in self.watches:
                self.watches[key] = watch
                watch.start()

                added = True

        return added

    def delete(self, _type, _id):
        key = (_type, _id)

        with self.lock:
            if key in self.watches:
                del self.watches[key]

    def items(self):
        with self.lock:
            return self.watches.items()

def pid_exists(pid):
    """Check whether pid exists in the current process table."""
    ## http://stackoverflow.com/a/6940314/53051

    try:
        os.kill(pid, 0)
    except OSError as err:
        if err.errno == errno.ESRCH:
            # ESRCH == No such process
            return False
        elif err.errno == errno.EPERM:
            # EPERM clearly means there's a process to deny access to
            return True
        else:
            # According to "man 2 kill" possible error values are
            # (EINVAL, EPERM, ESRCH)
            raise
    else:
        return True

def main(mainpid=None):
    nomad = Nomad()

    ## get own node ID
    logger.debug("retrieving agent info")
    agent_info = nomad.get("agent/self")
    node_id = agent_info["stats"]["client"]["node_id"]

    logger.info("our node id: %s", node_id)

    if nomad.get("node/%s" % node_id)["Drain"]:
        logger.warn("drain already enabled")
        return

    ## when these watches have exited, the agent can safely stop
    pending_watches = PendingWatches()

    ## currently-running allocations on this node, before enabling drain
    for alloc in nomad.get("node/%s/allocations" % node_id):
        if alloc["ClientStatus"] != "complete":
            pending_watches.add("alloc", alloc["ID"], AllocWatcher(nomad, alloc, "complete"))

    ## track evals created by draining ourselves
    initial_evals_retrieved = threading.Event()

    ## provides a way to stop the eval_watcher loop
    stop_eval_watcher = threading.Event()

    def eval_watcher():
        seen_evals = set()
        last_index = None

        for wait_index, evals in nomad.watch(
            "evaluations",
            params={
                "wait": "15s",
            },
            return_index=True,
            cancel_event=stop_eval_watcher,
        ):
            if last_index is None:
                ## first pass through
                seen_evals.update([ e["ID"] for e in evals ])

                initial_evals_retrieved.set()
            else:
                for evaluation in evals:
                    if evaluation["ID"] not in seen_evals and \
                       evaluation["Type"] != "system" and \
                       evaluation["TriggeredBy"] == "node-update" and \
                       evaluation["NodeID"] == node_id:
                        seen_evals.add(evaluation["ID"])

                        ## found eval triggered by node update, PRESUMABLY because the
                        ## node is draining.
                        ##
                        ## need to watch for allocs created for these evals to begin
                        ## running.
                        if not pending_watches.add("eval", evaluation["ID"], EvalWatcher(nomad, evaluation)):
                            logger.info("already watching eval %(ID)s", evaluation)

            last_index = wait_index

    ## start watching evals; this thread will live until the program exits, so
    ## make it a daemon
    eval_watcher_thread = threading.Thread(target=eval_watcher)
    eval_watcher_thread.setDaemon(True)
    eval_watcher_thread.start()

    logger.debug("waiting for initial evals to be retrieved")
    initial_evals_retrieved.wait()

    ## enable node drain
    logger.info("draining ourselves")
    nomad.put("node/%s/drain" % node_id, params={"enable": True})
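    ## for reference, the equivalent raw API call (the Nomad 0.5.x-era drain
    ## API this script targets, against the default local agent address):
    ##   curl -X PUT 'http://localhost:4646/v1/node/<node-id>/drain?enable=true'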

    ## wait for pending watches to complete
    while True:
        completed = []

        items = pending_watches.items()
        if not items:
            break

        logger.info("waiting for %d watches to complete", len(items))

        for key, watch in items:
            watch_type, watch_id = key

            logger.debug("waiting for %s to complete", watch)
            watch.join(timeout=10)

            if watch.isAlive():
                logger.info("%s is still in progress", watch)
                continue

            logger.info("%s is complete", watch)
            completed.append(key)

            if watch_type == "eval":
                evaluation = watch.evaluation

                if watch.terminal_status != "complete":
                    logger.warn("terminal status for %r is '%s', not 'complete'", key, watch.terminal_status)
                else:
                    new_allocs = nomad.get("evaluation/%s/allocations" % watch_id)

                    logger.info(
                        "eval %s for %s triggered by %s completed with %d new allocations",
                        evaluation["ID"],
                        evaluation["JobID"],
                        evaluation["TriggeredBy"],
                        len(new_allocs),
                    )

                    if evaluation["NextEval"]:
                        ev = evaluation["NextEval"]

                        logger.warn("found NextEval %s on eval %s!", ev, watch_id)

                        if not pending_watches.add("eval", ev, EvalWatcher(nomad, ev)):
                            logger.info("already watching eval %s", ev)

                    if evaluation["BlockedEval"]:
                        ev = evaluation["BlockedEval"]

                        logger.warn("found BlockedEval %s on eval %s!", ev, watch_id)

                        ## not watching this eval. testing with nomad 0.5.5
                        ## shows that the blocked eval becomes "canceled" when a
                        ## new node with available resources becomes available,
                        ## and that our watches do not catch the eval that
                        ## eventually places the replacement allocation onto the
                        ## eligible node. So the result is that we wait for the
                        ## blocked eval to resolve (become canceled) and then
                        ## immediately finish shutting down, without being able
                        ## to see the new allocation start running. Since the
                        ## goal is to allow the alloc dir to get migrated to the
                        ## new node, there's no point in waiting just to exit
                        ## later.
                        # if not pending_watches.add("eval", ev, EvalWatcher(nomad, ev)):
                        #     logger.info("already watching eval %s", ev)

                    for alloc in new_allocs:
                        if not pending_watches.add("alloc", alloc["ID"], AllocWatcher(nomad, alloc, "running")):
                            logger.warn("already watching alloc %(ID)s %(Name)s", alloc)

        for key in completed:
            pending_watches.delete(*key)

    logger.info("all pending watches are complete")

    stop_eval_watcher.set()
    eval_watcher_thread.join()

    if mainpid is not None:
        mainpid = int(mainpid)

        logger.info("killing %d", mainpid)
        os.kill(mainpid, signal.SIGTERM)

        ## wait for the process to exit
        while pid_exists(mainpid):
            logger.info("waiting for %d to exit", mainpid)

            time.sleep(10)

if __name__ == "__main__":
    try:
        main(*sys.argv[1:])
    except:
        logger.fatal("failed to cleanly drain ourselves", exc_info=True)

        sys.exit(1)
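
## manual invocation (normally this runs as the unit's ExecStop with $MAINPID;
## without a PID argument the script drains and waits but signals nothing):
##   ./nomad-drain-and-wait.py
##   ./nomad-drain-and-wait.py "$(pgrep -x -o nomad)"   # one way to find the agent PID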

# -*- ini -*-

[Unit]
Description=Nomad job scheduler
Documentation=https://nomadproject.io/docs/

## No Requires or Wants; Nomad will be started by consul-template when all
## config has been generated.

## most config is generated by cloud-init
After=cloud-final.service

## we're actually started by consul-template
After=consul-template.service

## stop before consul stops, so services can be cleanly unregistered
After=consul.service

## docker.service because fingerprinting depends on it.
After=docker.service

## stop before the network goes offline
After=network-online.target

## so we're stopped before rkt containers (which are registered with systemd)
## are stopped
After=machine.slice

## abort startup until credentials are written, via consul-template
AssertPathExists=/etc/nomad.d/01-creds.json

[Service]
ExecStartPre=/usr/local/bin/nomad-configure-vault-integration.py
ExecStart=/usr/local/bin/nomad agent -config /etc/nomad.d

## unlikely we'll need this, but whatev.
ExecReload=/bin/kill -HUP $MAINPID

## stopping
# drain ourselves; when complete, SIGTERM will be sent
ExecStop=/usr/local/bin/nomad-drain-and-wait.py $MAINPID

Restart=always

## TimeoutStopSec applies to ExecStop, so give plenty of time to shut down.
TimeoutStopSec=90m

## only kill the main process, not all processes in the group.
KillMode=process
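
## putting the stop pieces together: stopping this unit runs the ExecStop drain
## script with up to TimeoutStopSec to finish; the script sends SIGTERM to
## $MAINPID itself once draining is done, and KillMode=process keeps systemd
## from signalling anything other than the main nomad process.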

# discard stdout as we're using syslog
StandardOutput=null

# don't allow the oom-killer to touch this process
OOMScoreAdjust=-1000

# 65k limit on number of open files
LimitNOFILE=65536

[Install]
WantedBy=multi-user.target