Skip to content

Instantly share code, notes, and snippets.

@wusuopu
Last active June 13, 2024 12:28
Show Gist options
  • Save wusuopu/5fe22b309028ca2ad08c3242254f607b to your computer and use it in GitHub Desktop.
Save wusuopu/5fe22b309028ca2ad08c3242254f607b to your computer and use it in GitHub Desktop.
call rancher 1.6 api to upgrade service.
#!/usr/bin/env python2
# encoding: utf-8
import urlparse
import argparse
import base64
import json
import time
import os
import sys
import ssl
import httplib
import collections
class Client(object):
def __init__(self):
args = self.parse_arg()
self.url = urlparse.urlparse(args.url)
self.image = args.image
self.conn = httplib.HTTPSConnection(
self.url.hostname,
self.url.port,
context=ssl._create_unverified_context()
)
token = base64.standard_b64encode('%s:%s' % (args.key, args.secret))
self.headers = {
'Content-Type': 'application/json',
'Authorization': 'Basic %s' % (token)
}
self._info = {}
def info(self):
self.conn.request('GET', self.url.path, None, self.headers)
self._info = self.__getresponse()
return self._info
def finish_upgrade(self):
self.conn.request(
'POST',
'%s?action=finishupgrade' % (self.url.path),
None,
self.headers
)
return self.__getresponse()
def upgrade(self):
if not self._info['upgrade'] or not self._info['upgrade']['inServiceStrategy']:
# upgrade in first time
payload = {
"inServiceStrategy": {
"type": "inServiceUpgradeStrategy",
"batchSize": 1,
"intervalMillis": 2000,
"previousSecondaryLaunchConfigs": [],
"secondaryLaunchConfigs": [],
"startFirst": False,
"launchConfig": self._info['launchConfig'],
},
"toServiceStrategy": None
}
else:
payload = {
"inServiceStrategy": self._info['upgrade']['inServiceStrategy'],
"toServiceStrategy": None
}
payload['inServiceStrategy']['launchConfig']['imageUuid'] = 'docker:%s' % (self.image)
self.conn.request(
'POST',
'%s?action=upgrade' % (self.url.path),
json.dumps(payload),
self.headers
)
return self.__getresponse()
def __getresponse(self):
res = self.conn.getresponse()
if res.status >= 300:
raise Exception('rancher server error: %s' % (res.status))
return json.loads(res.read().decode('utf-8'), object_pairs_hook=collections.OrderedDict)
def parse_arg(self):
parser = argparse.ArgumentParser(description='Upgrade rancher service')
parser.add_argument('--url', required=True, help='rancher service api url')
parser.add_argument('--key', required=True, help='env name for api key')
parser.add_argument('--secret', required=True, help='env name for api secret')
parser.add_argument('--image', required=True, help='new docker image')
args = parser.parse_args()
key = os.environ.get(args.key)
secret = os.environ.get(args.secret)
if not key or not secret:
print('missing api key or api secret')
sys.exit(2)
args.key = key
args.secret = secret
return args
def check_state(client):
max_try = 40
i = 0
info = client.info()
while 'ing' in info['state']:
# this service is busy.
info = client.info()
i += 1
if i >= max_try:
break
time.sleep(5)
return info['state'] == 'active'
def finish_upgrade(client):
info = client.info()
if info['state'] == 'active':
return True
if info['state'] == 'upgraded':
# there is a unfinished upgrade
print('finish a old upgrade')
client.finish_upgrade()
return check_state(client)
def main():
client = Client()
if not finish_upgrade(client):
raise Exception("this service is not active")
print('start to upgrade')
client.upgrade()
if check_state(client) or client._info['state'] == 'upgraded':
print('Current service upgrade finished')
# auto finish upgrade
if os.environ.get('AUTO_FINISH_UPGRADE', '1') == '1' and not finish_upgrade(client):
print('Finish upgrade failed')
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment