-
-
Save dmtucker/1203ab0b676283385f657bb83531a8c7 to your computer and use it in GitHub Desktop.
Update script for dynv6.com to set your IPv6 address
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env bash | |
update_via_http () { | |
if [ -z "${token+undefined}" ] || [ "$#" != 2 ] | |
then | |
echo 'usage: token=<your-HTTP-token> update_via_http zone ipv6' 1>&2 | |
exit 1 | |
fi | |
zone="$1" | |
ipv6="$2" | |
# https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=141323 | |
tmp="$(mktemp)" | |
wget -6O- --no-verbose "https://dynv6.com/api/update?zone=$zone&token=$token&ipv6=$ipv6" 2>"$tmp" | |
code="$?" | |
if [ "$code" = 0 ] | |
then echo | |
else cat "$tmp" | |
fi | |
rm "$tmp" | |
return $code | |
} | |
update_via_ssh () { | |
[ "$#" = 2 ] || { | |
echo 'usage: update_via_ssh zone ipv6addr' 1>&2 | |
exit 1 | |
} | |
zone="$1" | |
ipv6addr="$2" | |
ssh -6 [email protected] hosts "$zone" set ipv6addr "$ipv6addr" | |
} | |
if [ -n "${token+undefined}" ] | |
then update_zone=update_via_http | |
else update_zone=update_via_ssh | |
fi | |
errors=0 | |
for zone in "$@" | |
do | |
host_zone="$(host -t AAAA "$zone")" | |
ipv6="$(echo "$host_zone" | awk '/IPv6/ { print $NF }')" | |
if [ "$ipv6" = '' ] | |
then echo "$host_zone" | |
elif ip -6 address show scope global | grep -q "$ipv6" | |
then continue | |
else echo "$(hostname) does not have IPv6 address $ipv6" | |
fi | |
new_ipv6="$(ip -6 address show scope global primary | awk '/inet6/ { print $2 }' | sort | head -n1 | cut -d/ -f1)" | |
[ "$new_ipv6" = '' ] && new_ipv6=auto | |
if "$update_zone" "$zone" "$new_ipv6" | |
then | |
new_host_zone="$host_zone" | |
while [ "$new_host_zone" = "$host_zone" ] | |
do | |
sleep 1 | |
new_host_zone="$(host -t AAAA "$zone")" | |
echo -n . | |
done | |
echo | |
echo "$new_host_zone" | |
else | |
errors=$((errors + 1)) | |
echo | |
ip -6 address show scope global | |
fi | |
done | |
exit $errors |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from __future__ import annotations | |
import argparse | |
import ipaddress | |
import logging | |
import os | |
import socket | |
import subprocess # nosec import_subprocess | |
import sys | |
import typing | |
import urllib.parse | |
import urllib.request | |
LOG_LEVEL = { | |
"CRITICAL": logging.CRITICAL, | |
"FATAL": logging.FATAL, | |
"ERROR": logging.ERROR, | |
"WARN": logging.WARNING, | |
"WARNING": logging.WARNING, | |
"INFO": logging.INFO, | |
"DEBUG": logging.DEBUG, | |
"NOTSET": logging.NOTSET, | |
} | |
def cli() -> argparse.ArgumentParser: | |
"""Get a parser for the command-line interface.""" | |
parser = argparse.ArgumentParser( | |
formatter_class=argparse.ArgumentDefaultsHelpFormatter | |
) | |
parser.add_argument( | |
"--log-level", | |
help="how much output to produce", | |
type=str.upper, | |
choices=LOG_LEVEL.keys(), | |
default="INFO", | |
) | |
parser.add_argument("zone", help="the zone to update") | |
return parser | |
def unordered_list( | |
iterable: typing.Iterable[typing.Any], /, *, prefix: str = "- " | |
) -> str: | |
"""Format an iterable to be pretty-printed.""" | |
return "\n".join(prefix + str(element) for element in iterable) | |
class Dynv6UpdateZoneError(Exception): | |
"""The dynv6.com zone could not be updated.""" | |
class HostnameResolutionError(Dynv6UpdateZoneError): | |
"""There was a problem resolving a hostname.""" | |
def host_addresses(host: str) -> set[ipaddress.IPv6Address]: | |
"""Resolve a hostname.""" | |
logging.debug(f"Resolving {host}...") | |
try: | |
addresses = { | |
ipaddress.IPv6Address(sockaddr[0]) | |
for family, *_, sockaddr in socket.getaddrinfo(host, None) | |
if family == socket.AF_INET6 | |
} | |
except Exception as exc: | |
logging.error(f"{host}: {exc}") | |
raise HostnameResolutionError from exc | |
logging.debug( | |
f"{host} has the following addresses:\n" + unordered_list(addresses) | |
if addresses | |
else f"{host} has no addresses." | |
) | |
return addresses | |
class AddressRetrievalError(Dynv6UpdateZoneError): | |
"""There was a problem getting local addresses.""" | |
def local_addresses(*, network: ipaddress.IPv6Network) -> set[ipaddress.IPv6Address]: | |
"""Get global IP addresses attached to local network devices.""" | |
network = ipaddress.IPv6Network(network) | |
logging.debug(f"Getting local addresses in {network}...") | |
try: | |
# An attacker that can manipulate PATH or the filesystem | |
# will not gain any extra capabilities from this script, | |
# and network is sanitized by ipaddress.IPv6Network above: | |
output = subprocess.check_output( | |
["ip", "address", "show", "scope", "global", "to", str(network)] | |
) # nosec start_process_with_partial_path, subprocess_without_shell_equals_true | |
except Exception as exc: | |
logging.error(exc) | |
raise AddressRetrievalError from exc | |
addresses = { | |
ipaddress.IPv6Address(line.split()[1].partition(b"/")[0].decode()) | |
for line in output.splitlines() | |
if line.strip().startswith(b"inet6") | |
} | |
logging.debug( | |
f"The following local addresses are in {network}:\n" + unordered_list(addresses) | |
if addresses | |
else f"There are no local addresses in {network}." | |
) | |
return addresses | |
class InvalidAddressError(Dynv6UpdateZoneError): | |
"""The zone resolved to an invalid address.""" | |
def zone_needs_update(zone: str) -> bool: | |
"""Check for local addresses in the zone prefix.""" | |
logging.debug(f"Checking {zone}...") | |
try: | |
zone_ipv6_addresses = { | |
address for address in host_addresses(zone) if address.version == 6 | |
} | |
try: | |
needs_update = not zone_ipv6_addresses or any( | |
not local_addresses(network=ipaddress.IPv6Network(f"{address}/64")) | |
for address in zone_ipv6_addresses | |
) | |
except ValueError as exc: | |
logging.error(f"{zone}: {exc}") | |
raise InvalidAddressError from exc | |
except (HostnameResolutionError, InvalidAddressError, AddressRetrievalError): | |
needs_update = True | |
logging.debug( | |
f"{zone} needs to be updated." | |
if needs_update | |
else f"{zone} does not need to be updated." | |
) | |
return needs_update | |
class ZoneUpdateError(Dynv6UpdateZoneError): | |
"""There was a problem updating the zone.""" | |
class MissingHTTPTokenError(ZoneUpdateError): | |
"""There is no HTTP token available for the dynv6.com API.""" | |
def update_zone(zone: str) -> None: | |
"""Make a request to the dynv6.com HTTP Update API.""" | |
logging.debug(f"Updating {zone}...") | |
try: | |
token = os.environ["token"] | |
except KeyError as exc: | |
logging.error(f"No HTTP Token is set for {zone}.") | |
raise MissingHTTPTokenError from exc | |
url = urllib.parse.urlsplit("https://ipv6.dynv6.com/api/update")._replace( | |
query=urllib.parse.urlencode( | |
{ | |
"ipv6prefix": "auto", | |
"token": token, | |
"zone": zone, | |
} | |
) | |
) | |
try: | |
# The url is manually constructed above, so the scheme is expected: | |
with urllib.request.urlopen(url.geturl()) as response: # nosec urllib_urlopen | |
response_text = response.read().decode( | |
response.headers.get_content_charset() or "utf-8" | |
) | |
except urllib.error.HTTPError as exc: | |
response_text = exc.fp.read().decode( | |
exc.headers.get_content_charset() or "utf-8" | |
) | |
logging.error(f"{zone}: {response_text} ({exc})") | |
raise ZoneUpdateError from exc | |
except Exception as exc: | |
logging.error(f"{url.netloc}: {exc}") | |
raise ZoneUpdateError from exc | |
{ | |
"addresses unchanged": logging.debug, | |
"addresses updated": logging.info, | |
}.get( | |
response_text, logging.warning | |
)(f"{zone}: {response_text} (HTTP Status {response.code}: {response.reason})") | |
def main(argv: list[str] | None = None) -> int: | |
args = cli().parse_args(argv or sys.argv[1:]) | |
logging.basicConfig( | |
format="[%(levelname)s] %(message)s", | |
level=LOG_LEVEL[args.log_level.upper()], | |
) | |
try: | |
if zone_needs_update(args.zone): | |
update_zone(args.zone) | |
except Dynv6UpdateZoneError as exc: | |
logging.error(f"{args.zone} could not be updated.\n{exc}") | |
return 1 | |
logging.debug(f"{args.zone} is up to date.") | |
return 0 | |
if __name__ == "__main__": | |
sys.exit(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment