Skip to content

Instantly share code, notes, and snippets.

@douo
Created February 2, 2023 11:55
Show Gist options
  • Save douo/34d7bc74088633431aec2ebc482a8fdd to your computer and use it in GitHub Desktop.
Save douo/34d7bc74088633431aec2ebc482a8fdd to your computer and use it in GitHub Desktop.
Cloudflare DDNS for IPv6 GUA
#!/usr/bin/env python3
import signal
import time
from functools import partial
from itertools import groupby
from operator import itemgetter
from os import getenv as env
from queue import Empty, Queue
from threading import Thread
import requests
from pyroute2 import IPRoute
from pyroute2.netlink import rtnl
ifname = env("IFNAME") # "enp3s0" or "eth0"
token = env("CF_API_TOKEN")
zone_id = env("CF_ZONE_ID")
domain = env("CF_DOMAIN")
record_type = "AAAA"
last_record_content = None
record_id = None
record_ttl = 1
running = True
if record_type == "AAAA":
rtmgrp = rtnl.RTMGRP_IPV6_IFADDR
else:
print(f"Unknown Record Type {record_type}")
exit(-1) # TODO
def loop(queue):
with IPRoute(nlm_generator=True) as ipr:
def handler(signum, frame):
global running
running = False
ipr.close()
signal.signal(signal.SIGINT, handler)
if r := [2]: # ipr.link_lookup(ifname=ifname):
index = r[0]
# poll.register(ipr, select.POLLIN)
ipr.bind(groups=rtmgrp, async_cache=True)
print(f"Watching on {ifname} {index=}")
while running:
for msg in ipr.get():
if msg.get("index") == index and msg["event"] == "RTM_NEWADDR":
attrs = dict(msg["attrs"])
if not (
attrs["IFA_FLAGS"]
& (
rtnl.ifaddrmsg.IFA_F_TENTATIVE
| rtnl.ifaddrmsg.IFA_F_DEPRECATED
)
):
addr = attrs["IFA_ADDRESS"]
privacy = bool(
attrs["IFA_FLAGS"] & rtnl.ifaddrmsg.IFA_F_MANAGETEMPADDR
)
queue.put((addr, privacy))
else:
print(f"{ifname=} Not Found")
def comsume(queue):
while running:
try:
if arrived := queue.get(timeout=3):
print(f"comsume:{arrived=}")
time.sleep(5)
data = [arrived]
try:
while not queue.empty():
data.append(queue.get_nowait())
queue.task_done()
except Empty:
pass
print(f"process:{data=}")
# sort is stable: https://wiki.python.org/moin/HowTo/Sorting/#Sort_Stability_and_Complex_Sorts
groups = groupby(sorted(data, key=itemgetter(1)), itemgetter(1))
data = {k: [i[0] for i in v] for (k, v) in groups}
if non_privacy := data.get(False):
process(non_privacy[-1])
else:
process(data[True][-1])
queue.task_done()
except Empty:
pass
def process(addr):
global last_record_content
print(f"New {addr=} acquired")
if addr == last_record_content:
print(f"Ignore! {last_record_content=} is not change.")
else:
try:
result = update_record(addr, domain, record_type, zone_id, token)
if result["success"]:
print(f"Success! {domain} {record_type} record updated to {addr}")
last_record_content = addr
else:
print(f"Error! {domain} {record_type} record updated to {addr}")
print(result)
except requests.HTTPError as e:
print(e)
def get_record(domain, type_, zone_id, token):
headers = {"Authorization": f"Bearer {token}"}
url = f"https://api.cloudflare.com/client/v4/zones/{zone_id}/dns_records?type={type_}&name={domain}"
print(url)
response = requests.get(url, headers=headers)
if response.ok:
return response.json()
elif 4 == int(response.status_code / 100):
return response.json()
else:
response.raise_for_status()
def update_record(addr, domain, type_, zone_id, token) -> bool:
headers = {"Authorization": f"Bearer {token}"}
url = (
f"https://api.cloudflare.com/client/v4/zones/{zone_id}/dns_records/{record_id}"
)
response = requests.put(
url,
headers=headers,
json={
"type": type_,
"name": domain,
"content": addr,
"ttl": record_ttl,
},
)
if response.ok:
return response.json()
elif 4 == int(response.status_code / 100):
return response.json()
else:
response.raise_for_status()
while True:
print("Fetching last_record_content...")
try:
record = get_record(domain, record_type, zone_id, token)
if record["success"]:
last_record_content = record["result"][-1]["content"]
record_id = record["result"][-1]["id"]
record_ttl = record["result"][-1]["ttl"]
print(f"{last_record_content=} Fetched.")
break
else:
print(record)
except requests.HTTPError as e:
print(e)
time.sleep(10)
if __name__ == "__main__":
print = partial(print, flush=True) # fix python systemd buffering problem
queue = Queue()
worker = Thread(target=comsume, args=[queue])
worker.start()
loop(queue)
[Unit]
Description=Cloudflare DDNS for IPv6 GUA
After=network.target
[Service]
Type=simple
Environment=IFNAME={PLACEHOLDER}
Environment=CF_API_TOKEN={PLACEHOLDER}
Environment=CF_ZONE_ID={PLACEHOLDER}
Environment=CF_DOMAIN={PLACEHOLDER}
ExecStart={PYTHON BINARY} {ABSOLUTE PATH of ddns.py}
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment