Last active
December 6, 2024 03:30
-
-
Save caspark/f8f734a567c422225ac9895c1d698818 to your computer and use it in GitHub Desktop.
Script to use network namespaces to create isolated network adapters on linux
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE | |
Version 2, December 2004 | |
Copyright (C) 2004 Sam Hocevar <[email protected]> | |
Everyone is permitted to copy and distribute verbatim or modified | |
copies of this license document, and changing it is allowed as long | |
as the name is changed. | |
DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE | |
TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION | |
0. You just DO WHAT THE FUCK YOU WANT TO. | |
http://www.wtfpl.net/ |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# A script to set up 1-8 'test networks' as virtual ethernet | |
# pairs, and set up various degraded network profiles for | |
# them. | |
# Useful for testing robustness of networked apps. | |
# Needs https://github.com/junegunn/fzf | |
# Tested on Fedora 40. May also need other things on other | |
# distros, YMMV. | |
# | |
# | |
# ip tables backup and restore: | |
# sudo iptables-save > iptables-backup.rules | |
# sudo iptables-restore < iptables-backup.rules | |
# | |
# show bridge state | |
# brctl show testnet-bridge | |
# | |
# enable IP forwarding if needed | |
# echo 1 > /proc/sys/net/ipv4/ip_forward | |
import argparse | |
import subprocess | |
import sys | |
import time | |
from typing import List | |
from testnets_lib import * | |
BRIDGE_NAME = "testnet-bridge" | |
MAX_TESTNETS = 8 | |
def trace_log(*args, **kwargs): | |
if False: | |
print("Trace:", *args, **kwargs) | |
def debug_log(*args, **kwargs): | |
if False: | |
print("Debug:", *args, **kwargs) | |
def wait_for(cond_fn, failure_msg: str): | |
SLEEP_TIME = 0.1 | |
time_waited = 0.0 | |
while time_waited < 1.0: | |
if cond_fn(): | |
return | |
time.sleep(SLEEP_TIME) | |
time_waited += SLEEP_TIME | |
print(failure_msg) | |
sys.exit(1) | |
def testnet_name(i: int) -> str: | |
"""Return the name of the testnet namespace for the given index.""" | |
return f"testnet{i}" | |
def veth_name(i: int) -> str: | |
"""Return the name of the veth pair for the given index.""" | |
return f"testnet{i}-veth" | |
def uplink_name(i: int) -> str: | |
"""Return the name of the uplink interface for the given index.""" | |
return f"testnet{i}-uplink" | |
def netns_exists(name: str) -> bool: | |
"""Check if the given network namespace exists.""" | |
namespaces = runc(["ip", "netns", "list"]) | |
for line in namespaces.splitlines(): | |
if line.split()[0].strip() == name: | |
return True | |
return False | |
def list_links() -> list[str]: | |
"""List all network links.""" | |
r = [] | |
links = runc(["ip", "link", "list"]) | |
for i, line in enumerate(links.splitlines()): | |
if i % 2 == 1: | |
# every odd line is more details about the previous link so skip it | |
continue | |
# Each network link is listed with a number and colon (e.g., "1: lo: ..."), so check the format | |
link_name = line.split(":")[1].strip() | |
if "@" in link_name: | |
link_name = link_name.split("@")[0] | |
r.append(link_name) | |
return r | |
def link_exists(name: str) -> bool: | |
"""Check if the given network link exists.""" | |
return name in list_links() | |
def discover_default_interface() -> str: | |
"""Discover the active ethernet interface.""" | |
eth_iface = subprocess.run( | |
["ip", "route", "show", "default"], capture_output=True, text=True | |
) | |
interface_name = eth_iface.stdout.split()[4] | |
debug_log(f"Discovered interface {interface_name} as default interface") | |
return interface_name | |
def run(command: List[str]): | |
"""Utility function to execute a system command.""" | |
try: | |
trace_log(f"Running command: {' '.join(command)}") | |
subprocess.run(command, check=True) | |
except subprocess.CalledProcessError as e: | |
print(f"Error executing command: {' '.join(command)}\n{e}") | |
sys.exit(1) | |
def runc(command: List[str]) -> str: | |
"""Utility function to execute a system command.""" | |
try: | |
trace_log(f"Running command with output capture: {' '.join(command)}") | |
ran = subprocess.run(command, check=True, capture_output=True, text=True) | |
return ran.stdout.strip() | |
except subprocess.CalledProcessError as e: | |
print(f"Error executing captured command: {' '.join(command)}\n{e}") | |
sys.exit(1) | |
def setup_networks(n: int): | |
"""Setup N test networks.""" | |
interface_name = discover_default_interface() | |
print(f"Setting up {n} test networks linked to {interface_name}") | |
if n > MAX_TESTNETS: | |
print(f"Cannot setup more than {MAX_TESTNETS} test networks") | |
sys.exit(1) | |
BRIDGE_IP = f"10.0.0.1" | |
# Create bridge in default namespace | |
if link_exists(BRIDGE_NAME): | |
debug_log(f"Bridge {BRIDGE_NAME} already exists") | |
else: | |
print(f"Creating bridge {BRIDGE_NAME}") | |
run(["ip", "link", "add", "name", BRIDGE_NAME, "type", "bridge"]) | |
wait_for(lambda: link_exists(BRIDGE_NAME), "bridge should now exist") | |
debug_log(f"Created bridge {BRIDGE_NAME}") | |
run(["ip", "addr", "add", f"{BRIDGE_IP}/24", "dev", BRIDGE_NAME]) | |
debug_log(f"Added address {BRIDGE_IP}/24 to bridge {BRIDGE_NAME}") | |
run(["ip", "link", "set", "dev", BRIDGE_NAME, "up"]) | |
debug_log(f"Bridge {BRIDGE_NAME} is now up") | |
for i in range(1, n + 1): | |
testnet = testnet_name(i) | |
# Create network namespace | |
if netns_exists(testnet): | |
debug_log(f"Network namespace {testnet} already exists") | |
else: | |
print(f"Creating network namespace {testnet}") | |
run(["ip", "netns", "add", testnet]) | |
wait_for( | |
lambda: netns_exists(testnet), "network namespace should now exist" | |
) | |
debug_log(f"Created network namespace {testnet}") | |
# Create veth pair | |
if link_exists(veth_name(i)): | |
debug_log(f"Veth pair {veth_name(i)} already exists") | |
else: | |
print(f"Creating veth pair {veth_name(i)}") | |
run( | |
[ | |
"ip", | |
"link", | |
"add", | |
veth_name(i), | |
"type", | |
"veth", | |
"peer", | |
"name", | |
uplink_name(i), | |
] | |
) | |
wait_for(lambda: link_exists(veth_name(i)), "veth pair should now exist") | |
debug_log(f"Created veth pair {veth_name(i)}") | |
wait_for(lambda: link_exists(uplink_name(i)), "uplink should now exist") | |
# Attach one end of veth to the namespace | |
debug_log(f"Attaching veth {veth_name(i)} to namespace {testnet}") | |
run(["ip", "link", "set", veth_name(i), "netns", testnet]) | |
# Setup veth inside namespace | |
ip_addr = f"10.0.0.{10 + i}/24" | |
debug_log( | |
f"Setting address for veth {veth_name(i)} inside namespace {testnet} to {ip_addr}" | |
) | |
run( | |
[ | |
"ip", | |
"netns", | |
"exec", | |
testnet, | |
"ip", | |
"addr", | |
"add", | |
ip_addr, | |
"dev", | |
veth_name(i), | |
] | |
) | |
debug_log("Setting veth {veth_name(i)} up") | |
run( | |
[ | |
"ip", | |
"netns", | |
"exec", | |
testnet, | |
"ip", | |
"link", | |
"set", | |
veth_name(i), | |
"up", | |
] | |
) | |
# Attach uplink to bridge | |
debug_log(f"Attaching uplink {uplink_name(i)} to bridge {BRIDGE_NAME}") | |
run(["ip", "link", "set", uplink_name(i), "master", BRIDGE_NAME]) | |
debug_log(f"Setting uplink {uplink_name(i)} up") | |
run(["ip", "link", "set", uplink_name(i), "up"]) | |
debug_log(f"Setting veth {veth_name(i)} default route to {BRIDGE_IP}") | |
run( | |
[ | |
"ip", | |
"netns", | |
"exec", | |
testnet, | |
"ip", | |
"route", | |
"add", | |
"default", | |
"via", | |
BRIDGE_IP, | |
] | |
) | |
# Setup NAT via iptables for masquerading | |
run( | |
[ | |
"iptables", | |
"-t", | |
"nat", | |
"-A", | |
"POSTROUTING", | |
"-o", | |
interface_name, | |
"-j", | |
"MASQUERADE", | |
] | |
) | |
run(["iptables", "-A", "FORWARD", "-i", BRIDGE_NAME, "-j", "ACCEPT"]) | |
run( | |
[ | |
"iptables", | |
"-A", | |
"FORWARD", | |
"-o", | |
BRIDGE_NAME, | |
"-m", | |
"state", | |
"--state", | |
"RELATED,ESTABLISHED", | |
"-j", | |
"ACCEPT", | |
] | |
) | |
def teardown_networks(): | |
"""Teardown N test networks.""" | |
interface_name = discover_default_interface() | |
print("Tearing down all existing test networks") | |
for i in range(1, MAX_TESTNETS + 1): | |
testnet = testnet_name(i) | |
# Delete the network namespace | |
if netns_exists(testnet): | |
print(f"Deleting network namespace {testnet}") | |
run(["ip", "netns", "del", testnet]) | |
wait_for( | |
lambda: not netns_exists(testnet), | |
"network namespace should no longer exist", | |
) | |
debug_log(f"Deleted network namespace {testnet}") | |
else: | |
debug_log(f"Network namespace {testnet} does not exist") | |
# Delete the veth pairs (deleting one end should delete the other) | |
if link_exists(veth_name(i)): | |
wait_for( | |
lambda: link_exists(uplink_name(i)), "veth pair should have uplink" | |
) | |
print(f"Deleting veth pair {veth_name(i)} (and uplink {uplink_name(i)})") | |
run(["ip", "link", "delete", veth_name(i)]) | |
wait_for( | |
lambda: not link_exists(veth_name(i)), "veth should no longer exist" | |
) | |
debug_log(f"Deleted veth pair {veth_name(i)} (and uplink {uplink_name(i)})") | |
else: | |
debug_log(f"Veth pair {veth_name(i)} does not exist") | |
wait_for(lambda: not link_exists(uplink_name(i)), "uplink should no longer exist") | |
# Delete bridge | |
if link_exists(BRIDGE_NAME): | |
print(f"Deleting bridge {BRIDGE_NAME}") | |
run(["ip", "link", "delete", BRIDGE_NAME, "type", "bridge"]) | |
assert not link_exists(BRIDGE_NAME), "bridge should no longer exist" | |
debug_log(f"Deleted bridge {BRIDGE_NAME}") | |
else: | |
debug_log(f"Bridge {BRIDGE_NAME} does not exist") | |
# Cleanup iptables rules | |
# Assumes these rules are specifically for this setup and safe to remove | |
run( | |
[ | |
"iptables", | |
"-t", | |
"nat", | |
"-D", | |
"POSTROUTING", | |
"-o", | |
interface_name, | |
"-j", | |
"MASQUERADE", | |
] | |
) | |
run(["iptables", "-D", "FORWARD", "-i", BRIDGE_NAME, "-j", "ACCEPT"]) | |
run( | |
[ | |
"iptables", | |
"-D", | |
"FORWARD", | |
"-o", | |
BRIDGE_NAME, | |
"-m", | |
"state", | |
"--state", | |
"RELATED,ESTABLISHED", | |
"-j", | |
"ACCEPT", | |
] | |
) | |
def select_with_fzf(options, multi=False, prompt="Select an option: "): | |
""" | |
Presents a list of options to the user using fzf and returns the selected option. | |
Args: | |
options (list): A list of strings representing the options. | |
Returns: | |
str: The selected option, or None if no option was selected. | |
""" | |
# Convert the list of options to a newline-separated string | |
options_string = "\n".join(str(o) for o in options if o) | |
cmd = ["fzf", "--height=~50%"] | |
cmd.extend(["--prompt", prompt]) | |
if multi: | |
cmd.append("--multi") | |
# Run the fzf command as a subprocess | |
result = subprocess.run( | |
cmd, | |
input=options_string, | |
text=True, | |
stdout=subprocess.PIPE, | |
) | |
# Get the selected option | |
selected = result.stdout.strip() | |
if multi: | |
return selected.splitlines() if selected else [] | |
else: | |
return selected if selected else None | |
# https://github.com/tylertreat/comcast#network-condition-profiles | |
# caution: any packets selected to be reordered will be sent without a delay | |
TC_PROFILES = { | |
"dialup": TCNetemProfile( | |
delay=Delay(time=185, jitter=20, correlation=20, distribution="paretonormal"), | |
reorder=Reorder(percent=2, correlation=25), | |
loss=RandomLoss(percent=2, correlation=25), | |
), | |
"dsl-poor": TCNetemProfile( | |
delay=Delay(time=70, jitter=10, correlation=25, distribution="paretonormal"), | |
reorder=Reorder(percent=5, correlation=0.5), | |
loss=RandomLoss(percent=2, correlation=25), | |
rate=Rate(rate=2000), | |
), | |
"dsl-good": TCNetemProfile( | |
delay=Delay(time=40, jitter=5, correlation=25, distribution="paretonormal"), | |
reorder=Reorder(percent=5, correlation=0.5), | |
loss=RandomLoss(percent=0.5, correlation=25), | |
), | |
"half-aus": TCNetemProfile( | |
delay=Delay(time=35, jitter=10, correlation=25, distribution="paretonormal"), | |
# reorder=Reorder(percent=5, correlation=0.5), | |
loss=RandomLoss(percent=0.5, correlation=25), | |
), | |
"half-pacific": TCNetemProfile( | |
delay=Delay(time=120, jitter=10, correlation=25, distribution="paretonormal"), | |
# reorder=Reorder(percent=5, correlation=0.5), | |
loss=RandomLoss(percent=0.5, correlation=25), | |
), | |
"half-pacific-5%-loss": TCNetemProfile( | |
delay=Delay(time=120, jitter=10, correlation=25, distribution="paretonormal"), | |
# reorder=Reorder(percent=5, correlation=0.5), | |
loss=RandomLoss(percent=5, correlation=25), | |
), | |
"half-world": TCNetemProfile( | |
delay=Delay(time=160, jitter=16, correlation=25, distribution="paretonormal"), | |
# reorder=Reorder(percent=5, correlation=0.5), | |
loss=RandomLoss(percent=0.5, correlation=25), | |
), | |
"corrupt-50": TCNetemProfile( | |
delay=Delay(time=30, jitter=10, correlation=25, distribution="paretonormal"), | |
corrupt=Corrupt(percent=50, correlation=25), | |
), | |
} | |
def find_testnet_qdiscs() -> list[str]: | |
qdicsc = runc(["tc", "qdisc", "list"]) | |
qdisc_filtered = [] | |
for line in qdicsc.splitlines(): | |
if "testnet" in line: | |
qdisc_filtered.append(line) | |
return qdisc_filtered | |
def edit_network_quality(): | |
testnet_links = [l for l in list_links() if "testnet" in l] | |
if not testnet_links: | |
print("No testnet links found to edit; exiting") | |
return | |
qdiscs = find_testnet_qdiscs() | |
if qdiscs: | |
print("Existing qdiscs on testnet links:") | |
for line in qdiscs: | |
print(f" {line}") | |
else: | |
print("No qdiscs on testnet links found.") | |
print(f"Selecting from {len(testnet_links)} testnet links...") | |
selected_links = select_with_fzf( | |
testnet_links, multi=True, prompt="Select links to edit:" | |
) | |
if not selected_links: | |
return | |
print("Will edit:", ", ".join(selected_links)) | |
print("Selecting profile to apply...") | |
profile_options = ["none: clear any existing profile applied to selected link(s)"] | |
profile_options.extend( | |
[f"{n}: {p.to_netem_string()}" for n, p in TC_PROFILES.items()] | |
) | |
selected_profile = select_with_fzf( | |
profile_options, | |
prompt="Select profile:", | |
) | |
if selected_profile is None: | |
return | |
selected_profile_name = selected_profile.split(":")[0] | |
if selected_profile_name == "none": | |
selected_profile_netem = None | |
print(f"Will clear any existing profile applied to {', '.join(selected_links)}") | |
else: | |
selected_profile = TC_PROFILES[selected_profile_name] | |
selected_profile_netem = selected_profile.to_netem_string() | |
print( | |
f"Will now apply {selected_profile_name} profile to {', '.join(selected_links)}\nDetails: {selected_profile_netem}" | |
) | |
for link in selected_links: | |
# clear existing profile if any | |
for entry in qdiscs: | |
if link in entry: | |
if "qdisc netem " in entry: | |
# we have an existing netem profile we need to clear | |
run(["tc", "qdisc", "del", "dev", link, "root"]) | |
# now set the chosen profile | |
if selected_profile_netem is not None: | |
cmd = [ | |
"tc", | |
"qdisc", | |
"add", | |
"dev", | |
link, | |
"root", | |
"netem", | |
] | |
for word in selected_profile_netem.split(" "): | |
cmd.append(word) | |
run(cmd) | |
# all done with this link | |
print(f"✓ {link}") | |
qdiscs = find_testnet_qdiscs() | |
print("New qdiscs on testnet links:") | |
for line in qdiscs: | |
print(f" {line}") | |
def main(): | |
parser = argparse.ArgumentParser(description="Setup or teardown test networks.") | |
group = parser.add_mutually_exclusive_group(required=True) | |
group.add_argument( | |
"--up", | |
type=int, | |
help="Set up the specified number of test networks.", | |
) | |
group.add_argument( | |
"--down", | |
action="store_true", | |
help="Tear down all test networks.", | |
) | |
group.add_argument( | |
"--edit", | |
action="store_true", | |
help="Set up loss, delay, etc characteristics of the networks", | |
) | |
args = parser.parse_args() | |
if args.up is not None: | |
setup_networks(args.up) | |
elif args.down: | |
teardown_networks() | |
elif args.edit: | |
edit_network_quality() | |
if __name__ == "__main__": | |
main() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import NamedTuple, Optional, Tuple, Union | |
# A 'typed' wrapper around the tc/netem textual dsl | |
# the syntax for netem rules is fairly simple but there's quite | |
# a few optional params, and I found it hard to parse at a | |
# glance visually. | |
# And no, I didn't type this all out by hand. | |
class Delay(NamedTuple): | |
time: int # in milliseconds | |
jitter: Optional[int] = None # optional jitter in milliseconds | |
correlation: Optional[float] = None # correlation percentage | |
distribution: Optional[str] = None # uniform, normal, pareto, or paretonormal | |
# not modeling the state or gemodel based loss approaches for now | |
class RandomLoss(NamedTuple): | |
percent: float | |
correlation: Optional[float] = None | |
ecn: Optional[bool] = False | |
class Corrupt(NamedTuple): | |
percent: float | |
correlation: Optional[float] = None # correlation percentage | |
class Duplicate(NamedTuple): | |
percent: float | |
correlation: Optional[float] = None # correlation percentage | |
class Reorder(NamedTuple): | |
percent: float | |
correlation: Optional[float] = None # correlation percentage | |
gap_distance: Optional[int] = None # gap distance | |
class Rate(NamedTuple): | |
rate: int # in kbps | |
packet_overhead: Optional[int] = None | |
cell_size: Optional[int] = None | |
cell_overhead: Optional[int] = None | |
class Slot(NamedTuple): | |
min_delay: int | |
max_delay: Optional[int] = None | |
distribution: Optional[str] = None | |
delay: Optional[int] = None # slot delay | |
jitter: Optional[int] = None | |
packets: Optional[int] = None | |
bytes: Optional[int] = None | |
# https://man.archlinux.org/man/core/iproute2/tc-netem.8.en#EXAMPLES | |
class TCNetemProfile(NamedTuple): | |
limit_packets: Optional[int] = None | |
delay: Optional[Delay] = None | |
loss: Optional[RandomLoss] = None | |
corrupt: Optional[Corrupt] = None | |
duplicate: Optional[Duplicate] = None | |
reorder: Optional[Reorder] = None | |
rate: Optional[Rate] = None | |
slot: Optional[Slot] = None | |
seed: Optional[int] = None | |
def to_netem_string(self) -> str: | |
parts = [] | |
if self.limit_packets is not None: | |
parts.append(f"limit {self.limit_packets}") | |
if self.delay is not None: | |
delay_str = f"delay {self.delay.time}ms" | |
if self.delay.jitter is not None: | |
delay_str += f" {self.delay.jitter}ms" | |
if self.delay.correlation is not None: | |
delay_str += f" {self.delay.correlation}%" | |
if self.delay.distribution is not None: | |
delay_str += f" distribution {self.delay.distribution}" | |
parts.append(delay_str) | |
if self.loss is not None: | |
loss_str = f"loss random {self.loss.percent}%" | |
if self.loss.correlation is not None: | |
loss_str += f" {self.loss.correlation}%" | |
if self.loss.ecn: | |
loss_str += " ecn" | |
parts.append(loss_str) | |
if self.corrupt is not None: | |
corrupt_str = f"corrupt {self.corrupt.percent}%" | |
if self.corrupt.correlation is not None: | |
corrupt_str += f" {self.corrupt.correlation}%" | |
parts.append(corrupt_str) | |
if self.duplicate is not None: | |
duplicate_str = f"duplicate {self.duplicate.percent}%" | |
if self.duplicate.correlation is not None: | |
duplicate_str += f" {self.duplicate.correlation}%" | |
parts.append(duplicate_str) | |
if self.reorder is not None: | |
reorder_str = f"reorder {self.reorder.percent}%" | |
if self.reorder.correlation is not None: | |
reorder_str += f" {self.reorder.correlation}%" | |
if self.reorder.gap_distance is not None: | |
reorder_str = f"gap {self.reorder.gap_distance}" | |
parts.append(reorder_str) | |
if self.rate is not None: | |
rate_str = f"rate {self.rate.rate}kbit" | |
if self.rate.packet_overhead is not None: | |
rate_str += f" {self.rate.packet_overhead}" | |
if self.rate.cell_size is not None: | |
rate_str += f" {self.rate.cell_size}" | |
if self.rate.cell_overhead is not None: | |
rate_str += f" {self.rate.cell_overhead}" | |
parts.append(rate_str) | |
if self.slot is not None: | |
slot_str = f"slot {self.slot.min_delay}" | |
if self.slot.max_delay is not None: | |
slot_str += f" {self.slot.max_delay}" | |
if self.slot.distribution is not None: | |
slot_str += f" distribution {self.slot.distribution}" | |
if self.slot.delay is not None: | |
slot_str += f" delay {self.slot.delay}" | |
if self.slot.jitter is not None: | |
slot_str += f" jitter {self.slot.jitter}" | |
parts.append(slot_str) | |
if self.seed is not None: | |
parts.append(f"seed {self.seed}") | |
return " ".join(parts) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment