Skip to content

Instantly share code, notes, and snippets.

@geeksunny
Created September 18, 2016 23:19
Show Gist options
  • Save geeksunny/e9f119fc6b947f9aa724f03defe97d78 to your computer and use it in GitHub Desktop.
Save geeksunny/e9f119fc6b947f9aa724f03defe97d78 to your computer and use it in GitHub Desktop.
{
"api": {
"api_key": "abcdefg1234567",
"user_id": "12345"
},
"domains": {
"example.com": {
"A": [
{
"name": "",
"ttl": 86400,
"priority": null
},
{
"name": "www"
}
],
"CNAME": [
{
"name": "sample"
}
]
},
"sample.net": [
"A": [
{
"name": "",
},
{
"name": "www"
}
]
]
}
}
import json
import requests
#
class ApiResponse(object):
statusCode = 0
body = {}
#
def __init__(self, statusCode, body):
self.statusCode = statusCode
self.body = body
#
class Dnsimple(object):
apiKey = ""
userId = ""
headers = {
"Accept":"application/json",
"content-Type":"application/json"
}
#
def __init__(self, apiKey, userId):
self.apiKey = apiKey
self.userId = userId
self.headers["Authorization"] = 'Bearer {}'.format(apiKey)
#
def _url(self, path):
return 'https://api.dnsimple.com/v2/{}/{}'.format(self.userId, path)
#
def _return(self, response):
return ApiResponse(response.status_code, response.json())
#
def _get(self, url):
resp = requests.get(url, headers=self.headers)
return self._return(resp)
#
def _post(self, url, body):
resp = requests.post(url, data=body, headers=self.headers)
return self._return(resp)
#
def _patch(self, url, body):
resp = requests.patch(url, data=body, headers=self.headers)
return self._return(resp)
#
def _delete(self, url):
resp = requests.delete(url, headers=self.headers)
return self._return(resp)
#
def getDomain(self, domainName):
return self._get(self._url('domains/{}'.format(domainName)))
#
def createDomain(self, domainName):
body = {"name":domainName}
return self._post(self._url('domains'), body)
#
# def deleteDomain(self, domainName):
# return self._delete(self._url('domains/{}'.format(domainName)))
#
def getZoneRecords(self, zoneName):
return self._get(self._url('zones/{}/records'.format(zoneName)))
#
def createZoneRecord(self, zoneName, name = None, type = None, content = None, ttl = None, priority = None):
body = {}
if name is not None:
body['name'] = name
if type is not None:
body['type'] = type
if content is not None:
body['content'] = content
if ttl is not None:
body["ttl"] = ttl
if priority is not None:
body["priority"] = priority
return self._post(self._url('zones/{}/records'.format(zoneName)), body)
#
def updateZoneRecord(self, zoneName, recordId, content = None, name = None, ttl = None):
body = {}
if content is not None:
body['content'] = content
if name is not None:
body['name'] = name
if ttl is not None:
body["ttl"] = ttl
return self._patch(self._url('zones/{}/records/{}'.format(zoneName, recordId)), body)
#
class DomainRecordUpdater(object):
dnsimple = None
domainConfigs = None
#
def __init__(self, ipAddress, pwntConfig):
self.dnsimple = Dnsimple(pwntConfig['api']['api_key'], pwntConfig['api']['user_id'])
self.domainConfigs = pwntConfig['domains']
#
def run(self):
for domain, typedRecords in config['domains'].iteritems():
domainIsReady = self.verifyDomain(domain)
if (domainIsReady):
self.reviewZoneRecords(domain, typedRecords)
else:
print('Domain {} is not ready!'.format(domain))
#
def createZoneRecord(self, zoneName, recordType, recordInfo):
ttl = None
if 'ttl' in recordInfo:
ttl = recordInfo['ttl']
priority = None
if 'priority' in recordInfo:
priority = recordInfo['priority']
resp = self.dnsimple.createZoneRecord(zoneName, name=recordInfo['name'], type=recordType,
content=recordInfo['content'], ttl=ttl, priority=priority)
return resp.statusCode == 201
#
def updateZoneRecord(self, zoneName, zoneRecordId):
resp = self.dnsimple.updateZoneRecord(zoneName, zoneRecordId, content=ipAddress)
return resp.statusCode == 200
#
def createDomain(self, domainName):
print('Creating domain record for zone {}.'.format(domainName))
resp = self.dnsimple.createDomain(domainName)
return resp.statusCode == 201
#
def verifyDomain(self, domainName):
print('Checking if zone {} exists.'.format(domainName))
resp = self.dnsimple.getDomain(domainName)
return resp.statusCode == 200 or self.createDomain(domainName)
#
def getRecordsForZone(self, zoneName):
recordMap = {}
resp = self.dnsimple.getZoneRecords(zoneName)
if resp.statusCode == 200:
for record in resp.body['data']:
if record['type'] not in recordMap:
recordMap[record['type']] = {}
recordMap[record['type']][record['name']] = record
return recordMap
#
def reviewZoneRecords(self, zoneName, typedRecords):
remoteRecordMap = self.getRecordsForZone(zoneName)
print('Reviewing zone records for {}.'.format(zoneName))
for type, zoneRecords in typedRecords.iteritems():
print('Reviewing records for type {}.'.format(type))
remoteRecords = {}
if type in remoteRecordMap:
remoteRecords = remoteRecordMap[type]
for zoneRecord in zoneRecords:
if zoneRecord['name'] in remoteRecords:
remoteRecord = remoteRecords[zoneRecord['name']]
if remoteRecord['content'] != ipAddress:
print('Updating record for {}.'.format(zoneRecord['name']))
result = self.updateZoneRecord(zoneName, remoteRecord['id'])
print(' Success > {}.'.format(result))
else:
print('{} is already up to date!'.format(zoneRecord['name']))
pass
else:
print('Creating record for {}.'.format(zoneRecord['name']))
result = self.createZoneRecord(type, zoneRecord)
print(' Success > {}.'.format(result))
pass
#
#
def getIpAddress():
resp = requests.get("https://api.ipify.org/?format=text")
if resp.status_code != 200:
raise EnvironmentError('GET / {}'.format(resp.status_code))
return resp.text
#
config = None
with open('config.json') as configFile:
config = json.load(configFile)
if config is None:
exit(-1)
ipAddress = getIpAddress()
updater = DomainRecordUpdater(ipAddress, config)
updater.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment