cp ddns.service /etc/systemd/system/ddns.service
sudo systemctl enable ddns.service
sudo systemctl start ddns.service
Ref: https://github.com/fire1ce/DDNS-Cloudflare-Bash#requirements
cp ddns.service /etc/systemd/system/ddns.service
sudo systemctl enable ddns.service
sudo systemctl start ddns.service
Ref: https://github.com/fire1ce/DDNS-Cloudflare-Bash#requirements
#!/usr/bin/env python3 | |
import signal | |
import time | |
from functools import partial | |
from itertools import groupby | |
from operator import itemgetter | |
from os import getenv as env | |
from queue import Empty, Queue | |
from threading import Thread | |
import requests | |
from pyroute2 import IPRoute | |
from pyroute2.netlink import rtnl | |
ifname = env("IFNAME") # "enp3s0" or "eth0" | |
token = env("CF_API_TOKEN") | |
zone_id = env("CF_ZONE_ID") | |
domain = env("CF_DOMAIN") | |
record_type = "AAAA" | |
last_record_content = None | |
record_id = None | |
record_ttl = 1 | |
running = True | |
if record_type == "AAAA": | |
rtmgrp = rtnl.RTMGRP_IPV6_IFADDR | |
else: | |
print(f"Unknown Record Type {record_type}") | |
exit(-1) # TODO | |
def loop(queue): | |
with IPRoute(nlm_generator=True) as ipr: | |
def handler(signum, frame): | |
global running | |
running = False | |
ipr.close() | |
signal.signal(signal.SIGINT, handler) | |
if r := [2]: # ipr.link_lookup(ifname=ifname): | |
index = r[0] | |
# poll.register(ipr, select.POLLIN) | |
ipr.bind(groups=rtmgrp, async_cache=True) | |
print(f"Watching on {ifname} {index=}") | |
while running: | |
for msg in ipr.get(): | |
if msg.get("index") == index and msg["event"] == "RTM_NEWADDR": | |
attrs = dict(msg["attrs"]) | |
if not ( | |
attrs["IFA_FLAGS"] | |
& ( | |
rtnl.ifaddrmsg.IFA_F_TENTATIVE | |
| rtnl.ifaddrmsg.IFA_F_DEPRECATED | |
) | |
): | |
addr = attrs["IFA_ADDRESS"] | |
privacy = bool( | |
attrs["IFA_FLAGS"] & rtnl.ifaddrmsg.IFA_F_MANAGETEMPADDR | |
) | |
queue.put((addr, privacy)) | |
else: | |
print(f"{ifname=} Not Found") | |
def comsume(queue): | |
while running: | |
try: | |
if arrived := queue.get(timeout=3): | |
print(f"comsume:{arrived=}") | |
time.sleep(5) | |
data = [arrived] | |
try: | |
while not queue.empty(): | |
data.append(queue.get_nowait()) | |
queue.task_done() | |
except Empty: | |
pass | |
print(f"process:{data=}") | |
# sort is stable: https://wiki.python.org/moin/HowTo/Sorting/#Sort_Stability_and_Complex_Sorts | |
groups = groupby(sorted(data, key=itemgetter(1)), itemgetter(1)) | |
data = {k: [i[0] for i in v] for (k, v) in groups} | |
if non_privacy := data.get(False): | |
process(non_privacy[-1]) | |
else: | |
process(data[True][-1]) | |
queue.task_done() | |
except Empty: | |
pass | |
def process(addr): | |
global last_record_content | |
print(f"New {addr=} acquired") | |
if addr == last_record_content: | |
print(f"Ignore! {last_record_content=} is not change.") | |
else: | |
try: | |
result = update_record(addr, domain, record_type, zone_id, token) | |
if result["success"]: | |
print(f"Success! {domain} {record_type} record updated to {addr}") | |
last_record_content = addr | |
else: | |
print(f"Error! {domain} {record_type} record updated to {addr}") | |
print(result) | |
except requests.HTTPError as e: | |
print(e) | |
def get_record(domain, type_, zone_id, token): | |
headers = {"Authorization": f"Bearer {token}"} | |
url = f"https://api.cloudflare.com/client/v4/zones/{zone_id}/dns_records?type={type_}&name={domain}" | |
print(url) | |
response = requests.get(url, headers=headers) | |
if response.ok: | |
return response.json() | |
elif 4 == int(response.status_code / 100): | |
return response.json() | |
else: | |
response.raise_for_status() | |
def update_record(addr, domain, type_, zone_id, token) -> bool: | |
headers = {"Authorization": f"Bearer {token}"} | |
url = ( | |
f"https://api.cloudflare.com/client/v4/zones/{zone_id}/dns_records/{record_id}" | |
) | |
response = requests.put( | |
url, | |
headers=headers, | |
json={ | |
"type": type_, | |
"name": domain, | |
"content": addr, | |
"ttl": record_ttl, | |
}, | |
) | |
if response.ok: | |
return response.json() | |
elif 4 == int(response.status_code / 100): | |
return response.json() | |
else: | |
response.raise_for_status() | |
while True: | |
print("Fetching last_record_content...") | |
try: | |
record = get_record(domain, record_type, zone_id, token) | |
if record["success"]: | |
last_record_content = record["result"][-1]["content"] | |
record_id = record["result"][-1]["id"] | |
record_ttl = record["result"][-1]["ttl"] | |
print(f"{last_record_content=} Fetched.") | |
break | |
else: | |
print(record) | |
except requests.HTTPError as e: | |
print(e) | |
time.sleep(10) | |
if __name__ == "__main__": | |
print = partial(print, flush=True) # fix python systemd buffering problem | |
queue = Queue() | |
worker = Thread(target=comsume, args=[queue]) | |
worker.start() | |
loop(queue) |
[Unit] | |
Description=Cloudflare DDNS for IPv6 GUA | |
After=network.target | |
[Service] | |
Type=simple | |
Environment=IFNAME={PLACEHOLDER} | |
Environment=CF_API_TOKEN={PLACEHOLDER} | |
Environment=CF_ZONE_ID={PLACEHOLDER} | |
Environment=CF_DOMAIN={PLACEHOLDER} | |
ExecStart={PYTHON BINARY} {ABSOLUTE PATH of ddns.py} | |
Restart=always | |
RestartSec=10 | |
[Install] | |
WantedBy=multi-user.target |