Created
May 3, 2022 13:35
-
-
Save mentha/1b9a4a9afffe0ce7de2ba034226bb624 to your computer and use it in GitHub Desktop.
assign static ip and network filters to libvirt guests
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from pprint import pprint | |
from argparse import ArgumentParser | |
from ipaddress import IPv4Address | |
from tempfile import NamedTemporaryFile | |
import libvirt | |
import os | |
import xml.etree.ElementTree as ET | |
PROGNAME = 'libvirt-assign-ip' | |
def strlist(a):
    """Split a comma-separated option value into a list of names.

    Whitespace around each item is ignored and empty items are dropped, so
    ``"a, b,,c"`` yields ``['a', 'b', 'c']``.

    Args:
        a: Raw option string, e.g. the value given to ``--network``.

    Returns:
        The non-empty, whitespace-trimmed items in their original order.
    """
    # Trim first, then filter, so that whitespace-only items are dropped too.
    return [item for item in (part.strip() for part in a.split(',')) if item]
# Command-line interface.  The two network options use the 'extend' action
# together with the strlist converter, so each may be given several times and
# every occurrence may itself be a comma-separated list.
parser = ArgumentParser(
    description='Assign static IPv4 addresses and network filters to managed libvirt virtual machines.',
)
parser.add_argument(
    '--connect', '-c',
    dest='uri',
    metavar='URI',
    help='Connect to specified URI',
)
parser.add_argument(
    '--dry-run', '-d',
    dest='dry',
    action='store_true',
    help='Do not change anything',
)
parser.add_argument(
    '--reset', '-r',
    dest='reset',
    action='store_true',
    help='Reset all addresses',
)
parser.add_argument(
    '--hosts', '-o',
    dest='hosts',
    metavar='HOSTS',
    help='Path to hosts file to update, if specified',
)
parser.add_argument(
    '--network', '-n',
    dest='nets',
    metavar='NET',
    type=strlist,
    action='extend',
    default=[],
    help='Manage specified networks',
)
parser.add_argument(
    '--no-network',
    dest='nonets',
    metavar='NET',
    type=strlist,
    action='extend',
    default=[],
    help='Skip specified networks',
)
# Parsed options; read as `a.<dest>` by the rest of the script.
a = parser.parse_args()
# Open the libvirt connection; a.uri is None unless --connect was given.
conn = libvirt.open(a.uri)

# State collected from the persistent networks.  Everything is keyed by
# network name except mac2net, which is keyed by lower-cased MAC address.
nets = {}        # network name -> libvirt network object (managed networks only)
netdoms = set()  # DNS domain names declared by the selected networks
netxml = {}      # network name -> parsed network XML
subnets = {}     # network name -> the IPv4 <ip> element holding the DHCP range
mac2net = {}     # MAC of an existing <host> entry -> libvirt network object
allocated = {}   # network name -> set of IPv4Address taken by <host> entries
for n in conn.listAllNetworks(libvirt.VIR_CONNECT_LIST_NETWORKS_PERSISTENT):
    nn = n.name()
    # Honour --no-network always, and --network when at least one was given.
    if nn in a.nonets or (a.nets and nn not in a.nets):
        continue
    xn = ET.fromstring(n.XMLDesc())
    netxml[nn] = xn
    # Only networks that declare a named <domain> are considered further.
    if xn.find('domain[@name]') is None:
        continue
    netdoms.add(xn.find('domain').get('name'))
    # Pick the first IPv4 <ip> element that defines a DHCP range.
    subnet = None
    for ip in xn.findall('ip'):
        if ip.get('family', 'ipv4') != 'ipv4':
            continue
        if ip.find('dhcp/range') is None:
            continue
        subnet = ip
        break
    # Compare with None explicitly: the truth value of an Element reflects
    # its child count and testing it is deprecated.
    if subnet is not None:
        subnets[nn] = subnet
        allocated[nn] = set()
        if not a.reset:
            # Remember existing reservations so they are kept, not reassigned.
            for h in subnet.findall('dhcp/host'):
                host_mac = h.get('mac')
                # A <host> entry without a mac attribute cannot be matched to
                # an interface, but its address is still in use.
                if host_mac is not None:
                    mac2net[host_mac.lower()] = n
                allocated[nn].add(IPv4Address(h.get('ip')))
        nets[nn] = n
# State collected from the persistent domains.
doms = []     # domains with at least one interface on a managed network
domxml = {}   # domain name -> parsed domain XML (every domain, managed or not)
mac2dom = {}  # lower-cased interface MAC -> libvirt domain object
for d in conn.listAllDomains(libvirt.VIR_CONNECT_LIST_DOMAINS_PERSISTENT):
    xd = ET.fromstring(d.XMLDesc())
    domxml[d.name()] = xd
    # MACs of this domain's interfaces that sit on one of the managed subnets.
    managed_macs = [
        intf.find('mac').get('address').lower()
        for intf in xd.findall('devices/interface[@type="network"]')
        if intf.find('source').get('network') in subnets
    ]
    for managed_mac in managed_macs:
        mac2dom[managed_mac] = d
    if managed_macs:
        doms.append(d)
# Remove stale DHCP host entries from the managed networks
def update_net(net, *args):
    """Apply a network update, retrying against the config only on failure.

    The positional arguments are forwarded unchanged to ``net.update`` with
    the flags appended as the last argument.  The first attempt targets both
    the live and the persistent definition; if libvirt rejects it
    (presumably because the network is not active), the update is retried
    against the persistent definition alone.

    Args:
        net: Object providing ``update(*args, flags)``.
        *args: Arguments to pass through ahead of the flags.

    Raises:
        libvirt.libvirtError: If the config-only retry fails as well.
    """
    config_only = libvirt.VIR_NETWORK_UPDATE_AFFECT_CONFIG
    live_and_config = libvirt.VIR_NETWORK_UPDATE_AFFECT_LIVE | config_only
    try:
        net.update(*args, live_and_config)
    except libvirt.libvirtError:
        net.update(*args, config_only)
# Drop DHCP host entries that no longer belong to any domain interface
# (or every entry when --reset was given).
for n in nets.values():
    net_name = n.name()
    dhcp = subnets[net_name].find('dhcp')
    if a.reset:
        stale = dhcp.findall('host')
    else:
        # Entries without a mac attribute cannot be matched against domain
        # interfaces, so they are left alone instead of crashing on .lower().
        stale = [
            h for h in dhcp.findall('host')
            if h.get('mac') is not None and h.get('mac').lower() not in mac2dom
        ]
    for h in stale:
        print(f'Removing unused host entry ({h.get("ip")}, {h.get("mac")}, "{h.get("name")}") from {net_name}')
        dhcp.remove(h)
        if not a.dry:
            update_net(n,
                       libvirt.VIR_NETWORK_UPDATE_COMMAND_DELETE, libvirt.VIR_NETWORK_SECTION_IP_DHCP_HOST, -1,
                       ET.tostring(h, encoding='unicode'))
    if not a.reset:
        # Rebuild the allocation set from the surviving entries.  Removing
        # addresses one by one would raise KeyError when two entries shared
        # an address, and would free an address a surviving entry still uses.
        allocated[net_name] = {IPv4Address(h.get('ip')) for h in dhcp.findall('host')}
# Assign IP addresses and network filters to the domain interfaces
def update_domdev(dom, *args):
    """Apply a device update, retrying against the config only on failure.

    The positional arguments are forwarded unchanged to
    ``dom.updateDeviceFlags`` with the flags appended as the last argument.
    The first attempt modifies both the live and the persistent definition;
    if libvirt rejects it (presumably because the domain is not running),
    the update is retried against the persistent definition alone.

    Args:
        dom: Object providing ``updateDeviceFlags(*args, flags)``.
        *args: Arguments to pass through ahead of the flags.

    Raises:
        libvirt.libvirtError: If the config-only retry fails as well.
    """
    config_only = libvirt.VIR_DOMAIN_DEVICE_MODIFY_CONFIG
    live_and_config = libvirt.VIR_DOMAIN_DEVICE_MODIFY_LIVE | config_only
    try:
        dom.updateDeviceFlags(*args, live_and_config)
    except libvirt.libvirtError:
        dom.updateDeviceFlags(*args, config_only)
# For every interface on a managed network: make sure a DHCP host entry
# exists for its MAC, then pin the clean-traffic filter to that address.
for d in doms:
    dom_name = d.name()
    xd = domxml[dom_name]
    for intf in xd.findall('devices/interface[@type="network"]'):
        netname = intf.find('source').get('network')
        if netname not in subnets:
            continue
        subnet = subnets[netname]
        dhcp = subnet.find('dhcp')
        mac = intf.find('mac').get('address').lower()

        # Look for an existing reservation in *this* network.  The match is
        # case-insensitive because mac2net is keyed by lower-cased MACs while
        # the XML attribute may use any case; an exact-match XPath lookup
        # would return None and crash.  The same applies when the MAC was
        # registered on a different network than the interface now uses.
        h = None
        if mac in mac2net:
            for candidate in dhcp.findall('host'):
                if (candidate.get('mac') or '').lower() == mac:
                    h = candidate
                    break

        if h is not None:
            ip = IPv4Address(h.get('ip'))
            # Only the in-memory copy is renamed here; it is what the hosts
            # file section is generated from.
            h.set('name', dom_name)
        else:
            # First free address between <ip address> + 1 and the end of the
            # DHCP range, inclusive.
            ip = IPv4Address(subnet.get('address')) + 1
            high = IPv4Address(subnet.find('dhcp/range').get('end'))
            while ip <= high and ip in allocated[netname]:
                ip += 1
            if ip > high:
                # Never hand out an address beyond the range end.
                print(f'No free address left in {netname} for "{dom_name}" ({mac}), skipping')
                continue
            allocated[netname].add(ip)
            h = ET.Element('host')
            h.set('mac', mac)
            h.set('name', dom_name)
            h.set('ip', ip.compressed)
            dhcp.append(h)
            net = nets[netname]
            print(f'Adding host entry ({h.get("ip")}, {h.get("mac")}, "{h.get("name")}") to {net.name()}')
            if not a.dry:
                update_net(net,
                           libvirt.VIR_NETWORK_UPDATE_COMMAND_ADD_LAST, libvirt.VIR_NETWORK_SECTION_IP_DHCP_HOST, -1,
                           ET.tostring(h, encoding='unicode'))

        # Desired clean-traffic parameters for this interface.
        filtparm = (
            ('CTRL_IP_LEARNING', 'none'),
            ('IP', ip.compressed))
        oldf = intf.findall('filterref[@filter="clean-traffic"]')
        if len(oldf) == 1:
            # Leave the interface untouched when the single existing
            # filterref already carries exactly the desired parameters.
            params = oldf[0].findall('parameter')
            current = {(p.get('name'), p.get('value')) for p in params}
            if len(params) == len(filtparm) and current == set(filtparm):
                continue

        # Replace every existing clean-traffic reference with a fresh one.
        for old in oldf:
            intf.remove(old)
        filt = ET.SubElement(intf, 'filterref', {'filter': 'clean-traffic'})
        for pname, pvalue in filtparm:
            ET.SubElement(filt, 'parameter', {'name': pname, 'value': pvalue})
        print(f'Configuring network filter of "{dom_name}" ({mac})')
        if not a.dry:
            update_domdev(d, ET.tostring(intf, encoding='unicode'))
# Rewrite the managed section of the hosts file (only with --hosts).
# Lines outside the BEGIN/END markers are copied through; the managed section
# is regenerated from the DHCP host entries, keeping any extra names the user
# appended to a managed line.
if a.hosts:
    c_begin = f'#BEGIN MANAGED HOSTS BY {PROGNAME}'
    c_end = f'#END MANAGED HOSTS BY {PROGNAME}'
    print('Updating hosts')
    f = None
    if a.dry:
        class DryWrite:
            """File-like sink that prints each completed line instead of storing it."""

            def __init__(self):
                self.buf = ''

            def __enter__(self, *exc):
                return self

            def __exit__(self, *exc):
                self.line(self.buf.strip())

            def write(self, text):
                self.buf += text
                # Emit every completed line, not just the first one.
                while '\n' in self.buf:
                    done, self.buf = self.buf.split('\n', 1)
                    self.line(done)

            def line(self, text):
                print(f'Written {text.strip()}')

        f = DryWrite()
    else:
        # Create the temp file next to the target so the final rename stays
        # on one filesystem.
        f = NamedTemporaryFile('w', delete=False, dir=os.path.dirname(a.hosts), prefix=f'new-{os.path.basename(a.hosts)}-')
    delete_f = True
    try:
        aliases = {}  # managed host name -> set of extra names found on its line
        try:
            with open(a.hosts) as rf:
                managed = False
                for line in rf:
                    line = line.strip()
                    if line.startswith(c_begin):
                        managed = True
                    elif line.startswith(c_end):
                        managed = False
                    elif not managed:
                        f.write(line)
                        f.write('\n')
                    else:
                        # Managed line: "<ip> <name> [<name> ...]".
                        extra = set()
                        for name in line.split()[1:]:
                            tld = name.rsplit('.', 1)[-1]
                            # A name is managed when it ends in one of the
                            # network domains.  The suffix test covers
                            # multi-label domains, which the last-label test
                            # alone never matches (aliases were then lost).
                            if tld in netdoms or any(name.endswith('.' + dom) for dom in netdoms):
                                # Same set object on purpose: extras seen later
                                # on this line are included as well.
                                aliases[name] = extra
                            else:
                                extra.add(name)
        except FileNotFoundError:
            # No hosts file yet: only the managed section is written.
            pass
        f.write(c_begin)
        f.write('\n')
        for net in nets.values():
            dhcp = subnets[net.name()].find('dhcp')
            dom = netxml[net.name()].find('domain').get('name')
            for h in dhcp.findall('host'):
                hn = h.get('name') + '.' + dom
                nl = [hn]
                if hn in aliases:
                    nl.extend(sorted(aliases[hn]))
                f.write(h.get('ip'))
                f.write(' ')
                f.write(' '.join(nl))
                f.write('\n')
        f.write(c_end)
        f.write('\n')
        if not a.dry:
            # Flush and close before the rename; otherwise the target is
            # briefly replaced by an empty or partial file.
            f.flush()
            os.fchmod(f.fileno(), 0o644)
            f.close()
            os.replace(f.name, a.hosts)
            delete_f = False
    finally:
        if a.dry:
            print(f'Would have replaced {a.hosts} with new hosts')
        elif delete_f:
            # Failure before the rename: release the handle, drop the temp file.
            f.close()
            os.unlink(f.name)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment