@blalor
Last active September 27, 2020 17:54
systemd ExecStop script for Nomad
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
## in order to cleanly shut down a node with running jobs, the node needs to be
## drained, and then we need to wait for allocations to be migrated away. in
## this script, we:
## * set up a watch for node-update evals for the current node
## * wait for allocations currently running to complete
## * wait for allocations from the watched evals to start running
##
## if we end up with a blocked eval as a result of draining the node, just give
## up; haven't found a way to identify that eval.
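##
## usage (as wired up in the systemd unit below; the install path is the one
## the unit file expects):
##
##     /usr/local/bin/nomad-drain-and-wait.py $MAINPID
##
## the PID argument is optional: when present, the agent is sent SIGTERM once
## the drain completes, and we then wait for that process to exit.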
import os
import logging
import logging.handlers
rootLogger = logging.getLogger()
syslog = logging.handlers.SysLogHandler(address="/dev/log")
syslog.setFormatter(logging.Formatter(os.path.basename(__file__) + "[%(process)d]: %(name)s - %(message)s"))
rootLogger.addHandler(syslog)
if os.isatty(2):
    ## log to stderr for diagnostic purposes when running manually
    stream = logging.StreamHandler()
    stream.setFormatter(logging.Formatter("%(asctime)s [%(levelname)7s] %(name)s - %(message)s"))
    rootLogger.addHandler(stream)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG) # INFO
## less verbosity from Nomad
for l in ["Nomad", "AllocWatcher", "EvalWatcher"]:
    logging.getLogger(".".join([__name__, l])).setLevel(logging.INFO)
import sys
import errno
import time
import types
import requests
logging.getLogger("requests.packages.urllib3.connectionpool").setLevel(logging.WARNING)
from urlparse import urljoin
import threading
import collections
import signal

## Nomad API client with a generator for watches
class Nomad(object):
    def __init__(self, endpoint="http://localhost:4646/v1/"):
        super(Nomad, self).__init__()

        self.logger = logging.getLogger(self.__module__ + "." + self.__class__.__name__)

        self.endpoint = endpoint
        self.session = requests.Session()

    def __makeRequest(self, method, path, return_index=False, **kwargs):
        req = requests.Request(
            method,
            urljoin(self.endpoint, path),
            **kwargs
        )

        prepared = self.session.prepare_request(req)

        ## allow for retries in case of a loss of leader
        for retry_count in range(3):
            self.logger.debug(
                "REQUEST %s %s headers=%r body=%r",
                prepared.method,
                prepared.url,
                prepared.headers,
                prepared.body,
            )

            resp = self.session.send(prepared)

            self.logger.debug(
                "RESPONSE %s %s -> %d: headers=%r body=%s",
                method,
                prepared.url,
                resp.status_code,
                resp.headers,
                resp.text,
            )

            if resp.status_code == 500:
                self.logger.warn("got 500 for %s; retrying", prepared.url)

                if retry_count > 0:
                    time.sleep((retry_count + 1) * 5)
            else:
                break

        resp.raise_for_status()

        resp_body = None
        if resp.text:
            resp_body = resp.json()

        if return_index is True:
            wait_index = resp.headers.get("X-Nomad-Index")
            if wait_index is not None:
                wait_index = int(wait_index)

            return wait_index, resp_body
        else:
            return resp_body

    def get(self, path, **kwargs):
        return self.__makeRequest("GET", path, **kwargs)

    def put(self, path, **kwargs):
        return self.__makeRequest("PUT", path, **kwargs)

    def watch(self, path, params={}, return_index=False, cancel_event=None):
        """generator that returns the response only when the index changes"""

        ## path -- nomad endpoint to watch for changes
        ## params -- optional dict of params to include in the request
        ## return_index -- if True, yield values are (index, response)
        ## cancel_event -- optional threading.Event used to terminate watch loop

        ## as of 0.5.6, if the cluster has no evals, /v1/evaluations will have
        ## an index of 0 and the blocking query will return immediately.
        ## initial wait_index of -1 will allow the first pass through the loop
        ## to be detected and the (empty) result yielded.
        wait_index = -1
        params = params.copy()

        cancelled = False ## default to looping forever
        if cancel_event is not None:
            cancelled = cancel_event.is_set()

        while not cancelled:
            params["index"] = max(wait_index, 0)

            last_wait_index = wait_index
            wait_index, resp = self.__makeRequest("GET", path, params=params, return_index=True)

            if wait_index > last_wait_index:
                if return_index is True:
                    yield wait_index, resp
                else:
                    yield resp

            if cancel_event is not None:
                cancelled = cancel_event.is_set()
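
## example only (the real consumers are the watcher threads and main() below):
## watch() wraps Nomad's blocking queries, feeding the last X-Nomad-Index back
## as the "index" query param so each request blocks until the watched data
## changes, e.g.:
##
##     nomad = Nomad()
##     for evals in nomad.watch("evaluations", params={"wait": "15s"}):
##         logger.info("evaluations changed; %d known", len(evals))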

class AllocWatcher(threading.Thread):
    """thread that watches an allocation until it reaches a desired status"""

    def __init__(self, nomad, alloc, desired_status):
        super(AllocWatcher, self).__init__()

        self.logger = logging.getLogger(self.__module__ + "." + self.__class__.__name__)

        self.nomad = nomad
        self.alloc = alloc
        self.desired_status = desired_status

    def run(self):
        # allocClientStatusPending = "pending"
        # allocClientStatusRunning = "running"
        allocClientStatusComplete = "complete"
        allocClientStatusFailed = "failed"
        allocClientStatusLost = "lost"

        terminal_statii = (
            allocClientStatusComplete,
            allocClientStatusFailed,
            allocClientStatusLost,
        )

        alloc_id = self.alloc["ID"]

        self.logger.debug("watching alloc %s until %s", alloc_id, self.desired_status)

        ## watch the allocation until it enters the desired status
        last_status = None
        for alloc in self.nomad.watch("allocation/%s" % alloc_id):
            self.alloc = alloc

            status = alloc["ClientStatus"]

            if last_status is not None and last_status != status:
                self.logger.info("alloc %(ID)s %(Name)s is now %(ClientStatus)s", alloc)

            last_status = status

            if status == self.desired_status:
                break
            elif status in terminal_statii:
                self.logger.error("alloc %(ID)s %(Name)s is now terminal with status %(ClientStatus)s", alloc)
                break

        self.terminal_status = last_status

    def __str__(self):
        alloc = self.alloc.copy()
        alloc["ID"] = alloc["ID"].split("-")[0]
        alloc["NodeID"] = alloc["NodeID"].split("-")[0]
        alloc["_desiredStatus"] = self.desired_status

        return "<Alloc %(ID)s (%(Name)s) on %(NodeID)s is %(ClientStatus)s -> %(_desiredStatus)s>" % alloc

class EvalWatcher(threading.Thread):
    """thread that watches an evaluation until it reaches completion, tracking descendants"""

    def __init__(self, nomad, eval_id_or_dict):
        super(EvalWatcher, self).__init__()

        self.logger = logging.getLogger(self.__module__ + "." + self.__class__.__name__)

        self.nomad = nomad

        if isinstance(eval_id_or_dict, types.DictType):
            self.evaluation = eval_id_or_dict
        else:
            self.evaluation = nomad.get("evaluation/" + eval_id_or_dict)

    def run(self):
        # evalStatusBlocked = "blocked"
        # evalStatusPending = "pending"
        evalStatusComplete = "complete"
        evalStatusFailed = "failed"
        evalStatusCancelled = "canceled"

        eval_id = self.evaluation["ID"]

        terminal_statii = (evalStatusComplete, evalStatusFailed, evalStatusCancelled)

        self.logger.debug("watching eval %s until %s", eval_id, evalStatusComplete)

        last_status = None

        ## watch the evaluation until it achieves the desired status
        for evaluation in self.nomad.watch("evaluation/%s" % eval_id):
            self.evaluation = evaluation

            status = evaluation["Status"]

            if last_status is not None and last_status != status:
                self.logger.info("eval %(ID)s for %(JobID)s for %(TriggeredBy)s is now %(Status)s", evaluation)

            last_status = status

            if status == evalStatusComplete:
                break
            elif status in terminal_statii:
                self.logger.error("eval %(ID)s for %(JobID)s for %(TriggeredBy)s is now terminal with status %(Status)s", evaluation)
                break

        self.terminal_status = last_status

    def __str__(self):
        evaluation = self.evaluation.copy()
        evaluation["ID"] = evaluation["ID"].split("-")[0]

        evaluation["on"] = ""
        if evaluation.get("NodeID"):
            evaluation["on"] = " on %s" % evaluation["NodeID"].split("-")[0]

        return "<Eval %(ID)s for %(TriggeredBy)s%(on)s for %(JobID)s: %(Status)s>" % evaluation

class PendingWatches(object):
    def __init__(self):
        super(PendingWatches, self).__init__()

        self.watches = collections.OrderedDict()
        self.lock = threading.Lock()

    def add(self, _type, _id, watch):
        key = (_type, _id)
        added = False

        with self.lock:
            if key not in self.watches:
                self.watches[key] = watch
                watch.start()

                added = True

        return added

    def delete(self, _type, _id):
        key = (_type, _id)

        with self.lock:
            if key in self.watches:
                del self.watches[key]

    def items(self):
        with self.lock:
            return self.watches.items()

def pid_exists(pid):
    """Check whether pid exists in the current process table."""
    ## http://stackoverflow.com/a/6940314/53051

    try:
        os.kill(pid, 0)
    except OSError as err:
        if err.errno == errno.ESRCH:
            # ESRCH == No such process
            return False
        elif err.errno == errno.EPERM:
            # EPERM clearly means there's a process to deny access to
            return True
        else:
            # According to "man 2 kill" possible error values are
            # (EINVAL, EPERM, ESRCH)
            raise
    else:
        return True

def main(mainpid=None):
    nomad = Nomad()

    ## get own node ID
    logger.debug("retrieving agent info")
    agent_info = nomad.get("agent/self")
    node_id = agent_info["stats"]["client"]["node_id"]

    logger.info("our node id: %s", node_id)

    if nomad.get("node/%s" % node_id)["Drain"]:
        logger.warn("drain already enabled")
        return

    ## when these watches have exited, the agent can safely stop
    pending_watches = PendingWatches()

    ## currently-running allocations on this node, before enabling drain
    for alloc in nomad.get("node/%s/allocations" % node_id):
        if alloc["ClientStatus"] != "complete":
            pending_watches.add("alloc", alloc["ID"], AllocWatcher(nomad, alloc, "complete"))

    ## track evals created by draining ourselves
    initial_evals_retrieved = threading.Event()

    ## provides a way to stop the eval_watcher loop
    stop_eval_watcher = threading.Event()

    def eval_watcher():
        seen_evals = set()
        last_index = None

        for wait_index, evals in nomad.watch(
            "evaluations",
            params={
                "wait": "15s",
            },
            return_index=True,
            cancel_event=stop_eval_watcher,
        ):
            if last_index is None:
                ## first pass through
                seen_evals.update([ e["ID"] for e in evals ])
                initial_evals_retrieved.set()
            else:
                for evaluation in evals:
                    if evaluation["ID"] not in seen_evals and \
                       evaluation["Type"] != "system" and \
                       evaluation["TriggeredBy"] == "node-update" and \
                       evaluation["NodeID"] == node_id:
                        seen_evals.add(evaluation["ID"])

                        ## found eval triggered by node update, PRESUMABLY because the
                        ## node is draining.
                        ##
                        ## need to watch for allocs created for these evals to begin
                        ## running.
                        if not pending_watches.add("eval", evaluation["ID"], EvalWatcher(nomad, evaluation)):
                            logger.info("already watching eval %(ID)s", evaluation)

            last_index = wait_index

    ## start watching evals; this thread will live until the program exits, so
    ## make it a daemon
    eval_watcher_thread = threading.Thread(target=eval_watcher)
    eval_watcher_thread.setDaemon(True)
    eval_watcher_thread.start()

    logger.debug("waiting for initial evals to be retrieved")
    initial_evals_retrieved.wait()

    ## enable node drain
    logger.info("draining ourselves")
    nomad.put("node/%s/drain" % node_id, params={"enable": True})

    ## wait for pending watches to complete
    while True:
        completed = []

        items = pending_watches.items()
        if not items:
            break

        logger.info("waiting for %d watches to complete", len(items))

        for key, watch in items:
            watch_type, watch_id = key

            logger.debug("waiting for %s to complete", watch)
            watch.join(timeout=10)

            if watch.isAlive():
                logger.info("%s is still in progress", watch)
                continue

            logger.info("%s is complete", watch)
            completed.append(key)

            if watch_type == "eval":
                evaluation = watch.evaluation

                if watch.terminal_status != "complete":
                    logger.warn("terminal status for %r is '%s', not 'complete'", key, watch.terminal_status)
                else:
                    new_allocs = nomad.get("evaluation/%s/allocations" % watch_id)

                    logger.info(
                        "eval %s for %s triggered by %s completed with %d new allocations",
                        evaluation["ID"],
                        evaluation["JobID"],
                        evaluation["TriggeredBy"],
                        len(new_allocs),
                    )

                    if evaluation["NextEval"]:
                        ev = evaluation["NextEval"]

                        logger.warn("found NextEval %s on eval %s!", ev, watch_id)

                        if not pending_watches.add("eval", ev, EvalWatcher(nomad, ev)):
                            logger.info("already watching eval %s", ev)

                    if evaluation["BlockedEval"]:
                        ev = evaluation["BlockedEval"]

                        logger.warn("found BlockedEval %s on eval %s!", ev, watch_id)

                        ## not watching this eval. testing with nomad 0.5.5
                        ## shows that the blocked eval becomes "canceled" when a
                        ## new node with available resources becomes available,
                        ## and that our watches do not catch the eval that
                        ## eventually places the replacement allocation onto the
                        ## eligible node. So the result is that we wait for the
                        ## blocked eval to resolve (become canceled) and then
                        ## immediately finish shutting down, without being able
                        ## to see the new allocation start running. Since the
                        ## goal is to allow the alloc dir to get migrated to the
                        ## new node, there's no point in waiting just to exit
                        ## later.
                        # if not pending_watches.add("eval", ev, EvalWatcher(nomad, ev)):
                        #     logger.info("already watching eval %s", ev)

                    for alloc in new_allocs:
                        if not pending_watches.add("alloc", alloc["ID"], AllocWatcher(nomad, alloc, "running")):
                            logger.warn("already watching alloc %(ID)s %(Name)s", alloc)

        for key in completed:
            pending_watches.delete(*key)

    logger.info("all pending watches are complete")

    stop_eval_watcher.set()
    eval_watcher_thread.join()

    if mainpid is not None:
        mainpid = int(mainpid)

        logger.info("killing %d", mainpid)
        os.kill(mainpid, signal.SIGTERM)

        ## wait for the process to exit
        while pid_exists(mainpid):
            logger.info("waiting for %d to exit", mainpid)
            time.sleep(10)

if __name__ == "__main__":
    try:
        main(*sys.argv[1:])
    except:
        logger.fatal("failed to cleanly drain ourselves", exc_info=True)
        sys.exit(1)
# -*- ini -*-
[Unit]
Description=Nomad job scheduler
Documentation=https://nomadproject.io/docs/
## No Requires or Wants; Nomad will be started by consul-template when all
## config has been generated.
## most config is generated by cloud-init
After=cloud-final.service
## we're actually started by consul-template
After=consul-template.service
## stop before consul stops, so services can be cleanly unregistered
After=consul.service
## docker.service because fingerprinting depends on it.
After=docker.service
## stop before the network goes offline
After=network-online.target
## so we're stopped before rkt containers (which are registered with systemd)
## are stopped
After=machine.slice
## abort startup until credentials are written, via consul-template
AssertPathExists=/etc/nomad.d/01-creds.json

[Service]
ExecStartPre=/usr/local/bin/nomad-configure-vault-integration.py
ExecStart=/usr/local/bin/nomad agent -config /etc/nomad.d
## unlikely we'll need this, but whatev.
ExecReload=/bin/kill -HUP $MAINPID
## stopping
# drain ourselves; when complete, SIGTERM will be sent
ExecStop=/usr/local/bin/nomad-drain-and-wait.py $MAINPID
Restart=always
## TimeoutStopSec applies to ExecStop, so give plenty of time to shut down.
TimeoutStopSec=90m
## only kill the main process, not all processes in the group.
KillMode=process
# discard stdout as we're using syslog
StandardOutput=null
# don't allow the oom-killer to touch this process
OOMScoreAdjust=-1000
# 65k limit on number of open files
LimitNOFILE=65536

[Install]
WantedBy=multi-user.target
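
To manually check a node's drain state before or after the ExecStop script runs, here is a minimal sketch against the same Nomad HTTP endpoints the script uses (agent/self and node/<id>), assuming a local agent on the default port; it is illustrative only and not part of the files above:

#!/usr/bin/env python
# minimal sketch, assuming a local Nomad agent at http://localhost:4646;
# uses the same endpoints and fields as the drain script above.
import requests

endpoint = "http://localhost:4646/v1/"

# look up this agent's node ID, exactly as the drain script does
agent_info = requests.get(endpoint + "agent/self").json()
node_id = agent_info["stats"]["client"]["node_id"]

# report the node's current drain flag
node = requests.get(endpoint + "node/%s" % node_id).json()
print("node %s Drain=%r" % (node_id, node["Drain"]))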