Last active
June 22, 2025 20:34
-
-
Save Nexarian/bcf5b3102f264a66648a3a832a600990 to your computer and use it in GitHub Desktop.
Setup Tesla Wireless network routing on a RPi with multiple Wifi connections
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
######### | |
# Command to run | |
# sudo -E env PATH="$PATH" ./.venv/bin/python3 ./setup_routing_nat.py | |
######### | |
# Auto run | |
# sudo vim /etc/NetworkManager/dispatcher.d/99-ip-change | |
######### | |
import ipaddress | |
import os | |
import subprocess | |
import sys | |
from ipaddress import IPv4Address | |
from typing import Dict, Final, List, Optional, TypedDict, cast | |
import iptc | |
from pyroute2 import IPDB, IPRoute | |
### ADD CODE FOR MAPPING OF TEG interface to virtual IP | |
DESINATION_GATEWAY: Final[str] = str(IPv4Address('192.168.91.1')) | |
TEG_SUBNET_BASE: Final[str] = str(IPv4Address('192.168.91.0')) | |
TEG_SUBNET_CIDR: Final[str] = f"{TEG_SUBNET_BASE}/24" | |
VIRTUAL_IP_PREAMBLE: Final[str] = "192.168.92." | |
BASE_TABLE_ID: Final[int] = 100 | |
BASE_IP: Final[int] = 100 | |
KNOWN_SSIDS: Final[List[str]] = ["TEG-1JG", "TEG-2N1"] | |
SSID_IP_MAPPING_OVERRIDE: Final[Dict[str, IPv4Address]] = { | |
ssid: IPv4Address(f"{VIRTUAL_IP_PREAMBLE}{BASE_IP + i}") | |
for i, ssid in enumerate(KNOWN_SSIDS) | |
} | |
class TEG_Wifi_Interface(TypedDict): | |
iface: str | |
table: int | |
ip: IPv4Address | |
virt_ip: IPv4Address | |
ssid: str | |
def check_root() -> None: | |
if os.geteuid() == 0: | |
return | |
print("This script must be run as root") | |
sys.exit(1) | |
def find_matching_wireless_interfaces() -> List[TEG_Wifi_Interface]: | |
def is_wireless(ifname): | |
# Check if wireless by looking for /sys/class/net/{ifname}/wireless | |
return os.path.isdir(f'/sys/class/net/{ifname}/wireless') | |
def in_subnet(ip_str, subnet_str=TEG_SUBNET_CIDR): | |
try: | |
return ipaddress.ip_address(ip_str) in ipaddress.ip_network(subnet_str) | |
except ValueError: | |
return False | |
def get_wifi_ssid(ifname) -> Optional[str]: | |
try: | |
out = subprocess.check_output(["iw", "dev", ifname, "link"], stderr=subprocess.DEVNULL).decode() | |
for line in out.splitlines(): | |
if line.strip().startswith("SSID:"): | |
return line.strip().split("SSID:")[1].strip() | |
except subprocess.CalledProcessError: | |
pass | |
return "unknown" | |
matching: List[TEG_Wifi_Interface] = [] | |
# Keep track of seen SSIDs | |
seen_ssids = set() | |
with IPDB() as ipdb: | |
for iface in ipdb.interfaces.values(): | |
if not isinstance(iface.ifname, str): | |
continue | |
if iface.ifname == 'lo' or not is_wireless(iface.ifname): | |
continue | |
ssid = get_wifi_ssid(iface.ifname) | |
if not ssid or ssid not in KNOWN_SSIDS or ssid in seen_ssids: | |
continue | |
seen_ssids.add(ssid) | |
ssid_index = KNOWN_SSIDS.index(ssid) | |
virt_ip = SSID_IP_MAPPING_OVERRIDE[ssid] | |
table = BASE_TABLE_ID + ssid_index | |
for ip_tuple in iface.ipaddr: | |
ip_address = ip_tuple[0] # First element is the IP string | |
if not in_subnet(ip_address): | |
continue | |
interface = cast(TEG_Wifi_Interface, { | |
'iface': iface.ifname, | |
'table': table, | |
'ip': str(ip_address), | |
'virt_ip': f"{virt_ip}/32", | |
'ssid': ssid | |
}) | |
matching.append(interface) | |
break # Only match one IP per iface | |
return matching | |
def verify_routing_tables(interfaces: List[TEG_Wifi_Interface]): | |
# Add custom routing tables if they don't exist already | |
rt_tables_path = '/etc/iproute2/rt_tables' | |
try: | |
with open(rt_tables_path, 'r') as f: | |
rt_content = f.read() | |
rt_entries = [ | |
entry for idx, iface in enumerate(interfaces) | |
if (entry := f"{iface['table']} wlan{idx}table") not in rt_content | |
] | |
if not rt_entries: | |
print("Tables already exist. All is well.") | |
return | |
with open(rt_tables_path, 'a') as f: | |
lines = [f"{entry}\n" for entry in rt_entries] | |
f.writelines(lines) | |
print(f"Wrote tables to rt_tables: {lines}") | |
except Exception as e: | |
print(f"Warning: Unable to update routing tables file: {e}") | |
def cleanup_interfaces(interfaces: List[TEG_Wifi_Interface]): | |
def remove_nat_rule(chain_name, dst=None, src=None, to_dst=None, to_src=None): | |
table = iptc.Table(iptc.Table.NAT) | |
chain = iptc.Chain(table, chain_name) | |
table.refresh() | |
for rule in chain.rules: | |
if dst and rule.dst != dst: | |
continue | |
if src and rule.src != src: | |
continue | |
if rule.target.name == 'DNAT' and to_dst and rule.target.to_destination != to_dst: | |
continue | |
if rule.target.name == 'SNAT' and to_src and rule.target.to_source != to_src: | |
continue | |
chain.delete_rule(rule) | |
break # assumes only one matching rule needs to be removed | |
def clear_postrouting_snat(prefix="192.168.92."): | |
""" | |
Remove *all* SNAT rules in the POSTROUTING chain | |
whose to_source starts with the given prefix. | |
""" | |
table = iptc.Table(iptc.Table.NAT) | |
# Refresh in case rules have changed | |
table.refresh() | |
chain = iptc.Chain(table, "POSTROUTING") | |
# Iterate over a snapshot since we're deleting as we go | |
for rule in list(chain.rules): | |
target = rule.target | |
if target.name == 'SNAT' and getattr(target, 'to_source', '').startswith(prefix): | |
chain.delete_rule(rule) | |
with IPRoute() as iproute, IPDB(nl=iproute) as ipdb: | |
lo = ipdb.interfaces['lo'] | |
for interface in interfaces: | |
# Remove IP from loopback interface | |
try: | |
print(f"Cleanup: Removing virt ip {lo}\n{ipdb}") | |
lo.del_ip(interface['virt_ip']) | |
except Exception: | |
print(f"Cannot delete IP address: {interface['virt_ip']}") # IP might not be present | |
ipdb.commit() | |
for interface in interfaces: | |
# Lookup interface index | |
print("Cleanup: Link lookup") | |
link = iproute.link_lookup(ifname=interface['iface']) | |
interface_table = interface['table'] | |
interface_idx = None | |
if link: | |
interface_idx = link[0] | |
print(f"Interface index for {interface['iface']} is {interface_idx}") | |
else: | |
print("Cleanup: There no interface index found.") | |
continue | |
#iproute.flush_routes(table=interface['table'], family=socket.AF_INET) | |
route_kwargs = {} | |
try: | |
routes = iproute.get_routes(table=interface_table) | |
for route in routes: | |
dst = route.get('dst', 'default') | |
if dst in [TEG_SUBNET_BASE, TEG_SUBNET_CIDR, 'default']: | |
# Extract only valid deletion fields | |
route_kwargs = { | |
'family': route['family'], | |
'dst_len': route['dst_len'], | |
'table': route['table'], | |
'proto': route['proto'], | |
'scope': route['scope'], | |
'type': route['type'], | |
} | |
attr_map = { | |
'RTA_DST': 'dst', | |
'RTA_GATEWAY': 'gateway', | |
'RTA_OIF': 'oif', | |
} | |
route_kwargs.update({ | |
attr_map[attr_name]: attr_value | |
for attr_name, attr_value in route['attrs'] | |
if attr_name in attr_map | |
}) | |
if dst == 'default': | |
route_kwargs.pop('dst', None) # default route has no 'dst' | |
iproute.route('delete', **route_kwargs) | |
print(f"\tDeleted route: {route_kwargs}") | |
except Exception as e: | |
print(f"Cleanup: Error deleting route: {route_kwargs}, Error: {e}") | |
# Delete IP rule | |
virt_ip = str(interface['virt_ip']) | |
virt_ip_nocidr = virt_ip.split("/")[0] | |
try: | |
print(f"Cleanup: Del rule virt_ip {interface_idx}") | |
# Delete *all* rules for this virtual IP, not just one table | |
existing_rules = iproute.get_rules() or [] | |
for rule in existing_rules: | |
attrs = dict(rule.get("attrs", [])) | |
rule_src = attrs.get("FRA_SRC") | |
rule_table = attrs.get("FRA_TABLE") | |
rule_prio = attrs.get("FRA_PRIORITY") | |
if rule_src != virt_ip_nocidr: | |
continue | |
print(f"\tDeleting rule: from {rule_src} table {rule_table} prio {rule_prio}") | |
try: | |
iproute.rule("delete", src=rule_src, table=rule_table, priority=rule_prio) | |
except Exception as e: | |
print(f"\tFailed to delete rule {rule_src} table {rule_table}: {e}") | |
except Exception: | |
pass | |
# Remove iptables rules | |
print(f"Cleanup: Remove OUTPUT rule") | |
remove_nat_rule('OUTPUT', dst=virt_ip, to_dst=DESINATION_GATEWAY) | |
print(f"Cleanup: Remove POSTROUTING rule") | |
clear_postrouting_snat() | |
remove_nat_rule('POSTROUTING', src=virt_ip, dst=DESINATION_GATEWAY, to_src=interface['ip']) | |
def setup_interfaces(interfaces: List[TEG_Wifi_Interface]): | |
# Helper to add iptables rule | |
def add_nat_rule(chain, dst=None, src=None, to_dst=None, to_src=None): | |
rule = iptc.Rule() | |
if dst: | |
rule.dst = dst | |
if src: | |
rule.src = src | |
target = iptc.Target(rule, 'DNAT' if to_dst else 'SNAT') | |
if to_dst: | |
target.to_destination = str(to_dst) | |
if to_src: | |
target.to_source = str(to_src) | |
rule.target = target | |
table = iptc.Table(iptc.Table.NAT) | |
chain = iptc.Chain(table, chain) | |
chain.insert_rule(rule) | |
# IPRoute/pyroute2 setup | |
with IPRoute() as iproute, IPDB(nl=iproute) as ipdb: | |
lo = ipdb.interfaces['lo'] | |
existing_ips = [ip[0] for ip in lo.ipaddr] | |
for interface in interfaces: | |
# Ex: sudo ip addr add 192.168.92.100/32 dev lo | |
virt_ip = interface['virt_ip'] | |
if virt_ip in existing_ips: | |
continue | |
print(f"Adding address: {virt_ip}") | |
lo.add_ip(virt_ip) | |
ipdb.commit() | |
for interface in interfaces: | |
# Lookup interface index | |
print("Setup: Link lookup") | |
virt_ip = interface['virt_ip'] | |
interface_table = interface['table'] | |
link = iproute.link_lookup(ifname=interface['iface']) | |
interface_idx = None | |
if link: | |
interface_idx = link[0] | |
else: | |
print(f"Setup: There no interface index found for {interface['iface']}") | |
continue | |
# Ex: sudo ip route add 192.168.91.0/24 dev wlan1 table wlan1table | |
try: | |
iproute.route('add', dst=TEG_SUBNET_CIDR, oif=interface_idx, table=interface_table) | |
except Exception as e: | |
print(f"Error adding route: dst={TEG_SUBNET_CIDR}, oif={interface_idx}, table={interface_table}, Error: {e}") | |
# Ex: sudo ip route add default via 192.168.91.1 dev wlan1 table wlan1table | |
iproute.route('add', dst='default', gateway=DESINATION_GATEWAY, oif=interface_idx, table=interface_table) | |
# Ex: sudo ip rule add from 192.168.92.100 table wlan1table | |
iproute.rule('add', src=virt_ip, table=interface_table) | |
# Ex: sudo iptables -t nat -A OUTPUT -d 192.168.92.100 -j DNAT --to-destination 192.168.91.1 | |
add_nat_rule('OUTPUT', dst=virt_ip, to_dst=DESINATION_GATEWAY) | |
# Ex: sudo iptables -t nat -A POSTROUTING -s 192.168.92.100 -d 192.168.91.1 -j SNAT --to-source 192.168.91.124 | |
add_nat_rule('POSTROUTING', src=virt_ip, dst=DESINATION_GATEWAY, to_src=interface['ip']) | |
if __name__ == "__main__": | |
check_root() | |
interfaces = find_matching_wireless_interfaces() | |
print("Cleaning up old routes") | |
cleanup_interfaces(interfaces=interfaces) | |
print("Setting up new routes") | |
verify_routing_tables(interfaces=interfaces) | |
setup_interfaces(interfaces=interfaces) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment