Created
August 24, 2020 18:02
-
-
Save MatteoRagni/d7e231ae8acb73e31f4a8737f198214b to your computer and use it in GitHub Desktop.
Update an existing CloudFlare DNS record with the current Public IP (effectively a DynDNS)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import json | |
import requests | |
from CloudFlare import CloudFlare | |
import pathlib | |
import logging | |
from logging.handlers import SysLogHandler | |
class Application:
    r"""
    Update one CloudFlare DNS ``A`` record with the current public IP, so the
    record can be used as a Dynamic DNS entry.

    Constructing an instance performs the whole run: the configuration is
    read, the public IP is requested, the DNS record is retrieved and, only
    when the two addresses differ, the record is updated.

    The attributes are filled in incrementally by the methods below:

    :attr _token: the token for accessing the CloudFlare DNS API
    :attr _domain: the domain name (zone)
    :attr _record: the fully qualified record name (``<record>.<domain>``)
    :attr _current_ip: the current public IP as reported by ``ipinfo.io``
    :attr _cloudflare: the CloudFlare API client
    :attr _zone_id: the id of the zone used for the requests
    :attr _record_data: the dictionary with all the parameters of the DNS record
    :attr _record_id: the DNS record id
    :attr _logger: the object used for logging (``info``/``warning`` are called on it)
    """

    # Service queried for the public IP. HTTPS is used because the answer
    # decides where the DNS record will point to.
    _IP_SERVICE_URL = "https://ipinfo.io/json"
    # Seconds to wait for the IP service before giving up; without a timeout
    # ``requests`` may block forever on an unresponsive peer.
    _HTTP_TIMEOUT = 10

    def __init__(self, config_file: str, logger=logging):
        """
        Run a complete check/update cycle.

        :param config_file: path of the json configuration file
        :type config_file: str
        :param logger: logger-like object; defaults to the ``logging`` module
        """
        self._token = None
        self._domain = None
        self._record = None
        self._current_ip = None
        self._cloudflare = None
        self._zone_id = None
        self._record_data = None
        self._record_id = None
        self._logger = logger

        self.read_configuration(config_file)
        self.current_public_ip()
        self.retrieve_record()
        if self.should_update():
            self.update_record()

    def read_configuration(self, config_file):
        """
        Load the json configuration file. It must contain an object with the
        following string values:

         - token: CloudFlare token for DNS update
         - domain: DNS domain name
         - record: name of the record (the domain is appended automatically)

        Sets :attr:`_token`, :attr:`_domain` and :attr:`_record`.

        :param config_file: the configuration file to load
        :type config_file: str
        :raise ValueError: if the configuration file does not exist
        :raise AttributeError: if a key is missing or a value is not a string
        :raise json.JSONDecodeError: if the json is invalid
        """
        config_path = pathlib.Path(config_file)
        try:
            with config_path.open(encoding="utf-8") as fp:
                data = json.load(fp)
        except FileNotFoundError as error:
            raise ValueError(f"File {config_path} does not exists") from error

        # A json list/number/string at the root cannot hold the required keys.
        if not isinstance(data, dict):
            raise AttributeError("Configuration root is not a json object: invalid")

        for name in ("token", "domain", "record"):
            if name not in data:
                raise AttributeError(f"Missing configuration key: {name}")
            if not isinstance(data[name], str):
                raise AttributeError(f"Data configuration at key {name} is not a string: invalid")

        self._token = data["token"]
        self._domain = data["domain"]
        self._record = f"{data['record']}.{self._domain}"
        self._logger.info(f"Working on {self._record}")

    @staticmethod
    def _exactly_one(matches, none_message, many_message):
        """
        Return the only element of ``matches``.

        :param matches: the sequence returned by a lookup
        :param none_message: error message used when ``matches`` is empty
        :param many_message: error message used when there is more than one match
        :raise ValueError: if ``matches`` does not contain exactly one element
        """
        if not matches:
            raise ValueError(none_message)
        if len(matches) > 1:
            raise ValueError(many_message)
        return matches[0]

    def retrieve_record(self):
        r"""
        Retrieve the zone named :attr:`_domain` and, inside it, the ``A``
        record named :attr:`_record`.

        Sets :attr:`_cloudflare`, :attr:`_zone_id`, :attr:`_record_data` and
        :attr:`_record_id`.

        :raise ValueError: the current configuration cannot identify one and only one zone and record
        """
        self._cloudflare = CloudFlare(token=self._token)

        zones = self._cloudflare.zones.get(params={'name': self._domain})
        zone = self._exactly_one(
            zones,
            f"No zone found with name {self._domain}. Aborting",
            f"Too many zones received with name {self._domain}. Aborting",
        )
        self._zone_id = zone['id']
        self._logger.info(f"Domain {self._domain} id found")

        records = self._cloudflare.zones.dns_records.get(
            self._zone_id, params={'name': self._record, 'type': 'A'}
        )
        self._record_data = self._exactly_one(
            records,
            f"No record found for {self._record}. Aborting",
            f"Too many records found for {self._record}. Aborting",
        )
        self._record_id = self._record_data['id']
        self._logger.info(f"Found record :A: {self._record} -> {self._record_data['content']}")

    def should_update(self):
        r"""
        Compare the IP stored in the DNS record with the current public one.

        :return: ``True`` if the record must be updated
        :rtype: bool
        """
        current_record_ip = self._record_data['content']
        _should_update = current_record_ip != self._current_ip
        if _should_update:
            self._logger.warning(f"IP is changed from {current_record_ip} to {self._current_ip}: Updating")
        else:
            self._logger.info(f"Current public IP {self._current_ip} is still valid")
        return _should_update

    def update_record(self):
        """
        Write :attr:`_current_ip` into the DNS record.

        :raise ValueError: if the record returned by the API does not carry the new IP
        """
        # NOTE: ``proxied`` is always sent as False, whatever the record had before.
        record_data = self._cloudflare.zones.dns_records.put(self._zone_id, self._record_id, data={
            "name": self._record,
            "type": "A",
            "proxied": False,
            "content": self._current_ip,
        })
        if record_data["content"] != self._current_ip:
            raise ValueError("Cannot set the current ip: Unknown error")
        self._logger.info(f"Updated {self._record} informations")

    def current_public_ip(self):
        r"""
        Request from `https://ipinfo.io/json` the current public ip.

        Sets :attr:`_current_ip`

        :raise requests.exceptions.ConnectionError: if the site is offline
        :raise requests.exceptions.Timeout: if the site does not answer in time
        :raise requests.exceptions.HTTPError: if the site answers with an error status
        """
        response = requests.get(self._IP_SERVICE_URL, timeout=self._HTTP_TIMEOUT)
        # Fail on 4xx/5xx here instead of on a confusing missing 'ip' key below.
        response.raise_for_status()
        self._current_ip = response.json()['ip']
        self._logger.info(f"Current public ip is: {self._current_ip}")
def main():
    """
    Command line entry point: parse the arguments, configure logging and run
    one :class:`Application` update cycle.

    Log records always go to the local syslog socket ``/dev/log`` when it can
    be opened. They also go to the console unless ``--cron`` is given; if the
    syslog socket is not available the console is used in any case, so that
    messages are not lost.

    Any exception raised by the update cycle is logged with its traceback and
    is not propagated.

    :return: ``0`` if the cycle completed, ``1`` if it raised an exception
    :rtype: int
    """
    import argparse
    import sys

    parser = argparse.ArgumentParser(description="Updates CloudFlare DNS record of a specific zone with the current public IP")
    parser.add_argument("-c", "--config", required=True, help="Configuration file with all the configurations")
    parser.add_argument("-k", "--cron", action='store_true', help="Do not log to the console (syslog only)")
    args = parser.parse_args()

    logger = logging.getLogger(sys.argv[0])
    logger.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s :: %(name)s :: %(levelname)s :: %(message)s')

    # /dev/log does not exist on every system (e.g. minimal containers);
    # a missing socket must not prevent the update from running.
    try:
        syshandler_log = SysLogHandler(address="/dev/log")
    except OSError:
        syshandler_log = None
    else:
        syshandler_log.setFormatter(formatter)
        logger.addHandler(syshandler_log)

    if syshandler_log is None or not args.cron:
        iostream_log = logging.StreamHandler()
        iostream_log.setFormatter(formatter)
        logger.addHandler(iostream_log)

    if syshandler_log is None:
        logger.warning("Syslog socket /dev/log is not available: logging to the console")

    try:
        Application(args.config, logger)
    except Exception:
        # Top-level boundary: record message and traceback, report the failure.
        logger.exception("DNS record update failed")
        return 1
    return 0
# Script entry point. Whatever main() returns becomes the process exit status
# (a return value of None means a status of 0).
if __name__ == "__main__":
    raise SystemExit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment