Created
August 20, 2018 07:48
-
-
Save philpep/2d9cd843af7aaed647657ee7bfcf886b to your computer and use it in GitHub Desktop.
Update k8s deployment images when new versions are pushed (by comparing image sha256 digest)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import collections | |
import json | |
import logging | |
import re | |
import subprocess | |
import sys | |
# Module-wide logger used by the command helpers to record what they run.
LOG = logging.getLogger(__name__)
def query_yes_no(question, default="yes"):
    """Ask a yes/no question on stdout and read the answer with input().

    "question" is a string that is presented to the user.
    "default" is the presumed answer if the user just hits <Enter>.
        It must be "yes" (the default), "no" or None (meaning
        an answer is required of the user).

    The answer is compared case-insensitively and surrounding whitespace
    is ignored, so " Y " counts as "yes". The question is repeated until a
    recognised answer is given.

    The "answer" return value is True for "yes" or False for "no".
    Raises ValueError if "default" is not one of the accepted values.
    """
    valid = {"yes": True, "y": True, "ye": True,
             "no": False, "n": False}
    if default is None:
        prompt = " [y/n] "
    elif default == "yes":
        prompt = " [Y/n] "
    elif default == "no":
        prompt = " [y/N] "
    else:
        raise ValueError("invalid default answer: '%s'" % default)

    while True:
        sys.stdout.write(question + prompt)
        # Make sure the prompt is visible before blocking on the answer.
        sys.stdout.flush()
        # Strip so stray spaces around the answer do not make it invalid.
        choice = input().strip().lower()
        if default is not None and choice == '':
            return valid[default]
        if choice in valid:
            return valid[choice]
        sys.stdout.write("Please respond with 'yes' or 'no' "
                         "(or 'y' or 'n').\n")
def check_call(*args, log=True, **kwargs):
    """Run a command through subprocess.check_call.

    args: the command and its arguments, one string each.
    log: when true, the space-joined command is logged at INFO level
        through LOG before it runs.
    kwargs: forwarded unchanged to subprocess.check_call.

    Returns None; subprocess.check_call raises
    subprocess.CalledProcessError when the command exits non-zero.
    """
    if log:
        command_line = ' '.join(args)
        LOG.info(command_line)
    subprocess.check_call(args, **kwargs)
def check_output(*args, log=True, **kwargs):
    """Run a command and return what it wrote to stdout.

    args: the command and its arguments, one string each.
    log: when true, the space-joined command is logged at INFO level
        through LOG before it runs.
    kwargs: forwarded unchanged to subprocess.check_output.

    Returns the value of subprocess.check_output (bytes, unless the caller
    requests text mode through kwargs).

    When the command exits non-zero, the output attached to the
    subprocess.CalledProcessError is printed and the exception is
    re-raised unchanged.
    """
    def as_text(stream):
        # The exception's stdout/stderr are None when not captured, str when
        # the caller asked for text mode, and bytes otherwise. Handling all
        # three keeps a diagnostics failure from masking the real error.
        if stream is None:
            return ''
        if isinstance(stream, bytes):
            return stream.decode('utf8', errors='replace')
        return stream

    if log:
        LOG.info(' '.join(args))
    try:
        return subprocess.check_output(args, **kwargs)
    except subprocess.CalledProcessError as exc:
        print('STDOUT: ', as_text(exc.stdout))
        print('STDERR: ', as_text(exc.stderr))
        raise
class Image(collections.namedtuple('Image', ['name', 'current', 'expected'])):
    """One container image of a deployment.

    Fields:
        name: name of the container.
        current: collection of image references considered to be running;
            need_update() tests membership in it.
        expected: image reference that is pulled to learn the wanted digest.
    """

    @staticmethod
    def get_sha256(name, _cache={}):
        """Return the first repo digest docker reports for image *name*.

        The image is pulled, then inspected. Results are memoised in
        *_cache*; the mutable default is intentional so that the memo is
        shared by every call that does not pass its own dict.
        """
        try:
            return _cache[name]
        except KeyError:
            pass
        # TODO: sha256 digest could be requested without pulling the image
        check_output('docker', 'pull', name, log=False)
        raw = check_output(
            'docker', 'inspect', '--format={{index .RepoDigests 0}}',
            name, log=False)
        digest = raw.decode().strip()
        _cache[name] = digest
        return digest

    @property
    def sha256(self):
        """Digest reference obtained for the expected image."""
        return self.get_sha256(self.expected)

    def need_update(self):
        """Return True when the expected digest is not among the current ones."""
        wanted = self.sha256
        return not (wanted in self.current)
class Deployment(collections.namedtuple('Deployment', ['name', 'images'])):
    """A deployment together with the images of its containers.

    Fields:
        name: the deployment's metadata name.
        images: dict mapping a container name to an Image whose ``current``
            set holds the image references reported by the deployment's
            pods.
    """

    # Annotation from which the declared container images are read.
    _LAST_APPLIED = 'kubectl.kubernetes.io/last-applied-configuration'
    # The scheme prefix is optional so that an imageID reported without it
    # is still recognised.
    # NOTE(review): confirm that unprefixed imageIDs compare equal to the
    # references `docker inspect` prints for your registry.
    _IMAGE_ID_RE = re.compile(r'^(?:docker-pullable://)?(.*@sha256:.*)$')
    # Kind of owner each tracked kind must have to be linked to a deployment.
    _PARENT_KIND = {'Pod': 'ReplicaSet', 'ReplicaSet': 'Deployment'}

    @staticmethod
    def _owner_name(item, parent_kind):
        """Return the name of the item's owner of *parent_kind*, or None."""
        for owner in item['metadata'].get('ownerReferences') or []:
            if owner.get('kind') == parent_kind:
                return owner['name']
        return None

    @classmethod
    def _pod_images(cls, item):
        """Return [{'name', 'image'}] for the pod's containers with a digest."""
        found = []
        statuses = (item.get('status') or {}).get('containerStatuses') or []
        for status in statuses:
            match = cls._IMAGE_ID_RE.match(status.get('imageID') or '')
            if match is None:
                # TODO: an empty imageID occurs when the pod is terminating
                # (might be a better way to check this). IDs without a
                # "@sha256:" part cannot be compared either.
                continue
            found.append({'name': status['name'], 'image': match.group(1)})
        return found

    @classmethod
    def _declared_images(cls, item):
        """Return [{'name', 'image'}] from the last applied manifest, or None."""
        annotations = item['metadata'].get('annotations') or {}
        raw = annotations.get(cls._LAST_APPLIED)
        if raw is None:
            return None
        config = json.loads(raw)
        return [{'name': c['name'], 'image': c['image']}
                for c in config['spec']['template']['spec']['containers']]

    @classmethod
    def collect(cls):
        """Yield a Deployment for each deployment listed by kubectl.

        Runs ``kubectl get pod,rs,deployment -o json``, links every pod to
        its deployment through the owning ReplicaSet, and records the image
        references the pods report for each declared container.

        Items that cannot be linked or compared are skipped instead of
        aborting the run: pods/ReplicaSets without an owner of the expected
        kind, deployments without the last-applied annotation, and pod
        containers that the applied manifest does not declare. Deployments
        are yielded in name order.
        """
        data = json.loads(check_output(
            'kubectl', 'get', 'pod,rs,deployment', '-o', 'json').decode())
        items = {'Pod': {}, 'ReplicaSet': {}, 'Deployment': {}}
        for item in data['items']:
            kind = item['kind']
            name = item['metadata']['name']
            obj = {'name': name}
            parent_kind = cls._PARENT_KIND.get(kind)
            if parent_kind is not None:
                owner = cls._owner_name(item, parent_kind)
                if owner is None:
                    # Owned by something else (e.g. a Job) or by nothing.
                    continue
                obj['owner'] = owner
            if kind == 'Deployment':
                declared = cls._declared_images(item)
                if declared is None:
                    logging.getLogger(__name__).warning(
                        'skipping deployment/%s: no %s annotation',
                        name, cls._LAST_APPLIED)
                    continue
                obj['images'] = declared
            elif kind == 'Pod':
                obj['images'] = cls._pod_images(item)
            assert name not in items[kind]
            items[kind][name] = obj

        # Group pods by deployment once instead of rescanning per deployment.
        pods_by_deployment = collections.defaultdict(list)
        for pod in items['Pod'].values():
            replica_set = items['ReplicaSet'].get(pod['owner'])
            if replica_set is not None:
                pods_by_deployment[replica_set['owner']].append(pod)

        for name in sorted(items['Deployment']):
            deployment = items['Deployment'][name]
            images = {c['name']: Image(c['name'], set(), c['image'])
                      for c in deployment['images']}
            for pod in pods_by_deployment.get(name, ()):
                for c in pod['images']:
                    image = images.get(c['name'])
                    if image is None:
                        # Container absent from the applied manifest:
                        # there is no expected image to compare against.
                        continue
                    image.current.add(c['image'])
            yield cls(deployment['name'], images)

    def to_update(self):
        """Report each image's status and yield the ones to update.

        Prints one line per image, in container-name order, and yields a
        ``(deployment name, container name, expected digest)`` tuple for
        every image whose need_update() is true. Current references are
        printed sorted so the output is stable between runs.
        """
        print('checking deployment/{}'.format(self.name))
        for key in sorted(self.images):
            image = self.images[key]
            if image.need_update():
                expected = image.sha256
                print(
                    '\033[0;31m {} current: {}, expected: {}\033[0m'.format(
                        image.name, ', '.join(sorted(image.current)),
                        expected))
                yield self.name, image.name, expected
            else:
                print('\033[0;32m {} is up to date\033[0m'.format(
                    image.name))
def main():
    """Check every deployment, then offer to update each outdated image.

    All deployments are inspected first; afterwards one
    ``kubectl set image`` command per outdated image is proposed and run
    only when the user confirms it.
    """
    pending = []
    for deployment in Deployment.collect():
        pending.extend(deployment.to_update())

    for name, img_name, sha256 in pending:
        target = 'deployment/{}'.format(name)
        assignment = '{}={}'.format(img_name, sha256)
        cmd = ('kubectl', 'set', 'image', target, assignment)
        question = 'run {} ? (y/n)'.format(' '.join(cmd))
        if not query_yes_no(question):
            continue
        check_call(*cmd)
if __name__ == '__main__':
    # Timestamped log lines; INFO level so that the commands logged by the
    # helpers are shown.
    logging.basicConfig(format='%(asctime)-15s %(message)s')
    LOG.setLevel(logging.INFO)
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment