import argparse
import ipaddress
import json
import os
import smtplib
from email.mime.text import MIMEText
from pprint import pprint

import requests
|
|
|
# Default base URL of the Gandi API; can be overridden with --url.
_GANDI_API_URL = 'https://dns.api.gandi.net/api/v5'

# Maps each "what is my IP" endpoint to a callable that extracts the address
# (as a string) from that endpoint's response object.  The endpoints are tried
# in insertion order.  Every loader strips surrounding whitespace: a
# plain-text body may end with a newline, which would otherwise make the
# address compare unequal to the value stored in the DNS record.
PUBLIC_IP_SITES_LOADERS = {
    'https://api.ipify.org/?format=json': lambda resp: str(resp.json()['ip']).strip(),
    'http://ip.42.pl/raw': lambda resp: str(resp.text).strip(),
    'http://jsonip.com': lambda resp: str(resp.json()['ip']).strip(),
    # 'origin' may be a comma-separated list; only the first entry is used.
    'http://httpbin.org/ip': lambda resp: str(resp.json()['origin']).split(',')[0].strip(),
}
|
|
|
|
|
class SessionWithUrlBase(requests.Session):
    """A ``requests.Session`` that prefixes relative URLs with a base URL.

    Keeping the base URL on the session reduces code duplication and
    parameter passing at the call sites.
    """

    # In Python 3 you could place `url_base` after `*args`, but not in
    # Python 2, so it stays the first parameter.
    def __init__(self, url_base=None, *args, **kwargs):
        """Create the session.

        Args:
            url_base: Prefix prepended to every URL that does not start with
                ``'http'``.  When it is ``None`` or empty, URLs are passed
                through unchanged.
            *args: Forwarded to ``requests.Session.__init__``.
            **kwargs: Forwarded to ``requests.Session.__init__``.
        """
        super(SessionWithUrlBase, self).__init__(*args, **kwargs)
        self.url_base = url_base

    def request(self, method, url, **kwargs):
        """Send a request, resolving ``url`` against ``url_base`` if needed.

        Args:
            method: HTTP method name, forwarded unchanged.
            url: Either a full URL (starting with ``'http'``) or a path that
                is appended to ``url_base``.
            **kwargs: Forwarded to ``requests.Session.request``.

        Returns:
            Whatever ``requests.Session.request`` returns.
        """
        # Guard on url_base as well: it defaults to None, and concatenating
        # None with a string would raise a TypeError.
        if self.url_base and not url.startswith('http'):
            url = self.url_base + url
        return super(SessionWithUrlBase, self).request(method, url, **kwargs)
|
|
|
|
|
def _set_headers(api_key):
    """Return the HTTP header dict that carries the given API key.

    Args:
        api_key: The key to send in the ``X-API-KEY`` header.

    Returns:
        A new dict with the single entry ``{'X-API-KEY': api_key}``.
    """
    return {'X-API-KEY': api_key}
|
|
|
|
|
def _get_records_for_zone(session, zone):
    """Fetch the records belonging to a zone.

    Args:
        session: Object with a ``get(url)`` method returning a response that
            offers ``raise_for_status()`` and ``json()``.
        zone: Mapping holding the records URL under ``'zone_records_href'``.

    Returns:
        The decoded JSON body of the records response.

    Raises:
        Whatever ``raise_for_status()`` raises when the request failed.
    """
    resp = session.get(zone['zone_records_href'])
    resp.raise_for_status()
    return resp.json()
|
|
|
|
|
def _get_zone(session, domain):
    """Look up the zone whose name equals ``domain``.

    Args:
        session: Object with a ``get(url)`` method; ``'/zones'`` is requested.
        domain: Zone name to search for, compared against each zone's
            ``'name'`` entry.

    Returns:
        The first zone mapping whose ``'name'`` equals ``domain``.

    Raises:
        ValueError: If no zone with that name is listed.
        Whatever ``raise_for_status()`` raises when the request failed.
    """
    resp = session.get('/zones')
    resp.raise_for_status()
    zones = resp.json()
    zone = next((z for z in zones if z['name'] == domain), None)
    if not zone:
        raise ValueError("unable to find zone for name '%s'" % domain)
    return zone
|
|
|
|
|
def get_record_for_subdomain(session, domain, subdomain):
    """Return the DNS record of ``subdomain`` within the zone ``domain``.

    Args:
        session: Session used for the API requests.
        domain: Name of the zone to search.
        subdomain: Record name, compared against each record's
            ``'rrset_name'`` entry.

    Returns:
        The first record mapping whose ``'rrset_name'`` equals ``subdomain``.

    Raises:
        ValueError: If the zone or the record cannot be found.
    """
    zone = _get_zone(session, domain)
    records = _get_records_for_zone(session, zone)
    record = next((r for r in records if r['rrset_name'] == subdomain), None)
    if not record:
        raise ValueError("unable to find record for subdomain '%s'" % subdomain)
    return record
|
|
|
|
|
def get_public_ip(timeout=10):
    """Return the current public IP address as a string.

    The endpoints in ``PUBLIC_IP_SITES_LOADERS`` are tried in order and the
    first usable answer wins.  An endpoint is skipped when the request fails
    or times out, when it answers with an HTTP status >= 400, or when its
    body does not yield a valid IP address.

    Args:
        timeout: Seconds to wait for each endpoint before moving on.

    Returns:
        The address reported by the first endpoint that answered usefully.

    Raises:
        ValueError: If none of the endpoints produced a valid address.
    """
    for site, loader in PUBLIC_IP_SITES_LOADERS.items():
        pprint("getting public IP from '%s'" % site)
        try:
            # Without a timeout a single unresponsive site would hang the
            # whole lookup instead of falling through to the next one.
            resp = requests.get(site, timeout=timeout)
        except requests.RequestException as err:
            pprint("site '%s' failed with '%s' trying next site..." % (site, err))
            continue
        if resp.status_code >= 400:
            pprint("site '%s' returned '%s' trying next site..." % (site, resp.status_code))
            continue
        try:
            public_ip = loader(resp).strip()
            # Reject anything that is not an IP address (e.g. an HTML page
            # served with status 200) so it is never written to DNS.
            ipaddress.ip_address(public_ip)
        except (KeyError, TypeError, ValueError) as err:
            pprint("site '%s' returned an unusable answer (%s) trying next site..." % (site, err))
            continue
        return public_ip
    raise ValueError("unable to lookup public IP... check settings")
|
|
|
|
|
def post_record(session, record, data):
    """Send ``data`` to the record's URL and return the decoded reply.

    Despite the name, the request is issued with HTTP PUT.

    Args:
        session: Object with a ``put(url, json=...)`` method.
        record: Mapping holding the record URL under ``'rrset_href'``.
        data: JSON-serialisable payload sent as the request body.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Whatever ``raise_for_status()`` raises when the request failed.
    """
    resp = session.put(record['rrset_href'], json=data)
    resp.raise_for_status()
    return resp.json()
|
|
|
|
|
def ensure_domain_accurate(session, fqdn):
    """Make the DNS record for ``fqdn`` point at the current public IP.

    The last two labels of ``fqdn`` are taken as the zone name and everything
    before them as the record name (``foo.bar.com`` -> record ``foo`` in zone
    ``bar.com``).  A name with no label in front of the zone (``bar.com``) is
    treated as the zone apex, whose record name is ``@``.

    Args:
        session: Session used for the API requests.
        fqdn: Fully qualified domain name to keep up to date; a trailing
            root dot is ignored.

    Returns:
        ``{'old': ..., 'new': ..., 'domain': fqdn}`` when the record was
        updated, or ``None`` when it already matched the public IP.

    Raises:
        ValueError: If the zone, the record or the public IP cannot be found.
    """
    labels = fqdn.rstrip('.').rsplit('.', 2)
    if len(labels) == 3:
        subdomain, domain = labels[0], '.'.join(labels[1:])
    else:
        # Without this, "bar.com" would be read as record "bar" in zone "com".
        subdomain, domain = '@', '.'.join(labels)

    record = get_record_for_subdomain(session, domain, subdomain)
    record_ip_address = record['rrset_values'][0]
    public_ip = get_public_ip()

    if record_ip_address == public_ip:
        return None

    pprint("record value '%s' != current public IP '%s'" % (record_ip_address, public_ip))
    pprint("updating record to match public IP")
    record['rrset_values'] = [public_ip]
    # The full record, with its value replaced, is sent back as the payload.
    pprint(post_record(session, record, record))
    return {'old': record_ip_address, 'new': public_ip, 'domain': fqdn}
|
|
|
|
|
def send_email(args, body):
    """Email a JSON rendering of ``body`` with the subject 'DNS Updated'.

    The message is sent from and to ``args.email`` over an SSL SMTP
    connection.

    Args:
        args: Object providing ``email``, ``email_host``, ``email_port``,
            ``email_username`` and ``email_password`` attributes.
        body: JSON-serialisable value used as the message text.
    """
    msg = MIMEText(json.dumps(body, indent=4, sort_keys=True))
    msg['Subject'] = 'DNS Updated'
    msg['From'] = args.email
    msg['To'] = args.email

    server = smtplib.SMTP_SSL(args.email_host, args.email_port)
    try:
        server.login(args.email_username, args.email_password)
        server.sendmail(args.email, args.email, msg.as_string())
    finally:
        # Always end the SMTP session, even when login or sending failed.
        server.quit()
|
|
|
|
|
def main(args=None):
    """Command-line entry point: sync each FQDN and optionally email a report.

    Every ``--fqdn`` is checked against the current public IP and updated
    when it differs.  If at least one record changed and ``--email`` was
    given, a summary of the changes is mailed.

    Args:
        args: Argument list to parse; ``None`` makes argparse read the
            process arguments.
    """
    env_api_key = os.getenv('GANDI_API_KEY')

    parser = argparse.ArgumentParser(__name__)
    # Required: without any FQDN there is nothing to do, and iterating the
    # resulting None would fail with an unhelpful TypeError.
    parser.add_argument('-f', '--fqdn', dest='fqdns', action='append', required=True,
                        help='FQDN to keep up to date. eg foo.bar.com')
    # Only demand --key when the environment does not supply one; an
    # unconditional required=True would make the environment default dead.
    parser.add_argument('-k', '--key', required=not env_api_key, default=env_api_key,
                        help='Gandi LiveDNS API key')
    parser.add_argument('-u', '--url', default=_GANDI_API_URL, help='URL to Gandi API')
    parser.add_argument('-e', '--email', help='Optionally add an email to send (Gmail by default)')
    parser.add_argument('-eu', '--email-username', help='Email username')
    parser.add_argument('-ep', '--email-password', help='Email password')
    # type=int keeps the port numeric whether it comes from the default or
    # from the command line.
    parser.add_argument('--email-port', type=int, default=465, help='Email port')
    parser.add_argument('--email-host', default='smtp.gmail.com', help='Email host')

    parsed_args = parser.parse_args(args=args)
    gandi_session = SessionWithUrlBase(url_base=parsed_args.url)
    gandi_session.headers.update({'X-Api-Key': parsed_args.key})

    updated_records = []
    for fqdn in parsed_args.fqdns:
        updated = ensure_domain_accurate(gandi_session, fqdn)
        if updated:
            updated_records.append(updated)

    if updated_records and parsed_args.email:
        send_email(parsed_args, updated_records)
|
|
|
|
|
# Run the updater only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()