-
-
Save naviat/988826061a0b6c4a7505d7d1b550144d to your computer and use it in GitHub Desktop.
Helper functions to get started with Kubernetes API
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import jinja2 | |
import json | |
import logging | |
import os | |
import requests | |
import tempfile | |
import pykube.config | |
import pykube.http | |
from ._compat import to_string, string_types | |
class K8sError(Exception): | |
pass | |
class K8sClient(object): | |
def __init__(self, config_filename=None, | |
k8s_url=None, k8s_user=None, k8s_ca=None, k8s_token=None): | |
if config_filename: | |
self.config = pykube.config.KubeConfig(config_filename) | |
else: | |
file_contents = '''apiVersion: v1 | |
clusters: | |
- cluster: | |
certificate-authority: {} | |
server: {} | |
name: local | |
contexts: | |
- context: | |
cluster: local | |
user: {} | |
name: local | |
current-context: local | |
kind: Config | |
preferences: | |
users: | |
- name: {} | |
user: | |
token: "{}"'''.format(k8s_ca, k8s_url, k8s_user, k8s_user, k8s_token) | |
with tempfile.NamedTemporaryFile() as f: | |
f.write(file_contents) | |
f.flush() | |
self.config = pykube.config.KubeConfig(f.name) | |
self.http_client = pykube.http.HTTPClient(self.config) | |
def get_service_ip(self, namespace, name): | |
services = self.get_services(namespace=namespace) | |
ip = None | |
for s in services['items']: | |
if s['metadata']['name'] == name: | |
ip = to_string(s['spec']['clusterIP']) | |
if not ip: | |
raise K8sError('Unable to get {} ip'.format(name)) | |
return ip | |
def get_services(self, namespace='default'): | |
"""Returns a list of the service names""" | |
return self._get_components('services', namespace=namespace) | |
def _get_components(self, component_type, use_raw=False, | |
namespace='default'): | |
rsp = self.http_client.get(url='/{}'.format( | |
component_type), namespace=namespace) | |
if rsp.status_code != 200: | |
raise K8sError(to_string(rsp.text)) | |
json_rsp = json.loads(to_string(rsp.text)) | |
return json_rsp | |
def post_json(self, json_obj): | |
if isinstance(json_obj, string_types): | |
try: | |
json_obj = json.loads(json_obj) | |
except ValueError as e: | |
raise K8sError('Bad json config: {}'.format(str(e))) | |
if 'kind' not in json_obj: | |
raise K8sError('no kind field in json_obj: {}'.format(json_obj)) | |
component_type = json_obj['kind'].lower() | |
if 'metadata' not in json_obj: | |
raise K8sError( | |
'no metadata field in json_obj: {}'.format(json_obj)) | |
namespace = json_obj['metadata'].get('namespace', 'default') | |
if component_type == 'namespace': | |
try: | |
rsp = self.http_client.post(url='/{}s'.format(component_type), | |
json=json_obj) | |
except requests.exceptions.ConnectionError as e: | |
raise K8sError("Unable to connect to k8s: {}".format(str(e))) | |
else: | |
try: | |
self.create_namespace(namespace=namespace) | |
except K8sError: | |
raise | |
logging.debug('Posting JSON to k8s: {}'.format(json_obj)) | |
try: | |
rsp = self.http_client.post(url='/{}s'.format(component_type), | |
json=json_obj, | |
namespace=namespace) | |
except requests.exceptions.ConnectionError as e: | |
raise K8sError("Unable to connect to k8s: {}".format(str(e))) | |
if rsp.status_code != 201: | |
if rsp.status_code == 409: | |
# it already exists | |
pass | |
else: | |
raise K8sError(to_string(rsp.text)) | |
def create_namespace(self, namespace): | |
template = '''{ | |
"apiVersion": "v1", | |
"kind": "Namespace", | |
"metadata": { | |
"name": "%s" | |
} | |
}''' % (namespace) | |
try: | |
rsp = self.http_client.post(url='/namespaces', | |
json=json.loads(template)) | |
except requests.exceptions.ConnectionError as e: | |
raise K8sError("Unable to connect to k8s: {}".format(str(e))) | |
if rsp.status_code != 201: | |
if rsp.status_code == 409: | |
# it already exists | |
pass | |
else: | |
raise K8sError(to_string(rsp.text)) | |
@staticmethod | |
def create_k8s_specs(name, version, ports, docker_tag): | |
k8s_rc_template = ''' | |
{ | |
"kind": "ReplicationController", | |
"apiVersion": "v1", | |
"metadata": { | |
"name": "{{ name }}-{{ version }}", | |
"namespace": "default", | |
"labels":{ | |
"app":"{{ name }}", | |
"version":"{{ version }}" | |
} | |
}, | |
"spec":{ | |
"replicas": 1, | |
"selector":{ | |
"app":"{{ name }}", | |
"version":"{{ version }}" | |
}, | |
"template":{ | |
"metadata":{ | |
"labels":{ | |
"app":"{{ name }}", | |
"version":"{{ version }}" | |
} | |
}, | |
"spec":{ | |
"containers":[ | |
{ | |
"image":"{{ image }}", | |
"env": [ | |
{{ env }} | |
], | |
"ports" : [ | |
{% for port in ports %}{ | |
"containerPort": {{ port }} | |
}{% if not loop.last %},{% endif %} | |
{% endfor %} | |
], | |
"name": "{{ name }}" | |
} | |
] | |
} | |
} | |
} | |
}''' | |
rc_spec = jinja2.Environment().from_string(k8s_rc_template).render( | |
dict(name=name, | |
version=version, | |
ports=ports, | |
image=docker_tag)) | |
k8s_svc_template = ''' | |
{ | |
"kind":"Service", | |
"apiVersion":"v1", | |
"metadata":{ | |
"name":"{{ name }}-{{ version }}", | |
"namespace": "default", | |
"labels":{ | |
"app":"{{ name }}", | |
"version":"{{ version }}" | |
} | |
}, | |
"spec":{ | |
"ports": [ | |
{% for port in ports %}{ | |
"port": {{ port }}, | |
"targetPort": {{ port }} | |
}{% if not loop.last %},{% endif %} | |
{% endfor %} | |
], | |
"selector":{ | |
"app":"{{ name }}", | |
"version":"{{ version }}" | |
}, | |
"type": "LoadBalancer" | |
} | |
}''' | |
svc_spec = jinja2.Environment().from_string(k8s_svc_template).render( | |
dict( | |
name=name, | |
version=version, | |
ports=ports | |
) | |
) | |
return rc_spec, svc_spec | |
class MockK8sClient(K8sClient): | |
def post_json(self, json_str): | |
raise K8sError('Error raised in the mock') | |
def get_service_ip(self, namespace, name): | |
raise K8sError('Error raised in the mock') | |
def init_k8s_client(config): | |
"""Init Kubernetes config""" | |
fname = '{}/.kube/config'.format(os.environ.get('HOME', '')) | |
if os.path.exists(fname): | |
k8s_client = K8sClient(config_filename=fname) | |
else: | |
try: | |
fname = '/var/run/secrets/kubernetes.io/serviceaccount/token' | |
with open(fname) as f: | |
token = f.read() | |
except IOError: | |
token = None | |
try: | |
k8s_client = K8sClient( | |
k8s_user=config['K8S_USER'], | |
k8s_url=config['K8S_URL'], | |
k8s_ca=config['K8S_CA'], | |
k8s_token=token) | |
except KeyError as e: | |
raise K8sError("Bad config: {}".format(str(e))) | |
if not k8s_client: | |
raise K8sError("Unable to create k8s_client") | |
return k8s_client |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment