Skip to content

Instantly share code, notes, and snippets.

@rahulwa
Created June 5, 2018 17:51
Show Gist options
  • Save rahulwa/505fb72dfdf750918c914e0db0830e06 to your computer and use it in GitHub Desktop.
Save rahulwa/505fb72dfdf750918c914e0db0830e06 to your computer and use it in GitHub Desktop.
Wrapper over Consul in Python for configuration management.
import requests
import os
import logging
import sys
import json
import base64
class Conf:
'''
Wrapper on top of consul api to facilitates configuration for Application.
Get Consul Server hostname and port from the environment variable `CONSUL_HOSTNAME` and `CONSUL_PORT`
Puts all application configuration on Consul. Allow overwriting of any configuration with the environment variable.
Priority of configuration will be:
1: Environment Variable
2: Consul key’s value
Key Naming:
- Use underscores to separate words inside the key name.
- Use lower case letters.
- `app_` is must be prefix on key name for environment variable.
So if key name is `redis_hostname` on the application then corresponding
consul key name will be `component/environment/URL/redis_hostname` (like `v2app/production/backend1/redis_hostname`)
Environment variable name will be `app_redis_hostname`.
'''
def __init__(self):
self.consul_hostname = os.environ['CONSUL_HOSTNAME']
self.consul_port = os.environ['CONSUL_PORT']
self.consul_url = "http://{}:{}".format(self.consul_hostname, int(self.consul_port))
self.consul_kv_endpoint = "{}/v1/kv".format(self.consul_url)
self.consul_svc_endpoint = "{}/v1/health/service".format(self.consul_url)
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.DEBUG)
self.logger.info("consul url is '{}'".format(self.consul_url))
def __get_consul_kv(self, key_endpoint, params=None):
data = {}
r = requests.get(key_endpoint, params=params)
if not r.ok:
self.logger.error("unable to get key endpoint: '{}' having status_code: '{}'".format(
key_endpoint, r.status_code))
return None
body = json.loads(r.text)
for val in body:
if val.get("Value"):
data[val["Key"]] = base64.b64decode(val["Value"]).decode()
return data
def __del_consul_kv(self, key_endpoint, params=None):
r = requests.delete(key_endpoint, params=params)
if r.text.strip() != "true":
self.logger.error("unable to delete key endpoint: '{}' having status_code: '{}'".format(
key_endpoint, r.status_code))
self.logger.info("successfully deleted key endpoint: '{}' with params: '{}'".format(
key_endpoint, params))
def __put_consul_kv(self, prefix, key, value):
if type(prefix) != str or type(key) != str or type(value) != str:
self.logger.error("send string type only in {}".format(__class__))
key_endpoint = "{}/{}/{}".format(self.consul_kv_endpoint, prefix, key)
r = requests.put(key_endpoint, value)
if r.text.strip() != "true":
self.logger.error("unable to put key endpoint: '{}' having status_code: '{}'".format(
key_endpoint, r.status_code))
self.logger.info("successfully put key endpoint: '{}'".format(key_endpoint))
def __get_consul_svc(self, prefix, service):
'''
Since consul does not have concept of prefix for service, so we are using tags for this purpose.
'''
data = {}
svc_endpoint = "{}/{}".format(self.consul_svc_endpoint, service)
consul_data = requests.get(svc_endpoint, params={
"tag": prefix, "passing": True})
body = json.loads(consul_data.text)
for val in body:
data["address"] = val["Service"]["Address"]
data["port"] = val["Service"]["Port"]
return data
def put(self, prefix, key, value):
'''
It will write to Consul with prefix/key key name.
`prefix`, `key`, `value` are string data-type.
example:
Conf.put("app/staging/main", "key", "value")
'''
self.__put_consul_kv(prefix, key, value)
def reset(self, prefix, data):
'''
It will delete existing prefix namespace on consul, if already present and will create mentioned key-value pairs on Consul.
`data` is dictionary data-type with string data-type as key and value.
`prefix` is string data-type.
example:
d = { "key1": "value1", "key2": "value2" }
Conf.reset("app/staging/main", d)
'''
key_endpoint = "{}/{}".format(self.consul_kv_endpoint, prefix)
self.__del_consul_kv(key_endpoint, params={"recurse": True})
for k, v in data.items():
self.put(prefix, k, v)
def get(self, prefix, key):
'''
It will read from Consul and environment variable then return the value of higher priority.
`prefix`, `key` are string data-type. return data-type is string.
example:
Conf.get("app/staging/main", "key")
'''
env_key = "app_{}".format(key)
if env_key in os.environ:
return os.environ[env_key]
key_endpoint = "{}/{}/{}".format(self.consul_kv_endpoint, prefix, key)
consul_data = self.__get_consul_kv(key_endpoint)
if consul_data:
return consul_data["{}/{}".format(prefix, key)]
else:
return {
"Invalid prefix -> {} or Invalid key -> {}".format(prefix, key)
}
def get_all(self, prefix):
'''
It will read from Consul and environment variable then return the dictionary containing all with below format:
{
environment: {'key': 'value', ... },
consul: {'key': 'value', ... }
}
`prefix` is string data-type and return data-type is dictionary.
example:
Conf.get_all("app/staging/main")
'''
data = {'environment': {}, 'consul': {}}
for k, v in os.environ.items():
if k.startswith("app_"):
data['environment'][k] = v
key_endpoint = "{}/{}".format(self.consul_kv_endpoint, prefix)
consul_data = self.__get_consul_kv(key_endpoint, params={"recurse": True})
if consul_data:
for k, v in consul_data.items():
data['consul'][k] = v
return data
def delete(self, prefix, key):
'''
It will delete `key` from Consul.
`prefix`, `key` are string data-type.
example:
Conf.delete("app/staging/main", "key")
'''
key_endpoint = "{}/{}/{}".format(self.consul_kv_endpoint, prefix, key)
self.__del_consul_kv(key_endpoint)
def delete_all(self, prefix):
'''
It will delete `prefix` from Consul.
`prefix` is string data-type.
example:
Conf.delete_all("app/staging/main")
'''
key_endpoint = "{}/{}".format(self.consul_kv_endpoint, prefix)
self.__del_consul_kv(key_endpoint, params={"recurse": True})
def get_service(self, prefix, service):
'''
It will get service from Consul using HTTP API request.
This will be used for geting services like RabbitMQ, Redis, Kafka, ElasticSearch hostname.
So basically anything that needs status-checking/load-balancing, can be used through this.
`prefix` and `service` are string data-type. return type will be dictionary data-type.
example:
Conf.get_service("app/staging/main", "rabbitmq")
'''
return self.__get_consul_svc(prefix, service)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment