Skip to content

Instantly share code, notes, and snippets.

@dmtucker
Forked from corny/dynv6.sh
Last active August 4, 2024 23:12
Show Gist options
  • Save dmtucker/1203ab0b676283385f657bb83531a8c7 to your computer and use it in GitHub Desktop.
Save dmtucker/1203ab0b676283385f657bb83531a8c7 to your computer and use it in GitHub Desktop.
Update script for dynv6.com to set your IPv6 address
#!/usr/bin/env bash
# Update a dynv6.com zone through the HTTP update API.
#
# Usage: token=<your-HTTP-token> update_via_http zone ipv6
#   zone  value sent as the `zone` query parameter
#   ipv6  value sent as the `ipv6` query parameter
# The API token is read from the `token` variable, which must be set.
# Prints the API response (followed by a newline) on success, or wget's
# captured diagnostics on failure. Returns wget's exit status.
# Exits the whole script with status 1 when called incorrectly.
update_via_http () {
    if [ -z "${token+undefined}" ] || [ "$#" != 2 ]
    then
        echo 'usage: token=<your-HTTP-token> update_via_http zone ipv6' 1>&2
        exit 1
    fi
    # Keep working variables local so the caller's own `zone`/`ipv6`
    # variables are not overwritten by this function.
    local zone="$1"
    local ipv6="$2"
    local tmp code
    # wget's stderr is captured and only shown when the request fails
    # (presumably because --no-verbose still reports on stderr; see
    # https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=141323).
    tmp="$(mktemp)" || return 1
    wget -6O- --no-verbose "https://dynv6.com/api/update?zone=$zone&token=$token&ipv6=$ipv6" 2>"$tmp"
    code="$?"
    if [ "$code" = 0 ]
    then echo
    else cat "$tmp"
    fi
    rm -f "$tmp"
    return "$code"
}
# Update a dynv6.com zone through the SSH API.
#
# Usage: update_via_ssh zone ipv6addr
#   zone      name of the zone to modify
#   ipv6addr  value to set as the zone's ipv6addr
# Returns ssh's exit status.
# Exits the whole script with status 1 when called incorrectly.
update_via_ssh () {
    [ "$#" = 2 ] || {
        echo 'usage: update_via_ssh zone ipv6addr' 1>&2
        exit 1
    }
    # Keep working variables local so the caller's variables are untouched.
    local zone="$1"
    local ipv6addr="$2"
    # The destination had been mangled into "[email protected]" by e-mail
    # obfuscation; the dynv6 SSH API is reached as api@dynv6.com.
    ssh -6 api@dynv6.com hosts "$zone" set ipv6addr "$ipv6addr"
}
# Choose the update mechanism: the HTTP API when `token` is set (even if
# empty), otherwise the SSH API.
if [ -n "${token+undefined}" ]
then update_zone=update_via_http
else update_zone=update_via_ssh
fi

# Process every zone named on the command line; count failed updates.
errors=0
for zone in "$@"
do
    host_zone="$(host -t AAAA "$zone")"
    # Last field of each "IPv6" line in the `host` output.
    ipv6="$(echo "$host_zone" | awk '/IPv6/ { print $NF }')"
    if [ "$ipv6" = '' ]
    then echo "$host_zone"
    # Match the address as a fixed string and as a whole word, so that
    # e.g. 2001:db8::1 is not considered present because 2001:db8::10 is.
    elif ip -6 address show scope global | grep -qwF -- "$ipv6"
    then continue
    else echo "$(hostname) does not have IPv6 address $ipv6"
    fi

    # First global primary address in sort order; fall back to "auto"
    # when this machine has none.
    new_ipv6="$(ip -6 address show scope global primary | awk '/inet6/ { print $2 }' | sort | head -n1 | cut -d/ -f1)"
    [ "$new_ipv6" = '' ] && new_ipv6=auto

    if "$update_zone" "$zone" "$new_ipv6"
    then
        # Poll once per second until the `host` output differs from the
        # pre-update output. NOTE(review): this wait is unbounded if the
        # lookup result never changes.
        new_host_zone="$host_zone"
        while [ "$new_host_zone" = "$host_zone" ]
        do
            sleep 1
            new_host_zone="$(host -t AAAA "$zone")"
            echo -n .
        done
        echo
        echo "$new_host_zone"
    else
        errors=$((errors + 1))
        echo
        ip -6 address show scope global
    fi
done

# Exit statuses wrap modulo 256; clamp so many failures never report 0.
exit "$((errors > 255 ? 255 : errors))"
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import ipaddress
import logging
import os
import socket
import subprocess # nosec import_subprocess
import sys
import typing
import urllib.parse
import urllib.request
# Names accepted by --log-level, mapped to the logging module's numeric levels.
LOG_LEVEL = {
    name: getattr(logging, name)
    for name in (
        "CRITICAL",
        "FATAL",
        "ERROR",
        "WARN",
        "WARNING",
        "INFO",
        "DEBUG",
        "NOTSET",
    )
}
def cli() -> argparse.ArgumentParser:
    """Build the command-line argument parser.

    The parser accepts an optional ``--log-level`` (upper-cased before being
    checked against the ``LOG_LEVEL`` names, default ``INFO``) and one
    positional ``zone`` argument.
    """
    log_level_options = {
        "help": "how much output to produce",
        "type": str.upper,
        "choices": LOG_LEVEL.keys(),
        "default": "INFO",
    }
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )
    parser.add_argument("--log-level", **log_level_options)
    parser.add_argument("zone", help="the zone to update")
    return parser
def unordered_list(
    iterable: typing.Iterable[typing.Any], /, *, prefix: str = "- "
) -> str:
    """Return one line per element of *iterable*, each preceded by *prefix*."""
    bullet_lines = [prefix + str(item) for item in iterable]
    return "\n".join(bullet_lines)
class Dynv6UpdateZoneError(Exception):
    """Base class for every failure while updating a dynv6.com zone."""
class HostnameResolutionError(Dynv6UpdateZoneError):
    """A hostname could not be resolved to addresses."""
def host_addresses(host: str) -> set[ipaddress.IPv6Address]:
    """Return the IPv6 addresses that *host* resolves to.

    Only ``AF_INET6`` results from ``socket.getaddrinfo`` are kept.

    Raises:
        HostnameResolutionError: if the lookup or address parsing fails
            (the underlying error is logged and chained).
    """
    logging.debug(f"Resolving {host}...")
    addresses = set()
    try:
        for family, *_, sockaddr in socket.getaddrinfo(host, None):
            if family != socket.AF_INET6:
                continue
            addresses.add(ipaddress.IPv6Address(sockaddr[0]))
    except Exception as exc:
        logging.error(f"{host}: {exc}")
        raise HostnameResolutionError from exc
    if addresses:
        summary = f"{host} has the following addresses:\n" + unordered_list(addresses)
    else:
        summary = f"{host} has no addresses."
    logging.debug(summary)
    return addresses
class AddressRetrievalError(Dynv6UpdateZoneError):
    """The addresses of local network devices could not be retrieved."""
def local_addresses(*, network: ipaddress.IPv6Network) -> set[ipaddress.IPv6Address]:
    """Return the global-scope IPv6 addresses of this machine inside *network*.

    The addresses are read from the ``inet6`` lines printed by
    ``ip address show scope global to <network>``.

    Raises:
        AddressRetrievalError: if running ``ip`` fails (the underlying error
            is logged and chained).
    """
    network = ipaddress.IPv6Network(network)
    logging.debug(f"Getting local addresses in {network}...")
    # An attacker that can manipulate PATH or the filesystem
    # will not gain any extra capabilities from this script,
    # and network is sanitized by ipaddress.IPv6Network above:
    command = ["ip", "address", "show", "scope", "global", "to", str(network)]
    try:
        output = subprocess.check_output(command)  # nosec start_process_with_partial_path, subprocess_without_shell_equals_true
    except Exception as exc:
        logging.error(exc)
        raise AddressRetrievalError from exc
    addresses = set()
    for line in output.splitlines():
        if not line.strip().startswith(b"inet6"):
            continue
        # Second field is "<address>/<prefix length>"; keep the address part.
        address_bytes, _, _ = line.split()[1].partition(b"/")
        addresses.add(ipaddress.IPv6Address(address_bytes.decode()))
    if addresses:
        summary = (
            f"The following local addresses are in {network}:\n"
            + unordered_list(addresses)
        )
    else:
        summary = f"There are no local addresses in {network}."
    logging.debug(summary)
    return addresses
class InvalidAddressError(Dynv6UpdateZoneError):
    """An address the zone resolved to could not be used."""
def zone_needs_update(zone: str) -> bool:
    """Decide whether *zone* must be updated.

    The zone is considered current only if it resolves to at least one IPv6
    address and, for every such address, this machine has a global address
    in the same /64 network. Any failure to resolve the zone, to derive the
    network, or to list local addresses is treated as "needs update".

    Returns:
        True if the zone should be updated, False otherwise.
    """
    logging.debug(f"Checking {zone}...")
    try:
        # host_addresses is annotated to return IPv6 addresses only; the
        # version filter is kept as a guard.
        zone_ipv6_addresses = {
            address for address in host_addresses(zone) if address.version == 6
        }
        try:
            needs_update = not zone_ipv6_addresses or any(
                not local_addresses(
                    # strict=False masks off the host bits. The default
                    # (strict) mode raises ValueError for any address that
                    # is not itself the network address, which made this
                    # check always report that an update was needed.
                    network=ipaddress.IPv6Network(f"{address}/64", strict=False)
                )
                for address in zone_ipv6_addresses
            )
        except ValueError as exc:
            logging.error(f"{zone}: {exc}")
            raise InvalidAddressError from exc
    except (HostnameResolutionError, InvalidAddressError, AddressRetrievalError):
        # Details were already logged where the error was raised.
        needs_update = True
    logging.debug(
        f"{zone} needs to be updated."
        if needs_update
        else f"{zone} does not need to be updated."
    )
    return needs_update
class ZoneUpdateError(Dynv6UpdateZoneError):
    """The request to update the zone failed."""
class MissingHTTPTokenError(ZoneUpdateError):
    """No HTTP token for the dynv6.com API was found in the environment."""
def update_zone(zone: str, *, timeout: float = 30.0) -> None:
    """Ask the dynv6.com HTTP Update API to update *zone*.

    The request goes to ``https://ipv6.dynv6.com/api/update`` with
    ``ipv6prefix=auto``, the token read from the ``token`` environment
    variable, and the zone name. The response text is logged at debug level
    for "addresses unchanged", info level for "addresses updated", and
    warning level for anything else.

    Args:
        zone: the zone to update.
        timeout: seconds to wait on the HTTP request before giving up, so a
            stalled connection cannot block the script indefinitely.

    Raises:
        MissingHTTPTokenError: if the ``token`` environment variable is unset.
        ZoneUpdateError: if the request fails or returns an HTTP error.
    """
    logging.debug(f"Updating {zone}...")
    try:
        token = os.environ["token"]
    except KeyError as exc:
        logging.error(f"No HTTP Token is set for {zone}.")
        raise MissingHTTPTokenError from exc
    url = urllib.parse.urlsplit("https://ipv6.dynv6.com/api/update")._replace(
        query=urllib.parse.urlencode(
            {
                "ipv6prefix": "auto",
                "token": token,
                "zone": zone,
            }
        )
    )
    try:
        # The url is manually constructed above, so the scheme is expected:
        with urllib.request.urlopen(  # nosec urllib_urlopen
            url.geturl(), timeout=timeout
        ) as response:
            response_text = response.read().decode(
                response.headers.get_content_charset() or "utf-8"
            )
    except urllib.error.HTTPError as exc:
        response_text = exc.fp.read().decode(
            exc.headers.get_content_charset() or "utf-8"
        )
        logging.error(f"{zone}: {response_text} ({exc})")
        raise ZoneUpdateError from exc
    except Exception as exc:
        # Log only the host: the full URL contains the token.
        logging.error(f"{url.netloc}: {exc}")
        raise ZoneUpdateError from exc
    # Ignore surrounding whitespace so a trailing newline does not turn a
    # known response into a warning.
    response_text = response_text.strip()
    log = {
        "addresses unchanged": logging.debug,
        "addresses updated": logging.info,
    }.get(response_text, logging.warning)
    # status and reason remain readable after the response is closed.
    log(f"{zone}: {response_text} (HTTP Status {response.status}: {response.reason})")
def main(argv: list[str] | None = None) -> int:
    """Run the command-line program.

    Args:
        argv: arguments to parse instead of ``sys.argv[1:]``. Only ``None``
            selects the process arguments; an explicit empty list is parsed
            as given rather than silently replaced.

    Returns:
        0 on success, 1 if the zone could not be updated.
    """
    args = cli().parse_args(sys.argv[1:] if argv is None else argv)
    logging.basicConfig(
        format="[%(levelname)s] %(message)s",
        level=LOG_LEVEL[args.log_level.upper()],
    )
    try:
        if zone_needs_update(args.zone):
            update_zone(args.zone)
    except Dynv6UpdateZoneError as exc:
        logging.error(f"{args.zone} could not be updated.\n{exc}")
        return 1
    logging.debug(f"{args.zone} is up to date.")
    return 0
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment