Created
January 7, 2015 16:10
-
-
Save StalkingKillah/d636f82f3d9b995b05ea to your computer and use it in GitHub Desktop.
Basic CloudFlare API Python Library (using requests library)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import requests | |
try: | |
import json | |
except ImportError: | |
import simplejson as json | |
class CloudFlare(object):
    """Small client for the CloudFlare ``api_json.html`` endpoint.

    Covers the DNS record actions used below: ``rec_load_all``, ``rec_new``,
    ``rec_edit`` and ``rec_delete``. Every call is authenticated with the
    account e-mail and API token given to the constructor.

    Methods report failures by logging and returning ``None`` rather than
    raising; exceptions raised by ``requests`` itself are not caught here.
    """

    # Record types accepted by create_record().
    RECORD_TYPES = ('A', 'CNAME', 'MX', 'TXT', 'SPF', 'AAAA', 'NS', 'SRV', 'LOC')

    def __init__(self, email, token):
        """Store the credentials that are attached to every request.

        :param email: account e-mail, sent as the ``email`` parameter.
        :param token: API token, sent as the ``tkn`` parameter.
        """
        # NOTE(review): this is the legacy JSON endpoint -- confirm it is
        # still served before relying on this client.
        self.url = 'https://www.cloudflare.com/api_json.html'
        self.email = email
        self.token = token

    def _request(self, payload):
        """Send ``payload`` as query parameters of a GET request.

        :param payload: dict of request parameters.
        :returns: the decoded JSON body when the HTTP status is 200,
            otherwise ``None`` (an error is logged).
        """
        response = requests.get(self.url, params=payload)
        if response.status_code == 200:
            return response.json()
        logging.error('Unable to make request to CloudFlare!')
        return None

    def _create_payload(self, action, data=None):
        """Build the parameter dict for ``action``.

        :param action: API action name, sent as the ``a`` parameter.
        :param data: optional dict of extra parameters; its entries override
            the base ones. Anything that is not a dict is ignored.
        :returns: a new dict with the action, the credentials and ``data``.
        """
        payload = {
            'a': action,
            'tkn': self.token,
            'email': self.email,
        }
        if isinstance(data, dict):
            payload.update(data)
        return payload

    def get_records(self, domain):
        """Load all records of ``domain`` (action ``rec_load_all``).

        :returns: the decoded API response, or ``None`` if the request failed.
        """
        payload = self._create_payload('rec_load_all', {'z': domain})
        return self._request(payload)

    def get_record(self, domain, name):
        """Return the first record of ``domain`` named ``<name>.<domain>``.

        :param domain: zone to search.
        :param name: record name relative to ``domain``.
        :returns: the matching record dict, or ``None`` when the record list
            could not be loaded or no record has that name.
        """
        fqdn = "{}.{}".format(name, domain)
        response = self.get_records(domain)
        try:
            candidates = response['response']['recs']['objs']
        except (KeyError, TypeError):
            # TypeError covers a failed request (response is None); KeyError
            # covers a reply that does not carry a record list.
            logging.error('Unable to load records for %s.', domain)
            return None

        # Plain loop instead of len(filter(...)): filter() returns an
        # iterator on Python 3, which supports neither len() nor indexing.
        for record in candidates:
            if record['name'] == fqdn:
                return record

        logging.warning('Record for %s not found.', fqdn)
        return None

    def create_record(self, domain, name, content, record_type='A', ttl=1):
        """Create a record in ``domain`` (action ``rec_new``).

        :param domain: zone that receives the record.
        :param name: record name.
        :param content: record content.
        :param record_type: one of ``RECORD_TYPES``, case-insensitive.
        :param ttl: TTL value sent with the record.
        :returns: the decoded API response, or ``None`` when the record type
            is not supported or the request failed.
        """
        record_type = record_type.upper()
        if record_type not in self.RECORD_TYPES:
            logging.error('Record type is not in %s', '/'.join(self.RECORD_TYPES))
            return None

        data = {
            'z': domain,
            'type': record_type,
            'name': name,
            'content': content,
            'ttl': ttl,
        }
        return self._request(self._create_payload('rec_new', data))

    def edit_record(self, domain, name, content, record_type='', ttl=None):
        """Edit the record ``<name>.<domain>`` (action ``rec_edit``).

        The record is looked up first to obtain its ``rec_id``. Parameters
        left at ``None`` or ``''`` are omitted from the request.

        :returns: the decoded API response, or ``None`` when the record was
            not found or the request failed.
        """
        record = self.get_record(domain, name)
        if record is None:
            return None

        fields = {
            'z': domain,
            'type': record_type,
            'id': record['rec_id'],
            'content': content,
            'ttl': ttl,
        }
        # Drop unset values. The previous test ("is not None or is not ''")
        # was always true, so nothing was ever filtered out.
        data = dict(
            (key, value) for key, value in fields.items()
            if value is not None and value != ''
        )
        return self._request(self._create_payload('rec_edit', data))

    def delete_record(self, domain, name):
        """Delete the record ``<name>.<domain>`` (action ``rec_delete``).

        :returns: the decoded API response, or ``None`` when the record was
            not found or the request failed.
        """
        record = self.get_record(domain, name)
        if record is None:
            return None

        data = {
            'z': domain,
            'id': record['rec_id'],
        }
        return self._request(self._create_payload('rec_delete', data))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment