Last active
November 4, 2023 09:51
-
-
Save zajdee/8424ce31bc6addae2316ecba64c2960d to your computer and use it in GitHub Desktop.
A simple Python daemon waiting for pref64
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
"""Module to assist with capturing pref64 from RAs on the IPv6-mostly networks and acting upon it.""" | |
import argparse | |
import ipaddress | |
import logging | |
import re | |
import signal | |
import struct | |
import socket | |
import subprocess | |
from datetime import datetime, timedelta | |
from pprint import pprint | |
from scapy.layers.inet6 import ICMPv6NDOptPrefixInfo, ICMPv6ND_RA, ICMPv6NDOptPREF64 | |
""" | |
You need the latest scapy. The following commands should help you to get this script up and running in Ubuntu 22.04. | |
apt install python3-pip | |
pip3 install argparse | |
pip3 uninstall scapy | |
pip3 -vv install git+https://github.com/secdev/scapy.git@0474c37bf1d147c969173d52ab3ac76d2404d981 | |
python3 pref64_ra_daemon.py | |
Next steps are probably to replicate clatd from Tore Anderson (https://github.com/toreanderson/clatd), e.g.: | |
1. Discover the prefix (already done from the RAs) | |
2. Discover the device facing the PLAT (ip --json -6 route get fd00:64:: | jq) | |
[
{
"dst": "fd00:64::",
"from": "::",
"gateway": "fe80::e859:9eff:fe11:2dbd",
"dev": "enp1s0",
"protocol": "ra",
"prefsrc": "2a03:c22:fe3b:64:b2a:1c35:c99f:2c2c", <--- this may change if the preferred local address changes
"metric": 100,
"flags": [],
"pref": "high"
}
]
3. Derive a CLAT IPv6 address from the IPv6 address on the discovered interface | |
4. Check if IPv4 connectivity is present - if not, proceed | |
5. Enable IPv6 forwarding | |
6. Enable Proxy-ND | |
7. Create the `clat` tun device | |
8. Add a default route via the `clat` device | |
9. Prepare a Tayga config and Start it up | |
# cat /tmp/D8W3twOorY | |
tun-device clat | |
prefix 64:ff9b::/96 | |
ipv4-addr 192.0.0.2 | |
map 192.0.0.1 2001:db8:0:64:40da:c593:6378:0 | |
10. Periodically verify if the pref64 is still valid | |
10.1. if not anymore (e.g. it has expired), tear down Tayga and remove the default IPv4 route | |
10.2. if the prefix has changed, tear down and start Tayga again | |
11. If the process is about to shut down, tear Tayga down too | |
TBD: How to listen on all interfaces that are up? (Do we even want this? Or do we want to listen just on the interfaces with an IPv6 connectivity?) | |
""" | |
# Extracts the address, prefix length and preferred lifetime of every
# "inet6 ..." entry from `ip -6 address show` output. DOTALL lets the lazy
# `.*?` cross the line break between the "inet6" line and the line that
# carries "preferred_lft".
IP_ADDR_MATCHER = re.compile(
    r"inet6 (?P<address>[0-9a-f:]+)/(?P<netmask>[0-9]{1,3}) .*? "
    r"preferred_lft (?P<preferred>(forever|\d+sec))",
    re.DOTALL | re.MULTILINE | re.I,
)

# Link-local all-nodes multicast group.
IPV6_ALLNODES_MCAST = "ff02::1"

# ICMPv6 message type of a Router Advertisement.
ICMPV6_TYPE_RA = 134
class Terminating(Exception):
    """Raised from the signal handler to unwind the receive loop and exit cleanly."""
class StaticMAddrDaemon:
    """Listen for Router Advertisements on one interface and report PREF64 options.

    Typical use: construct, call :meth:`parse_args` to pick the interface,
    then call :meth:`receiver`, which blocks until SIGINT/SIGTERM arrives.
    """

    # Name of the network interface to listen on; filled in by parse_args().
    iface = None

    def __init__(self):
        """Install the termination signal handlers and configure logging."""
        # Both signals raise Terminating so the receive loop can exit cleanly.
        signal.signal(signal.SIGINT, self.sigterm)
        signal.signal(signal.SIGTERM, self.sigterm)
        self.logging_init()

    def act_on_ra(self, packet):
        """Print every PREF64 option carried by a decoded Router Advertisement.

        Args:
            packet: A scapy packet containing an ``ICMPv6ND_RA`` layer.
        """
        logging.debug("=== captured packet START ===")
        icmpv6 = packet[ICMPv6ND_RA]
        prefixes = []
        for payload in icmpv6.iterpayloads():
            # Only the PREF64 option is of interest; skip every other layer.
            if not isinstance(payload, ICMPv6NDOptPREF64):
                continue
            prefix = self.process_pref64(payload[ICMPv6NDOptPREF64])
            print(f"Captured a pref64: {prefix}")
            prefixes.append(prefix)
        if prefixes:
            print(f"Captured pref64s: {prefixes}")
        logging.debug("=== captured packet END ===")

    def process_pref64(self, payload):
        """Decode one PREF64 option.

        Args:
            payload: The ``ICMPv6NDOptPREF64`` layer of a Router Advertisement.

        Returns:
            A dict with ``"expiry"`` (naive local ``datetime`` at which the
            prefix stops being valid) and ``"pref64"`` (the prefix followed by
            the text that scapy's ``plc`` enum maps the prefix length code to).
        """
        # The option carries its lifetime in units of 8 seconds (RFC 8781).
        lifetime = payload.scaledlifetime * 8
        # Translate the prefix length code through scapy's own enum table.
        netmask = ICMPv6NDOptPREF64.plc.i2s[payload.plc]
        expiry = datetime.now() + timedelta(seconds=lifetime)
        return {
            "expiry": expiry,
            "pref64": f"{payload.prefix}{netmask}",
        }

    def decode_ra(self, data):
        """Decode a captured ICMPv6 message as a Router Advertisement.

        Args:
            data: Raw ICMPv6 message bytes as returned by the raw socket.

        Returns:
            The dissected ``ICMPv6ND_RA`` packet, or ``None`` when ``data`` is
            empty, is not a Router Advertisement, or cannot be dissected.
        """
        if not data:
            return None
        if data[0] != ICMPV6_TYPE_RA:
            # Not the RA we're looking for
            return None
        try:
            return ICMPv6ND_RA(data)
        except Exception:  # dissection is best-effort: never kill the loop
            logging.debug("Failed to decode a Router Advertisement", exc_info=True)
            return None

    def receiver(self):
        """Receive ICMPv6 messages on ``self.iface`` and act on each RA.

        Blocks until :class:`Terminating` is raised by the signal handler.
        The socket is closed on every exit path.

        Raises:
            OSError: If the socket cannot be created or configured, e.g. the
                interface does not exist or the process lacks the privileges
                needed for raw sockets.
        """
        logging.debug("start for iface %s", self.iface)
        # Resolve the group address to learn the address family to use.
        addrinfo = socket.getaddrinfo(IPV6_ALLNODES_MCAST, None)[0]
        family = addrinfo[0]
        group_bin = socket.inet_pton(family, addrinfo[4][0])
        # 25 is the Linux value of SO_BINDTODEVICE; the fallback covers Python
        # builds that do not export the constant.
        bind_to_device = getattr(socket, "SO_BINDTODEVICE", 25)

        with socket.socket(
            family, socket.SOCK_RAW, socket.getprotobyname("ipv6-icmp")
        ) as mcast_socket:
            # Restrict the socket to traffic arriving on the chosen interface.
            mcast_socket.setsockopt(
                socket.SOL_SOCKET, bind_to_device, (self.iface + "\0").encode("utf-8")
            )
            # Allow multiple copies of this program on one machine
            mcast_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            # Join the group on the requested interface explicitly; index 0
            # would leave the choice of interface to the kernel.
            mreq = group_bin + struct.pack("@I", socket.if_nametoindex(self.iface))
            mcast_socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)
            # Loop, processing data we receive
            try:
                while True:
                    data, _ = mcast_socket.recvfrom(1500)
                    advertisement = self.decode_ra(data)
                    if not advertisement:
                        continue
                    self.act_on_ra(advertisement)
            except Terminating:
                logging.warning("Terminated, exitting gracefully...")

    def launch_ip(self, iface):
        """Run ``ip -6 address show dev <iface> scope global``.

        Args:
            iface: Name of the interface to query.

        Returns:
            The command's standard output decoded as UTF-8. The exit status is
            not checked.
        """
        # Build argv directly so the interface name is always one argument.
        cmd = ["ip", "-6", "address", "show", "dev", iface, "scope", "global"]
        result = subprocess.run(cmd, stdout=subprocess.PIPE, check=False)
        return result.stdout.decode("utf-8")

    def logging_init(self):
        """Configure root logging: DEBUG level, millisecond timestamps."""
        logging.basicConfig(
            level=logging.DEBUG,
            datefmt="%Y-%m-%d %H:%M:%S",
            format="%(asctime)s.%(msecs)03d %(levelname)s %(module)s - "
            "%(funcName)s: %(message)s",
        )

    def parse_args(self):
        """Parse the command line and remember the chosen interface.

        Returns:
            The parsed ``argparse.Namespace``; ``self.iface`` is set from the
            required ``-i``/``--interface`` option.
        """
        parser = argparse.ArgumentParser()
        parser.add_argument("-i", "--interface", type=str, required=True)
        args = parser.parse_args()
        self.iface = args.interface
        return args

    @staticmethod
    def sigterm(signal_number, stack_frame):
        """Signal handler: raise :class:`Terminating` to stop the receive loop.

        Both arguments are supplied by the ``signal`` module and are unused.
        """
        raise Terminating()
def main():
    """Entry point: build the daemon, read the command line, then listen for RAs."""
    daemon = StaticMAddrDaemon()
    # parse_args() must run before receiver(): it sets the interface to bind to.
    daemon.parse_args()
    daemon.receiver()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment