Created
August 10, 2023 13:54
-
-
Save wolsen/f6e7c643b0cb44d5ab9a1cd40a0bc8a3 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# | |
# Used to configure VRFs for Juju management services. | |
# | |
import netifaces | |
import ipaddress | |
import subprocess | |
import textwrap | |
import yaml | |
from pathlib import Path | |
from typing import Dict, List | |
from dataclasses import dataclass | |
from collections import defaultdict | |
CIDR = "10.167.81.0/24" | |
VRF_NAME = "vrf21" | |
SYSTEMD_CONF_DIR = Path("/etc/systemd/system") | |
@dataclass | |
class VRFInterfaceInfo: | |
interface: str | |
addresses: List[str] | |
gateways: List[str] | |
def get_vrf_info(target_cidr: ipaddress.IPv4Network) -> List[VRFInterfaceInfo]: | |
"""Finds the interfaces that are in target CIDR network. | |
""" | |
matches = [] | |
gateways = defaultdict(list) | |
default_gateway = None | |
for addr, iface, is_default in netifaces.gateways().get(netifaces.AF_INET, []): | |
if is_default: | |
default_gateway = addr | |
gateways[iface].append(addr) | |
for iface in netifaces.interfaces(): | |
if iface == 'lo': | |
continue | |
addrs = netifaces.ifaddresses(iface) | |
# The interface can exist without an IP address, so make | |
# sure there's actually an IP address assigned. | |
if netifaces.AF_INET not in addrs: | |
continue | |
for addr in addrs: | |
# Get all the addresses for the interface | |
addresses = [ipaddress.IPv4Address(inet_addr['addr']) | |
for inet_addr in addrs[netifaces.AF_INET] | |
if 'addr' in inet_addr] | |
# If any of the addresses are in the target CIDR, then the | |
# interface is in the target CIDR | |
valid_addresses = [addr for addr in filter(lambda a: a in target_cidr, addresses)] | |
if not valid_addresses: | |
continue | |
info = VRFInterfaceInfo(iface, valid_addresses, [default_gateway, *gateways[iface]]) | |
matches.append(info) | |
return matches | |
def get_gateways(interfaces: List[str]) -> Dict[str, str]: | |
"""Retrieves the gateways for the specified interfaces. | |
Returns a mapping of the interface to the gateway that is used for it. | |
If there is no gateway defined for the interface specifically, the default | |
gateway is used (if it exists). If no default gateway is available and | |
the interface doesn't have a specified gateway, this will return a value | |
of None for the interface key. | |
:param interfaces: a list of interfaces to retrieve the gateways for | |
:return: a dict[str, str] where the key is the interface name and the value | |
is the gateway to use for the interface. | |
""" | |
gateways = netifaces.gateways() | |
default_gw = gateways.get('default', None) | |
# Fill the return dict with a mapping to the default_gw | |
iface_to_gw = {iface: default_gw for iface in interfaces} | |
if netifaces.AF_INET not in gateways: | |
return iface_to_gw | |
for addr, iface, is_default in gateways[netifaces.AF_INET]: | |
iface_to_gw[iface] = addr | |
return iface_to_gw | |
def generate_netplan_vrf(infos: List[VRFInterfaceInfo], vrf_name: str): | |
""" | |
:param vrf_info: | |
:return: | |
""" | |
vrf_config = { | |
"network": { | |
"vrfs": { | |
vrf_name: { | |
"table": 21, | |
"interfaces": [info.interface for info in infos], | |
"routes": [ | |
{ | |
"to": "default", | |
"via": info.gateways[0], | |
} for info in infos | |
], | |
"routing-policy": [ | |
[{"from": str(address)} | |
for address in info.addresses] for info in vrf_info | |
][0], | |
} | |
} | |
} | |
} | |
return vrf_config | |
def install_systemd_override(service: str, command: str): | |
"""Installs the systemd override. | |
:param service: | |
:param command: | |
:return: | |
""" | |
override_dir = SYSTEMD_CONF_DIR / f"{service}.service.d" | |
override_dir.mkdir(mode=0o755, parents=True, exist_ok=True) | |
override_file = override_dir / "99-vrf.conf" | |
override_file.write_text(textwrap.dedent( | |
f"""\ | |
[service] | |
ExecStart= | |
ExecStart=ip vrf exec {VRF_NAME} {command} | |
""" | |
)) | |
target_network = ipaddress.IPv4Network(CIDR) | |
vrf_info = get_vrf_info(target_network) | |
# Store the vrf information in its own netplan config | |
netplan_conf = Path("/etc/netplan/90-vrf.yaml") | |
netplan_vrf_data = generate_netplan_vrf(vrf_info, VRF_NAME) | |
netplan_conf.write_text(yaml.dump(netplan_vrf_data)) | |
# Install the necessary systemd overrides to exec the command in the vrf | |
juju_machine_id = Path("/var/lib/juju/nonce.txt").read_text("utf-8").split(':')[0] | |
install_systemd_override(f"jujud-{juju_machine_id}", | |
f"/etc/systemd/system/jujud-{juju_machine_id}-exec-start.sh") | |
install_systemd_override("sshd", "/usr/sbin/sshd -D $SSHD_OPTS") | |
# Apply netplan configuration | |
subprocess.check_call(["netplan", "apply"]) | |
subprocess.check_call(["ip", "link", "set", VRF_NAME, "up"]) | |
# Reload systemctl daemon configs and restart services. | |
subprocess.check_call(["systemctl", "daemon-reload"]) | |
for service in ["sshd", f"jujud-{juju_machine_id}"]: | |
subprocess.check_call(["systemctl", "restart", service]) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment