Skip to content

Instantly share code, notes, and snippets.

@naviat
Forked from jorgemarsal/k8s.py
Created August 1, 2018 16:46
Show Gist options
  • Save naviat/988826061a0b6c4a7505d7d1b550144d to your computer and use it in GitHub Desktop.
Save naviat/988826061a0b6c4a7505d7d1b550144d to your computer and use it in GitHub Desktop.
Helper functions to get started with Kubernetes API
import jinja2
import json
import logging
import os
import requests
import tempfile
import pykube.config
import pykube.http
from ._compat import to_string, string_types
class K8sError(Exception):
    """Error raised by this module for failed Kubernetes calls or bad config."""
class K8sClient(object):
    """Small helper around a pykube HTTP client.

    The client is configured either from an existing kubeconfig file or from
    explicit connection settings, in which case a temporary kubeconfig is
    generated on the fly.
    """

    # Kubeconfig rendered when no config file is given. Positional
    # placeholders, in order: CA file path, server URL, user name (context),
    # user name (users entry), bearer token.
    _KUBECONFIG_TEMPLATE = '\n'.join((
        'apiVersion: v1',
        'clusters:',
        '- cluster:',
        '    certificate-authority: {}',
        '    server: {}',
        '  name: local',
        'contexts:',
        '- context:',
        '    cluster: local',
        '    user: {}',
        '  name: local',
        'current-context: local',
        'kind: Config',
        'preferences:',
        'users:',
        '- name: {}',
        '  user:',
        '    token: "{}"',
    ))

    def __init__(self, config_filename=None,
                 k8s_url=None, k8s_user=None, k8s_ca=None, k8s_token=None):
        """Create the pykube config and HTTP client.

        Args:
            config_filename: Path to a kubeconfig file. When truthy, the
                remaining arguments are ignored.
            k8s_url: API server URL written into the generated kubeconfig.
            k8s_user: User name written into the generated kubeconfig.
            k8s_ca: Path to the certificate authority file.
            k8s_token: Bearer token for ``k8s_user``.

        Values are inserted with ``str.format``, so a ``None`` argument is
        written literally as ``None``.
        """
        if config_filename:
            self.config = pykube.config.KubeConfig(config_filename)
            self.http_client = pykube.http.HTTPClient(self.config)
            return

        file_contents = self._KUBECONFIG_TEMPLATE.format(
            k8s_ca, k8s_url, k8s_user, k8s_user, k8s_token)
        # Text mode is required: the default mode ('w+b') rejects str on
        # Python 3.
        with tempfile.NamedTemporaryFile(mode='w') as f:
            f.write(file_contents)
            f.flush()
            self.config = pykube.config.KubeConfig(f.name)
            # Build the client while the temporary file still exists, in case
            # the config object reads the file lazily.
            self.http_client = pykube.http.HTTPClient(self.config)

    def get_service_ip(self, namespace, name):
        """Return the clusterIP of service ``name`` in ``namespace``.

        Raises:
            K8sError: If no service with that name is listed, or its
                clusterIP is empty.
        """
        services = self.get_services(namespace=namespace)
        ip = None
        for s in services['items']:
            if s['metadata']['name'] == name:
                ip = to_string(s['spec']['clusterIP'])
        if not ip:
            raise K8sError('Unable to get {} ip'.format(name))
        return ip

    def get_services(self, namespace='default'):
        """Return the decoded JSON response listing services in ``namespace``."""
        return self._get_components('services', namespace=namespace)

    def _get_components(self, component_type, use_raw=False,
                        namespace='default'):
        """GET ``/<component_type>`` in ``namespace`` and decode the JSON body.

        ``use_raw`` is accepted for interface compatibility but is not used.

        Raises:
            K8sError: If the response status is not 200; the message is the
                response body.
        """
        rsp = self.http_client.get(url='/{}'.format(component_type),
                                   namespace=namespace)
        if rsp.status_code != 200:
            raise K8sError(to_string(rsp.text))
        return json.loads(to_string(rsp.text))

    def _post(self, **kwargs):
        """POST through the HTTP client, mapping connection errors to K8sError."""
        try:
            return self.http_client.post(**kwargs)
        except requests.exceptions.ConnectionError as e:
            raise K8sError("Unable to connect to k8s: {}".format(str(e)))

    @staticmethod
    def _check_created(rsp):
        """Raise K8sError unless ``rsp`` is 201 (created) or 409 (already exists)."""
        if rsp.status_code not in (201, 409):
            raise K8sError(to_string(rsp.text))

    def post_json(self, json_obj):
        """Create the object described by ``json_obj``.

        Args:
            json_obj: A decoded JSON object, or a string containing JSON. It
                must have ``kind`` and ``metadata`` fields. The target
                namespace is ``metadata.namespace`` (default ``'default'``).

        The URL is built as ``/<lowercased kind>s``. For any kind other than
        ``Namespace`` the target namespace is created first. A 409 response
        is treated as success.

        Raises:
            K8sError: On invalid JSON, missing fields, connection failure or
                an unexpected response status.
        """
        if isinstance(json_obj, string_types):
            try:
                json_obj = json.loads(json_obj)
            except ValueError as e:
                raise K8sError('Bad json config: {}'.format(str(e)))
        if 'kind' not in json_obj:
            raise K8sError('no kind field in json_obj: {}'.format(json_obj))
        component_type = json_obj['kind'].lower()
        if 'metadata' not in json_obj:
            raise K8sError(
                'no metadata field in json_obj: {}'.format(json_obj))
        namespace = json_obj['metadata'].get('namespace', 'default')

        url = '/{}s'.format(component_type)
        if component_type == 'namespace':
            # A Namespace object is posted without a namespace argument.
            rsp = self._post(url=url, json=json_obj)
        else:
            self.create_namespace(namespace=namespace)
            logging.debug('Posting JSON to k8s: %s', json_obj)
            rsp = self._post(url=url, json=json_obj, namespace=namespace)
        self._check_created(rsp)

    def create_namespace(self, namespace):
        """Create ``namespace``; an already existing namespace is not an error.

        Raises:
            K8sError: On connection failure or an unexpected response status.
        """
        # Build the payload as a dict so that quotes or backslashes in the
        # name cannot produce invalid JSON.
        payload = {
            'apiVersion': 'v1',
            'kind': 'Namespace',
            'metadata': {'name': '%s' % (namespace,)},
        }
        rsp = self._post(url='/namespaces', json=payload)
        self._check_created(rsp)

    @staticmethod
    def create_k8s_specs(name, version, ports, docker_tag):
        """Render ReplicationController and Service specs as JSON strings.

        Args:
            name: Application name; used for labels, selectors and the
                container name.
            version: Version label; the object names are ``<name>-<version>``.
            ports: Iterable of port numbers exposed by the container and the
                service.
            docker_tag: Container image reference.

        Returns:
            A ``(rc_spec, svc_spec)`` tuple of rendered JSON strings. Values
            are substituted into the templates without escaping.
        """
        k8s_rc_template = '''
{
  "kind": "ReplicationController",
  "apiVersion": "v1",
  "metadata": {
    "name": "{{ name }}-{{ version }}",
    "namespace": "default",
    "labels":{
      "app":"{{ name }}",
      "version":"{{ version }}"
    }
  },
  "spec":{
    "replicas": 1,
    "selector":{
      "app":"{{ name }}",
      "version":"{{ version }}"
    },
    "template":{
      "metadata":{
        "labels":{
          "app":"{{ name }}",
          "version":"{{ version }}"
        }
      },
      "spec":{
        "containers":[
          {
            "image":"{{ image }}",
            "env": [
              {{ env }}
            ],
            "ports" : [
              {% for port in ports %}{
                "containerPort": {{ port }}
              }{% if not loop.last %},{% endif %}
              {% endfor %}
            ],
            "name": "{{ name }}"
          }
        ]
      }
    }
  }
}'''
        k8s_svc_template = '''
{
  "kind":"Service",
  "apiVersion":"v1",
  "metadata":{
    "name":"{{ name }}-{{ version }}",
    "namespace": "default",
    "labels":{
      "app":"{{ name }}",
      "version":"{{ version }}"
    }
  },
  "spec":{
    "ports": [
      {% for port in ports %}{
        "port": {{ port }},
        "targetPort": {{ port }}
      }{% if not loop.last %},{% endif %}
      {% endfor %}
    ],
    "selector":{
      "app":"{{ name }}",
      "version":"{{ version }}"
    },
    "type": "LoadBalancer"
  }
}'''
        environment = jinja2.Environment()
        rc_spec = environment.from_string(k8s_rc_template).render(
            dict(name=name,
                 version=version,
                 ports=ports,
                 image=docker_tag,
                 # No environment variables are supported yet; render the
                 # placeholder as empty so "env" is an empty list.
                 env=''))
        svc_spec = environment.from_string(k8s_svc_template).render(
            dict(name=name,
                 version=version,
                 ports=ports))
        return rc_spec, svc_spec
class MockK8sClient(K8sClient):
    """K8sClient variant whose overridden operations always fail.

    Both overridden methods raise K8sError unconditionally, so code under
    test can exercise its error handling.
    """

    def post_json(self, json_str):
        """Always raise K8sError; ``json_str`` is ignored."""
        raise K8sError('Error raised in the mock')

    def get_service_ip(self, namespace, name):
        """Always raise K8sError; the arguments are ignored."""
        raise K8sError('Error raised in the mock')
def init_k8s_client(config):
    """Create a K8sClient from the user's kubeconfig or from ``config``.

    If ``$HOME/.kube/config`` exists it is used and ``config`` is ignored.
    Otherwise the client is built from the ``K8S_USER``, ``K8S_URL`` and
    ``K8S_CA`` entries of ``config`` plus the token read from
    ``/var/run/secrets/kubernetes.io/serviceaccount/token``; if that file
    cannot be read the token is ``None``.

    Args:
        config: Mapping providing ``K8S_USER``, ``K8S_URL`` and ``K8S_CA``.

    Returns:
        The configured K8sClient.

    Raises:
        K8sError: If a KeyError occurs while building the client from
            ``config`` (for example a missing key).
    """
    fname = '{}/.kube/config'.format(os.environ.get('HOME', ''))
    if os.path.exists(fname):
        return K8sClient(config_filename=fname)

    token_path = '/var/run/secrets/kubernetes.io/serviceaccount/token'
    try:
        with open(token_path) as f:
            # Drop surrounding whitespace (e.g. a trailing newline) so it
            # does not end up inside the quoted token in the generated
            # kubeconfig.
            token = f.read().strip()
    except IOError:
        token = None

    try:
        return K8sClient(
            k8s_user=config['K8S_USER'],
            k8s_url=config['K8S_URL'],
            k8s_ca=config['K8S_CA'],
            k8s_token=token)
    except KeyError as e:
        raise K8sError("Bad config: {}".format(str(e)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment