Last active
August 29, 2015 13:57
-
-
Save codekoala/9636039 to your computer and use it in GitHub Desktop.
This Python 3 script uses Rackspace's DNS API to point a particular domain's DNS records at the public IP address of the system that invokes the script.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
"""
:mod:`updatedns` --- Update External DNS
========================================

.. author:: Josh VanderLinden <[email protected]>
.. date:: 18 Mar 2014

This script will attempt to update the Rackspace DNS records for a specific
domain to point to the public IP address of the system which invokes this
script.

In order to reduce delay and API noise, this script relies on redis to cache an
authentication token. If no auth token exists in redis, a new one will be
retrieved and cached for 12 hours.

Once authorized, all DNS entries for the domain in question that have an IP
address will be checked against the current system's public IP address. If
there is a difference, the records will be updated. If no change is necessary,
no further work is done.
"""
import argparse | |
import ipaddress | |
import json | |
import sys | |
import redis | |
import requests | |
class DNSUpdater:
    """
    Point a Rackspace-hosted domain's IP address records at a single address.

    An instance holds its own ``requests`` session (carrying the JSON headers
    and, once authorized, the auth token) and a redis connection used to cache
    that token between runs.
    """

    HEADERS = {
        'Accept': 'application/json',
        'Content-Type': 'application/json',
    }

    DNS_URL = 'https://dns.api.rackspacecloud.com'
    DOMAIN_URI = '/v1.0/{0.account_id}/domains/?name={0.domain}'
    RECORD_URI = '/v1.0/{0.account_id}/domains/{0.domain_id}/records'

    # seconds to wait on any single Rackspace API call; without an explicit
    # timeout requests will wait forever on an unresponsive server
    TIMEOUT = 30

    def __init__(self, username, api_key, domain):
        """
        :param str username:
            Rackspace username used to request an auth token.
        :param str api_key:
            Rackspace API key used to request an auth token.
        :param str domain:
            Name of the domain whose records will be updated.
        """

        self.cache = redis.StrictRedis()

        # make a requests object specifically for this instance
        self.req = requests.Session()
        self.req.headers.update(self.HEADERS)

        self.username = username
        self.api_key = api_key
        self.domain = domain

        # both are filled in lazily: account_id by get_auth_token() and
        # domain_id by update_records()
        self.account_id = None
        self.domain_id = None

    @staticmethod
    def get_public_ip():
        """
        Attempt to determine the current system's public IP address using
        ifconfig.me.

        The response is only trusted if the request succeeded *and* the body
        parses as an IP address, so an HTTP error page is never mistaken for
        an address.

        :returns:
            The current system's public IP if possible, None otherwise.
        """

        sys.stdout.write('Determining public IP address...\n')

        try:
            response = requests.get('http://ifconfig.me/ip', timeout=5)
            response.raise_for_status()
            public_ip = response.content.strip().decode('ascii')
            # raises ValueError when the body is not an IP address
            ipaddress.ip_address(public_ip)
        except (requests.RequestException, ValueError):
            # ValueError also covers UnicodeDecodeError from a non-ASCII body
            sys.stderr.write('Failed to determine current public IP address\n')
            return None

        sys.stdout.write('Current public IP: {}\n'.format(public_ip))
        return public_ip

    def get_auth_token(self, key='raxatok'):
        """
        Retrieve an authorization token to interact with Rackspace's API.

        The token will first be retrieved from the cache. If no such token
        exists, a new one will be requested, cached, and returned. Either way
        ``self.account_id`` is set as a side effect.

        :param str key:
            The cache key to use when retrieving/storing the authorization
            token.
        :returns:
            A token string that will be used with subsequent API requests.
        :raises requests.HTTPError:
            If the identity service rejects the token request.
        """

        token_json = self.cache.get(key)

        if token_json is not None:
            # redis hands back bytes unless the client decodes responses
            if isinstance(token_json, bytes):
                token_json = token_json.decode('ascii')

            token_dict = json.loads(token_json)
            self.account_id = token_dict['account_id']
            return token_dict['token']

        response = self.req.post(
            'https://identity.api.rackspacecloud.com/v2.0/tokens',
            data=json.dumps({
                'auth': {
                    'RAX-KSKEY:apiKeyCredentials': {
                        'username': self.username,
                        'apiKey': self.api_key,
                    }
                }
            }),
            timeout=self.TIMEOUT,
        )
        # fail loudly on bad credentials instead of with an opaque KeyError
        response.raise_for_status()
        auth = response.json()

        token = auth['access']['token']['id']
        self.account_id = auth['access']['token']['tenant']['id']

        token_json = json.dumps({
            'token': token,
            'account_id': self.account_id,
        })

        # cache for 12 hours
        self.cache.set(key, token_json, 12 * 3600)

        return token

    def get_domain(self):
        """
        Return the information about the domain whose records will be updated.

        :returns:
            Information for the specified domain (the first match returned by
            the API).
        :raises requests.HTTPError:
            If the API responds with an error status.
        :raises IndexError:
            If the API reports no domain with the requested name.
        """

        url = self.DNS_URL + self.DOMAIN_URI.format(self)
        response = self.req.get(url, timeout=self.TIMEOUT)
        response.raise_for_status()

        domains = response.json()['domains']
        if not domains:
            raise IndexError(
                'No domain named {!r} found for this account'.format(
                    self.domain))

        return domains[0]

    def get_records(self):
        """
        Generator for all DNS records that have an IP address.

        :yields:
            Any record with an IP address (IPv4 or IPv6) as its data.
        :raises requests.HTTPError:
            If the API responds with an error status.
        """

        url = self.DNS_URL + self.RECORD_URI.format(self)
        response = self.req.get(url, timeout=self.TIMEOUT)
        response.raise_for_status()

        for record in response.json()['records']:
            try:
                ipaddress.ip_address(record['data'])
            except ValueError:
                # not an address record (hostname, free text, ...)
                continue

            yield record

    def update_records(self, ip=None):
        """
        Update the DNS records for the specified domain to point to `ip` or the
        current system's IP.

        Only records whose address is of the same IP version as `ip` are
        touched, so an IPv4 address is never written into an IPv6 record (or
        vice versa). Exits the process with status 1 if no `ip` is given and
        the public IP cannot be determined.

        :param str ip: (optional)
            An IP address that should be used instead of the current system's
            public IP address.
        :raises ValueError:
            If `ip` is not a valid IP address.
        :raises requests.HTTPError:
            If any API request responds with an error status.
        """

        if ip is None:
            ip = self.get_public_ip()

        # no point going any further if we don't know what our public IP is
        if ip is None:
            sys.exit(1)

        target = ipaddress.ip_address(ip)

        # get API authorization
        auth = self.get_auth_token()

        # throw the auth token into the headers used for later requests
        self.req.headers['X-Auth-Token'] = auth

        domain = self.get_domain()
        self.domain_id = domain['id']

        # find any records that need to be updated
        records = []
        for record in self.get_records():
            current = ipaddress.ip_address(record['data'])

            # leave records of the other address family alone
            if current.version != target.version:
                continue

            # compare parsed addresses so equivalent spellings count as equal
            if current == target:
                continue

            record['data'] = ip
            records.append(record)

        if not records:
            sys.stdout.write('No DNS update necessary...\n')
            return

        url = self.DNS_URL + self.RECORD_URI.format(self)
        response = self.req.put(
            url,
            data=json.dumps({'records': records}),
            timeout=self.TIMEOUT,
        )
        # don't claim an update is pending if the API refused it
        response.raise_for_status()

        sys.stdout.write('DNS update to {} pending...\n'.format(ip))
def build_parser():
    """
    Build the command line parser for this script.

    The text is passed as ``description``; the first positional parameter of
    ``ArgumentParser`` is ``prog``, which would make the usage line show the
    description in place of the program name.

    :returns:
        An ``argparse.ArgumentParser`` expecting the positional arguments
        ``username``, ``api_key`` and ``domain``.
    """

    cli = argparse.ArgumentParser(description='Personal DNS update script')
    cli.add_argument('username', help='Rackspace username')
    cli.add_argument('api_key', help='Rackspace API key')
    cli.add_argument('domain', help='Domain name whose records will be updated')
    return cli


def main(argv=None):
    """
    Parse the command line and update the DNS records for the given domain.

    :param list argv: (optional)
        Argument list to parse instead of ``sys.argv[1:]``.
    """

    opts = build_parser().parse_args(argv)

    dns = DNSUpdater(opts.username, opts.api_key, opts.domain)
    dns.update_records()


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment