Last active
April 9, 2023 22:34
-
-
Save synio-wesley/171f1716f487126568f168a399740e20 to your computer and use it in GitHub Desktop.
Sync CloudFlare DNS based on JSON file
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This is a simple Python 3 script to sync all CloudFlare DNS records based on a JSON file. | |
Basically: | |
- It adds new domains if they aren't in CloudFlare yet | |
- For every domain: new DNS records will be added, existing records will be updated and records not existing in the JSON will be deleted | |
By default the script only does a dry run (simulation). Run the script with parameter "execute" to execute it. | |
If you only want to add/update some domains, fill in the "only" key in the JSON file with an array of domains you want to add/update. | |
It can save time if you have a lot of domains in the JSON file. | |
All common record types should be supported (A, AAAA, CNAME, MX, TXT/SPF, SRV). | |
You can create DNS templates for your records, and also define overrides/overwrites based on those templates for some domains. | |
An example JSON can be viewed here: https://gist.github.com/synio-wesley/ffd2ceb0f5abec10e9b865f6420ca804 | |
A few small limitations currently: | |
- Filename needs to be domains.json in same directory (can of course be modified easily to use a parameter if you want) | |
- SPF records need to be defined as SPF instead of TXT in the JSON | |
- Records that have same type and name (for example multiple MX records) will all be updated simultaneously if one is changed | |
- Records that have same type and name (for example multiple MX records) need to be sorted on priority and/or value in the JSON file, otherwise they will be updated all the time | |
""" | |
# Cloudflare credentials and account id. The "?" values look like placeholders
# to be filled in before running -- NOTE(review): confirm with the author.
CF_EMAIL = "?"
CF_APIKEY = "?"
CF_ACCOUNTID = "?"

# Base URL that every API path in this script is appended to.
CF_API_URL = "https://api.cloudflare.com/client/v4/"

# Headers sent with every request made by this script.
CF_API_HEADERS = {
    "X-Auth-Email": CF_EMAIL,
    "X-Auth-Key": CF_APIKEY,
    "Content-Type": "application/json",
}
import json | |
import requests | |
import sys | |
from copy import deepcopy | |
from itertools import zip_longest | |
class DictDiffer:
    """Compare the keys and values of two dictionaries.

    ``current_dict`` is treated as the new state and ``past_dict`` as the old
    state. The four query methods each return a set of keys:

    * ``added()``     -- keys only present in ``current_dict``
    * ``removed()``   -- keys only present in ``past_dict``
    * ``changed()``   -- keys present in both whose values differ
    * ``unchanged()`` -- keys present in both whose values are equal
    """

    def __init__(self, current_dict, past_dict):
        self.current_dict = current_dict
        self.past_dict = past_dict
        self.set_current = set(current_dict.keys())
        self.set_past = set(past_dict.keys())
        # Keys that exist on both sides; the basis for every other query.
        self.intersect = self.set_current & self.set_past

    def added(self):
        """Return the keys that exist only in the current dictionary."""
        return self.set_current - self.intersect

    def removed(self):
        """Return the keys that exist only in the past dictionary."""
        return self.set_past - self.intersect

    def changed(self):
        """Return the shared keys whose values differ between the two dictionaries."""
        return {
            key for key in self.intersect
            if self.past_dict[key] != self.current_dict[key]
        }

    def unchanged(self):
        """Return the shared keys whose values are equal in both dictionaries."""
        return {
            key for key in self.intersect
            if self.past_dict[key] == self.current_dict[key]
        }
def pagedGet(path, timeout=30):
    """Fetch every page of a paginated Cloudflare list endpoint.

    Args:
        path: API path relative to ``CF_API_URL`` (for example ``'zones'``).
        timeout: Seconds to wait for each HTTP request before giving up.

    Returns:
        A list with the concatenated ``result`` entries of all pages, or
        ``None`` when any request fails. Returning ``None`` instead of an
        empty or partial list matters: callers diff this data against the
        JSON file, and an incomplete list would make existing records look
        missing and cause them to be added again.
    """
    page = 1
    dataResult = []
    while True:
        try:
            response = requests.get(
                CF_API_URL + path,
                headers=CF_API_HEADERS,
                params={"page": page},
                timeout=timeout,
            )
        except requests.RequestException as err:
            print("Request for " + path + " failed: " + str(err))
            return None
        if response.status_code != 200:
            print("Request for " + path + " failed with HTTP status " + str(response.status_code))
            return None
        data = json.loads(response.content.decode('utf-8'))
        dataResult.extend(data["result"])
        info = data["result_info"]
        # Stop once the pages fetched so far cover the reported total.
        if info["per_page"] * page >= info["total_count"]:
            return dataResult
        page += 1
def getCfDomains():
    """Return the zones known to Cloudflare as a ``{name: id}`` mapping.

    Returns ``None`` when the zone listing came back empty or falsy.
    """
    zones = pagedGet('zones')
    if not zones:
        return None
    return {zone["name"]: zone["id"] for zone in zones}
def getJsonDomains():
    """Return the list of domain names from the JSON file that should be synced.

    When the optional ``"only"`` key holds a non-empty list, only the domains
    named there (and present under ``"domains"``) are returned, in the order
    they appear under ``"domains"``. A missing, empty or null ``"only"`` key
    selects every domain.
    """
    domains = list(jsonData["domains"])
    # "only" is optional: tolerate a missing key or an explicit null.
    only = jsonData.get("only") or []
    if only:
        return [domain for domain in domains if domain in only]
    return domains
def addCfDomain(domain):
    """Create a new zone for ``domain`` under the configured account.

    In simulation mode nothing is sent and ``True`` is returned. Otherwise the
    ``success`` field of the API response is returned, or ``None`` when the
    HTTP status is not 200.
    """
    print("Adding " + domain + "...")
    if SIMULATE:
        print("Just simulating...")
        return True
    sys.stdout.flush()
    payload = {
        'name': domain,
        'account': {
            'id': CF_ACCOUNTID
        },
    }
    response = requests.post(CF_API_URL + 'zones', headers=CF_API_HEADERS, data=json.dumps(payload))
    if response.status_code != 200:
        return None
    body = json.loads(response.content.decode('utf-8'))
    return body['success']
def transformCfValue(record):
    """Convert a Cloudflare record's content into the value shape used for comparison.

    MX and SRV records become ``[lowercased content, priority]`` (SRV content
    additionally has tabs replaced by spaces); every other type returns the
    raw ``content`` unchanged.
    """
    recordType = record["type"]
    if recordType not in ('MX', 'SRV'):
        return record["content"]
    content = record["content"]
    if recordType == 'SRV':
        content = content.replace("\t", " ")
    return [content.lower(), record["priority"]]
def transformCfType(record):
    """Return the record type, reporting TXT records that hold an SPF policy as ``"SPF"``.

    A TXT record counts as SPF when its content starts with ``v=spf1``
    (case-insensitive).
    """
    isSpf = record["type"] == "TXT" and record["content"].lower().startswith("v=spf1")
    return "SPF" if isSpf else record["type"]
def getCfRecords(domain, id):
    """Fetch the DNS records of one zone and group them by ``"TYPE/name"``.

    Args:
        domain: Domain name, used only for the progress message.
        id: Zone id that is inserted into the API path.

    Returns:
        A tuple ``(records, recordIds)`` of two dicts sharing the same keys.
        ``records[key]`` is a list of ``{'type', 'name', 'value', 'ttl'}``
        dicts and ``recordIds[key]`` is the parallel list of
        ``{'id', 'data'}`` dicts holding the record id and the raw API
        record. Both are ``None`` when the record listing is ``None``.
    """
    print("Retrieving CF DNS records for " + domain + "...")
    sys.stdout.flush()
    rawRecords = pagedGet('zones/' + id + '/dns_records')
    if rawRecords is None:
        return (None, None)

    normalized = {}
    originals = {}
    for raw in rawRecords:
        recordType = transformCfType(raw)
        key = recordType + "/" + raw["name"]
        normalized.setdefault(key, []).append({
            'type': recordType,
            'name': raw["name"],
            'value': transformCfValue(raw),
            'ttl': raw["ttl"],
        })
        # Same key and same position as above, so both lists stay parallel.
        originals.setdefault(key, []).append({
            'id': raw["id"],
            'data': raw
        })
    return (normalized, originals)
def getJsonRecords(domain):
    """Build the desired DNS records for ``domain`` from the JSON configuration.

    Records are assembled in this order: copy the template's records, drop
    the ``"TYPE/name"`` entries listed under the domain's ``remove``, apply
    each named overwrite (remove by type, then add), and finally append the
    domain's own ``add`` records. Afterwards ``@`` is stripped from each name
    and the domain is appended, and ``@`` inside A/CNAME values and MX/SRV
    targets is replaced by the domain (values are lowercased).

    Returns:
        A dict mapping ``"TYPE/full.name"`` to the list of records with that
        type and name.
    """
    config = jsonData["domains"][domain]
    records = []

    if "template" in config:
        templateRecords = jsonData["templates"][config["template"]]["records"]
        # Deep copy so the in-place edits below never touch the shared template.
        records.extend(deepcopy(templateRecords))

    if "remove" in config:
        for key in config["remove"]:
            keyParts = key.split("/")
            removeType, removeName = keyParts[0], keyParts[1]
            records = [
                r for r in records
                if not (r["type"] == removeType and r["name"] == removeName)
            ]

    for overwriteKey in config.get("overwrites", ()):
        overwrite = jsonData["overwrites"][overwriteKey]
        # Overwrite-level removals match on record type only.
        for removedType in overwrite.get("remove", ()):
            records = [r for r in records if r["type"] != removedType]
        if "add" in overwrite:
            records.extend(deepcopy(overwrite["add"]))

    if "add" in config:
        records.extend(deepcopy(config["add"]))

    grouped = {}
    for r in records:
        bareName = r["name"].replace("@", "")
        r["name"] = domain if not bareName else bareName + "." + domain
        if r["type"] in ("A", "CNAME"):
            r["value"] = r["value"].replace("@", domain).lower()
        elif r["type"] in ("MX", "SRV"):
            r["value"][0] = r["value"][0].replace("@", domain).lower()
        grouped.setdefault(r["type"] + "/" + r["name"], []).append(r)
    return grouped
def _buildCfRecordPayload(record):
    """Translate a JSON-style record into the request body for a DNS record call.

    ``SPF`` records are sent as ``TXT``. For MX and SRV records
    ``record['value']`` is a ``[content, priority]`` pair. SRV records are
    sent in structured form: the name is split into service, proto and the
    remaining name, and the content into weight, port and target.
    """
    cfType = 'TXT' if record['type'] == 'SPF' else record['type']
    name = record['name']
    ttl = record['ttl']

    if cfType == 'SRV':
        content = record['value'][0]
        priority = record['value'][1]
        nameParts = name.split('.')
        contentParts = content.split(' ')
        return {
            'type': cfType,
            'data': {
                'name': '.'.join(nameParts[2:]),
                'weight': int(contentParts[0]),
                'priority': int(priority),
                'target': contentParts[2],
                'service': nameParts[0],
                'proto': nameParts[1],
                'port': int(contentParts[1]),
                'ttl': ttl
            },
            'ttl': ttl
        }

    payload = {
        'type': cfType,
        'name': name,
        'content': record['value'],
        'ttl': ttl,
        'proxied': False
    }
    if cfType == 'MX':
        payload['content'] = record['value'][0]
        priority = record['value'][1]
        # Compare against None, not truthiness: priority 0 is a valid MX
        # preference and must still be sent.
        if priority is not None:
            payload['priority'] = priority
    return payload


def _cfCallSucceeded(response):
    """Return True only for an HTTP 200 response whose body reports success."""
    if response.status_code != 200:
        return False
    body = json.loads(response.content.decode('utf-8'))
    return bool(body['success'])


def updateCfRecord(zoneId, recordId, record, domain):
    """Overwrite the existing DNS record ``recordId`` in ``zoneId`` with ``record``.

    Args:
        zoneId: Id of the zone the record belongs to.
        recordId: Id of the record to overwrite.
        record: Desired record as a dict with ``type``, ``name``, ``value``
            and ``ttl`` keys.
        domain: Domain name, used only for the progress message.

    Returns:
        ``True`` in simulation mode or when the API reports success,
        ``False`` otherwise.
    """
    print("Updating " + domain + ": " + record['type'] + '/' + record['name'] + "...")
    sys.stdout.flush()
    if SIMULATE:
        print("Just simulating...")
        return True
    response = requests.put(
        CF_API_URL + 'zones/' + zoneId + '/dns_records/' + recordId,
        headers=CF_API_HEADERS,
        data=json.dumps(_buildCfRecordPayload(record)),
    )
    return _cfCallSucceeded(response)


def addCfRecord(zoneId, record, domain):
    """Create a new DNS record in ``zoneId`` from ``record``.

    Args:
        zoneId: Id of the zone to add the record to.
        record: Desired record as a dict with ``type``, ``name``, ``value``
            and ``ttl`` keys.
        domain: Domain name, used only for the progress message.

    Returns:
        ``True`` in simulation mode or when the API reports success,
        ``False`` otherwise.
    """
    print("Adding " + domain + ": " + record['type'] + '/' + record['name'] + "...")
    sys.stdout.flush()
    if SIMULATE:
        print("Just simulating...")
        return True
    response = requests.post(
        CF_API_URL + 'zones/' + zoneId + '/dns_records',
        headers=CF_API_HEADERS,
        data=json.dumps(_buildCfRecordPayload(record)),
    )
    return _cfCallSucceeded(response)
def deleteCfRecord(zoneId, recordId, record, domain):
    """Delete the DNS record ``recordId`` from ``zoneId``.

    ``record`` and ``domain`` are used only for the progress message. In
    simulation mode nothing is sent and ``True`` is returned. Otherwise the
    ``success`` field of the API response is returned, or ``None`` when the
    HTTP status is not 200.
    """
    label = record['type'] + '/' + record['name']
    print("Deleting " + domain + ": " + label + "...")
    sys.stdout.flush()
    if SIMULATE:
        print("Just simulating...")
        return True
    endpoint = CF_API_URL + 'zones/' + zoneId + '/dns_records/' + recordId
    response = requests.delete(endpoint, headers=CF_API_HEADERS)
    if response.status_code != 200:
        return None
    body = json.loads(response.content.decode('utf-8'))
    return body['success']
# Dry run by default; only the single argument "execute" applies changes.
SIMULATE = True
if len(sys.argv) == 2 and sys.argv[1] == 'execute':
    SIMULATE = False

with open('domains.json', encoding='utf-8') as f:
    jsonData = json.load(f)


def _report(ok):
    """Print the outcome of one API action and flush so progress shows immediately."""
    print("OK." if ok else "Failed.")
    sys.stdout.flush()


cfDomains = getCfDomains()
jsonDomains = getJsonDomains()
if cfDomains and jsonDomains:
    # Add missing domains to CF
    missingDomains = sorted(set(jsonDomains) - set(cfDomains))
    for domain in missingDomains:
        _report(addCfDomain(domain))
    if missingDomains and not SIMULATE:
        # Refresh the zone list to pick up the ids of the new zones. If the
        # refresh fails, keep the previous mapping so that the domains that
        # already existed can still be synced.
        cfDomains = getCfDomains() or cfDomains

    # Update DNS records
    for domain in jsonDomains:
        if domain not in cfDomains:
            continue
        zoneId = cfDomains[domain]
        cfRecords, cfRecordIds = getCfRecords(domain, zoneId)
        jsonRecords = getJsonRecords(domain)
        if cfRecords is None or jsonRecords is None:
            continue

        diff = DictDiffer(jsonRecords, cfRecords)

        # Groups that exist in CF but not in the JSON file: delete them.
        for rkey in diff.removed():
            for record, recordId in zip(cfRecords[rkey], cfRecordIds[rkey]):
                _report(deleteCfRecord(zoneId, recordId["id"], record, domain))

        # Groups that exist on both sides but differ: pair records by position.
        for rkey in diff.changed():
            pairs = zip_longest(jsonRecords[rkey], cfRecords[rkey], cfRecordIds[rkey])
            for record, cfRecord, recordId in pairs:
                if recordId is None:
                    # JSON has more records than CF: create the extra ones.
                    _report(addCfRecord(zoneId, record, domain))
                elif record is None:
                    # CF has more records than JSON: remove the surplus
                    # instead of trying to update it with a missing record.
                    _report(deleteCfRecord(zoneId, recordId["id"], cfRecord, domain))
                else:
                    _report(updateCfRecord(zoneId, recordId["id"], record, domain))

        # Groups that exist only in the JSON file: create them.
        for rkey in diff.added():
            for record in jsonRecords[rkey]:
                _report(addCfRecord(zoneId, record, domain))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment