Last active
January 13, 2020 05:18
-
-
Save jkluch/13d73045288176c509b51c5f75e066f9 to your computer and use it in GitHub Desktop.
Cloudflare: automatically update all type A records found in the account to your current IP.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
import json
import os
import subprocess
import sys
from os.path import expanduser

import requests
# Home directory of the current user; the cached WAN IP file is stored beneath it.
HOME = expanduser("~")

# Cloudflare credentials sent with every request -- fill these in before running.
AUTH_EMAIL = "MODIFY THIS"
AUTH_KEY = "MODIFY THIS"

# Root of the Cloudflare v4 API; endpoint paths are appended to it.
API_BASE_URL = "https://api.cloudflare.com/client/v4/"

# Headers attached to every API request.
REQUESTS_HEADER = {
    'X-Auth-Email': AUTH_EMAIL,
    'X-Auth-Key': AUTH_KEY,
    'Content-Type': 'application/json',
}

# Module-level state filled in by main(): the current external IP and the
# DNS records that should be pointed at it.
liveIP = ""
recordArray = []
def main():
    """Point every collected A record at the current external IP.

    Stores the detected IP in the module-level ``liveIP``, gathers the
    records returned by getDNSRecords() for each domain into the
    module-level ``recordArray``, then calls updateRecord() on each one.
    """
    global liveIP
    global recordArray

    liveIP = runIPcheck()

    # Collect the records of every domain before touching any of them.
    for zone in getDomains():
        recordArray.extend(getDNSRecords(zone))

    for dns_record in recordArray:
        updateRecord(dns_record)
# Compares your current external IP address with the one stored since the last execution
def runIPcheck():
    """Look up the current external IP and stop early if it has not changed.

    The address is obtained by asking the OpenDNS resolver for
    ``myip.opendns.com`` through ``dig``. It is then compared with the
    address cached in ``~/cfddns/wan_ip_address``.

    Returns:
        The current external IP address as a string.

    Raises:
        SystemExit: if the address matches the cached one (nothing to do),
            or if ``dig`` produced no output.
    """
    sub = subprocess.Popen(
        ["dig", "+short", "myip.opendns.com", "@resolver1.opendns.com"],
        stdout=subprocess.PIPE,
    )
    output, _ = sub.communicate()
    new_ip = output.decode('utf-8').strip()

    # Without an address there is nothing sensible to push to the API.
    if not new_ip:
        sys.exit("Could not determine the current external IP address")

    # Open read-only: the previous 'w' mode truncated the cache and compared
    # the file object (not its contents) with the new address, so the
    # "unchanged" branch could never be taken.
    try:
        with open(HOME + '/cfddns/wan_ip_address', 'r') as ip_file:
            stored_ip = ip_file.read().strip()
    except FileNotFoundError:
        # First run (or cache removed): treat as "no address stored yet".
        stored_ip = ""

    if stored_ip == new_ip:
        sys.exit()
    return new_ip
# Returns an array of domains
def getDomains():
    """Fetch the domains (zones) from the API.

    Returns:
        A list of DNS_Domain objects built from the ``name`` and ``id``
        of each entry in the response's ``result`` list.

    Raises:
        APIFailure: if the response's ``success`` field is not true.
    """
    response = apiCall(API_BASE_URL + "zones")
    if response['success'] != True:
        raise APIFailure("Error getting Domains")
    return [DNS_Domain(zone['name'], zone['id']) for zone in response['result']]
# Returns array of domain dns records
def getDNSRecords(domain):
    """Fetch the type A DNS records of one domain.

    Args:
        domain: A DNS_Domain whose ``_Id`` identifies the zone to query.

    Returns:
        A list of DNS_Record objects, one per entry of type "A" in the
        response's ``result`` list.

    Raises:
        APIFailure: if the response's ``success`` field is not true.
    """
    endpoint = "/".join([API_BASE_URL + "zones", domain._Id, "dns_records"])
    response = apiCall(endpoint)
    if response['success'] != True:
        raise APIFailure("Error getting Domain records")
    return [
        DNS_Record(entry['type'], entry['name'], entry['id'], entry['zone_id'], entry['content'])
        for entry in response['result']
        if entry['type'] == "A"
    ]
def updateRecord(record):
    """Point one DNS record at the current external IP and cache that IP.

    Sends the record's id, type and name together with the module-level
    ``liveIP`` to the record's endpoint. On success the IP is written to
    ``~/cfddns/wan_ip_address``.

    Args:
        record: The DNS_Record to update.

    Raises:
        APIFailure: if the API reports failure, or if the record returned
            by the API does not contain the new IP.
    """
    url = API_BASE_URL + "zones" + "/" + record._Did + "/" + "dns_records" + "/" + record._Id
    # NOTE(review): the request is a PUT carrying only id/type/name/content;
    # other record settings (e.g. ttl, proxied) may revert to API defaults --
    # confirm against the Cloudflare API documentation.
    data = {'id': record._Id, 'type': record._Type, 'name': record._Name, 'content': liveIP}
    result = apiCall(url, data)
    print("the IP we found: {}".format(liveIP))
    print("{}".format(result['result']))

    # The exceptions used to be constructed but never raised, so failed
    # updates passed silently.
    if not result['success']:
        raise APIFailure("Error updating record")
    if result['result']['content'] != liveIP:
        raise APIFailure("we tried to update the record but it stayed different")

    cache_path = HOME + '/cfddns/wan_ip_address'
    # Make sure the cache directory exists so a first run can store the IP.
    os.makedirs(os.path.dirname(cache_path), exist_ok=True)
    with open(cache_path, 'w') as ip_file:
        ip_file.write(str(liveIP))
# Makes the api call
def apiCall(url, data=""):
    """Send a request to the API and return the decoded JSON response.

    Args:
        url: Full URL of the endpoint.
        data: Payload to send. When left at the default empty string a GET
            request is made; any other value is sent as the JSON body of a
            PUT request.

    Returns:
        The response body parsed from JSON.

    Raises:
        requests.exceptions.Timeout: if the server does not respond within
            the timeout.
    """
    # requests waits indefinitely unless a timeout is given; an unattended
    # updater must not hang forever on a stalled connection.
    timeout_seconds = 30
    if data == "":
        r = requests.get(url, headers=REQUESTS_HEADER, timeout=timeout_seconds)
    else:
        r = requests.put(url, headers=REQUESTS_HEADER, json=data, timeout=timeout_seconds)
    return r.json()
# DNS record aka where the ip address is set for an A record
class DNS_Record:
    """A single DNS record as returned by the API.

    Attributes are set from the constructor arguments of the same name:
    ``_Type`` (record type), ``_Name`` (record name), ``_Id`` (record id),
    ``_Did`` (id of the domain the record belongs to) and ``_Ip`` (the
    record's current content).
    """

    # Class-level defaults, kept so the attributes also exist on the class.
    _Type = ""
    _Name = ""
    _Id = ""
    _Did = ""
    _Ip = ""

    # _Did is the domain id of that dns record (its a bad var name)
    def __init__(self, _Type, _Name, _Id, _Did, _Ip):
        self._Type = _Type
        self._Name = _Name
        self._Id = _Id
        self._Did = _Did
        self._Ip = _Ip

    def __repr__(self):
        # Readable representation for debugging and log output.
        return "DNS_Record(_Type={!r}, _Name={!r}, _Id={!r}, _Did={!r}, _Ip={!r})".format(
            self._Type, self._Name, self._Id, self._Did, self._Ip
        )
# Domains aka example.com
class DNS_Domain:
    """A domain, identified by its name (``_Name``) and its id (``_Id``)."""

    # Class-level defaults, kept so the attributes also exist on the class.
    _Name = ""
    _Id = ""

    def __init__(self, _Name, _Id):
        self._Name = _Name
        self._Id = _Id

    def __repr__(self):
        # Readable representation for debugging and log output.
        return "DNS_Domain(_Name={!r}, _Id={!r})".format(self._Name, self._Id)
class APIFailure(Exception):
    """Raised when an API call reports failure or returns an unexpected result.

    Args:
        value: Description of what went wrong; also available as ``value``.
    """

    def __init__(self, value):
        # Forward to Exception so ``args`` is populated; without this the
        # exception carries no arguments for repr() or pickling.
        super().__init__(value)
        self.value = value

    def __str__(self):
        return repr(self.value)
# Run the updater only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This is my first time writing anything in Python, so let me know if I did something stupid.