Skip to content

Instantly share code, notes, and snippets.

@caspark
Last active December 6, 2024 03:30
Show Gist options
  • Save caspark/f8f734a567c422225ac9895c1d698818 to your computer and use it in GitHub Desktop.
Save caspark/f8f734a567c422225ac9895c1d698818 to your computer and use it in GitHub Desktop.
Script to use network namespaces to create isolated network adapters on linux
DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
Version 2, December 2004
Copyright (C) 2004 Sam Hocevar <[email protected]>
Everyone is permitted to copy and distribute verbatim or modified
copies of this license document, and changing it is allowed as long
as the name is changed.
DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
0. You just DO WHAT THE FUCK YOU WANT TO.
http://www.wtfpl.net/
#!/usr/bin/env python3
# A script to set up 1-8 'test networks' as virtual ethernet
# pairs, and set up various degraded network profiles for
# them.
# Useful for testing robustness of networked apps.
# Needs https://github.com/junegunn/fzf
# Tested on Fedora 40. May also need other things on other
# distros, YMMV.
#
#
# ip tables backup and restore:
# sudo iptables-save > iptables-backup.rules
# sudo iptables-restore < iptables-backup.rules
#
# show bridge state
# brctl show testnet-bridge
#
# enable IP forwarding if needed
# echo 1 > /proc/sys/net/ipv4/ip_forward
import argparse
import subprocess
import sys
import time
from typing import List
from testnets_lib import *
BRIDGE_NAME = "testnet-bridge"
MAX_TESTNETS = 8
def trace_log(*args, **kwargs):
if False:
print("Trace:", *args, **kwargs)
def debug_log(*args, **kwargs):
if False:
print("Debug:", *args, **kwargs)
def wait_for(cond_fn, failure_msg: str):
SLEEP_TIME = 0.1
time_waited = 0.0
while time_waited < 1.0:
if cond_fn():
return
time.sleep(SLEEP_TIME)
time_waited += SLEEP_TIME
print(failure_msg)
sys.exit(1)
def testnet_name(i: int) -> str:
"""Return the name of the testnet namespace for the given index."""
return f"testnet{i}"
def veth_name(i: int) -> str:
"""Return the name of the veth pair for the given index."""
return f"testnet{i}-veth"
def uplink_name(i: int) -> str:
"""Return the name of the uplink interface for the given index."""
return f"testnet{i}-uplink"
def netns_exists(name: str) -> bool:
"""Check if the given network namespace exists."""
namespaces = runc(["ip", "netns", "list"])
for line in namespaces.splitlines():
if line.split()[0].strip() == name:
return True
return False
def list_links() -> list[str]:
"""List all network links."""
r = []
links = runc(["ip", "link", "list"])
for i, line in enumerate(links.splitlines()):
if i % 2 == 1:
# every odd line is more details about the previous link so skip it
continue
# Each network link is listed with a number and colon (e.g., "1: lo: ..."), so check the format
link_name = line.split(":")[1].strip()
if "@" in link_name:
link_name = link_name.split("@")[0]
r.append(link_name)
return r
def link_exists(name: str) -> bool:
"""Check if the given network link exists."""
return name in list_links()
def discover_default_interface() -> str:
"""Discover the active ethernet interface."""
eth_iface = subprocess.run(
["ip", "route", "show", "default"], capture_output=True, text=True
)
interface_name = eth_iface.stdout.split()[4]
debug_log(f"Discovered interface {interface_name} as default interface")
return interface_name
def run(command: List[str]):
"""Utility function to execute a system command."""
try:
trace_log(f"Running command: {' '.join(command)}")
subprocess.run(command, check=True)
except subprocess.CalledProcessError as e:
print(f"Error executing command: {' '.join(command)}\n{e}")
sys.exit(1)
def runc(command: List[str]) -> str:
"""Utility function to execute a system command."""
try:
trace_log(f"Running command with output capture: {' '.join(command)}")
ran = subprocess.run(command, check=True, capture_output=True, text=True)
return ran.stdout.strip()
except subprocess.CalledProcessError as e:
print(f"Error executing captured command: {' '.join(command)}\n{e}")
sys.exit(1)
def setup_networks(n: int):
"""Setup N test networks."""
interface_name = discover_default_interface()
print(f"Setting up {n} test networks linked to {interface_name}")
if n > MAX_TESTNETS:
print(f"Cannot setup more than {MAX_TESTNETS} test networks")
sys.exit(1)
BRIDGE_IP = f"10.0.0.1"
# Create bridge in default namespace
if link_exists(BRIDGE_NAME):
debug_log(f"Bridge {BRIDGE_NAME} already exists")
else:
print(f"Creating bridge {BRIDGE_NAME}")
run(["ip", "link", "add", "name", BRIDGE_NAME, "type", "bridge"])
wait_for(lambda: link_exists(BRIDGE_NAME), "bridge should now exist")
debug_log(f"Created bridge {BRIDGE_NAME}")
run(["ip", "addr", "add", f"{BRIDGE_IP}/24", "dev", BRIDGE_NAME])
debug_log(f"Added address {BRIDGE_IP}/24 to bridge {BRIDGE_NAME}")
run(["ip", "link", "set", "dev", BRIDGE_NAME, "up"])
debug_log(f"Bridge {BRIDGE_NAME} is now up")
for i in range(1, n + 1):
testnet = testnet_name(i)
# Create network namespace
if netns_exists(testnet):
debug_log(f"Network namespace {testnet} already exists")
else:
print(f"Creating network namespace {testnet}")
run(["ip", "netns", "add", testnet])
wait_for(
lambda: netns_exists(testnet), "network namespace should now exist"
)
debug_log(f"Created network namespace {testnet}")
# Create veth pair
if link_exists(veth_name(i)):
debug_log(f"Veth pair {veth_name(i)} already exists")
else:
print(f"Creating veth pair {veth_name(i)}")
run(
[
"ip",
"link",
"add",
veth_name(i),
"type",
"veth",
"peer",
"name",
uplink_name(i),
]
)
wait_for(lambda: link_exists(veth_name(i)), "veth pair should now exist")
debug_log(f"Created veth pair {veth_name(i)}")
wait_for(lambda: link_exists(uplink_name(i)), "uplink should now exist")
# Attach one end of veth to the namespace
debug_log(f"Attaching veth {veth_name(i)} to namespace {testnet}")
run(["ip", "link", "set", veth_name(i), "netns", testnet])
# Setup veth inside namespace
ip_addr = f"10.0.0.{10 + i}/24"
debug_log(
f"Setting address for veth {veth_name(i)} inside namespace {testnet} to {ip_addr}"
)
run(
[
"ip",
"netns",
"exec",
testnet,
"ip",
"addr",
"add",
ip_addr,
"dev",
veth_name(i),
]
)
debug_log("Setting veth {veth_name(i)} up")
run(
[
"ip",
"netns",
"exec",
testnet,
"ip",
"link",
"set",
veth_name(i),
"up",
]
)
# Attach uplink to bridge
debug_log(f"Attaching uplink {uplink_name(i)} to bridge {BRIDGE_NAME}")
run(["ip", "link", "set", uplink_name(i), "master", BRIDGE_NAME])
debug_log(f"Setting uplink {uplink_name(i)} up")
run(["ip", "link", "set", uplink_name(i), "up"])
debug_log(f"Setting veth {veth_name(i)} default route to {BRIDGE_IP}")
run(
[
"ip",
"netns",
"exec",
testnet,
"ip",
"route",
"add",
"default",
"via",
BRIDGE_IP,
]
)
# Setup NAT via iptables for masquerading
run(
[
"iptables",
"-t",
"nat",
"-A",
"POSTROUTING",
"-o",
interface_name,
"-j",
"MASQUERADE",
]
)
run(["iptables", "-A", "FORWARD", "-i", BRIDGE_NAME, "-j", "ACCEPT"])
run(
[
"iptables",
"-A",
"FORWARD",
"-o",
BRIDGE_NAME,
"-m",
"state",
"--state",
"RELATED,ESTABLISHED",
"-j",
"ACCEPT",
]
)
def teardown_networks():
"""Teardown N test networks."""
interface_name = discover_default_interface()
print("Tearing down all existing test networks")
for i in range(1, MAX_TESTNETS + 1):
testnet = testnet_name(i)
# Delete the network namespace
if netns_exists(testnet):
print(f"Deleting network namespace {testnet}")
run(["ip", "netns", "del", testnet])
wait_for(
lambda: not netns_exists(testnet),
"network namespace should no longer exist",
)
debug_log(f"Deleted network namespace {testnet}")
else:
debug_log(f"Network namespace {testnet} does not exist")
# Delete the veth pairs (deleting one end should delete the other)
if link_exists(veth_name(i)):
wait_for(
lambda: link_exists(uplink_name(i)), "veth pair should have uplink"
)
print(f"Deleting veth pair {veth_name(i)} (and uplink {uplink_name(i)})")
run(["ip", "link", "delete", veth_name(i)])
wait_for(
lambda: not link_exists(veth_name(i)), "veth should no longer exist"
)
debug_log(f"Deleted veth pair {veth_name(i)} (and uplink {uplink_name(i)})")
else:
debug_log(f"Veth pair {veth_name(i)} does not exist")
wait_for(lambda: not link_exists(uplink_name(i)), "uplink should no longer exist")
# Delete bridge
if link_exists(BRIDGE_NAME):
print(f"Deleting bridge {BRIDGE_NAME}")
run(["ip", "link", "delete", BRIDGE_NAME, "type", "bridge"])
assert not link_exists(BRIDGE_NAME), "bridge should no longer exist"
debug_log(f"Deleted bridge {BRIDGE_NAME}")
else:
debug_log(f"Bridge {BRIDGE_NAME} does not exist")
# Cleanup iptables rules
# Assumes these rules are specifically for this setup and safe to remove
run(
[
"iptables",
"-t",
"nat",
"-D",
"POSTROUTING",
"-o",
interface_name,
"-j",
"MASQUERADE",
]
)
run(["iptables", "-D", "FORWARD", "-i", BRIDGE_NAME, "-j", "ACCEPT"])
run(
[
"iptables",
"-D",
"FORWARD",
"-o",
BRIDGE_NAME,
"-m",
"state",
"--state",
"RELATED,ESTABLISHED",
"-j",
"ACCEPT",
]
)
def select_with_fzf(options, multi=False, prompt="Select an option: "):
"""
Presents a list of options to the user using fzf and returns the selected option.
Args:
options (list): A list of strings representing the options.
Returns:
str: The selected option, or None if no option was selected.
"""
# Convert the list of options to a newline-separated string
options_string = "\n".join(str(o) for o in options if o)
cmd = ["fzf", "--height=~50%"]
cmd.extend(["--prompt", prompt])
if multi:
cmd.append("--multi")
# Run the fzf command as a subprocess
result = subprocess.run(
cmd,
input=options_string,
text=True,
stdout=subprocess.PIPE,
)
# Get the selected option
selected = result.stdout.strip()
if multi:
return selected.splitlines() if selected else []
else:
return selected if selected else None
# https://github.com/tylertreat/comcast#network-condition-profiles
# caution: any packets selected to be reordered will be sent without a delay
TC_PROFILES = {
"dialup": TCNetemProfile(
delay=Delay(time=185, jitter=20, correlation=20, distribution="paretonormal"),
reorder=Reorder(percent=2, correlation=25),
loss=RandomLoss(percent=2, correlation=25),
),
"dsl-poor": TCNetemProfile(
delay=Delay(time=70, jitter=10, correlation=25, distribution="paretonormal"),
reorder=Reorder(percent=5, correlation=0.5),
loss=RandomLoss(percent=2, correlation=25),
rate=Rate(rate=2000),
),
"dsl-good": TCNetemProfile(
delay=Delay(time=40, jitter=5, correlation=25, distribution="paretonormal"),
reorder=Reorder(percent=5, correlation=0.5),
loss=RandomLoss(percent=0.5, correlation=25),
),
"half-aus": TCNetemProfile(
delay=Delay(time=35, jitter=10, correlation=25, distribution="paretonormal"),
# reorder=Reorder(percent=5, correlation=0.5),
loss=RandomLoss(percent=0.5, correlation=25),
),
"half-pacific": TCNetemProfile(
delay=Delay(time=120, jitter=10, correlation=25, distribution="paretonormal"),
# reorder=Reorder(percent=5, correlation=0.5),
loss=RandomLoss(percent=0.5, correlation=25),
),
"half-pacific-5%-loss": TCNetemProfile(
delay=Delay(time=120, jitter=10, correlation=25, distribution="paretonormal"),
# reorder=Reorder(percent=5, correlation=0.5),
loss=RandomLoss(percent=5, correlation=25),
),
"half-world": TCNetemProfile(
delay=Delay(time=160, jitter=16, correlation=25, distribution="paretonormal"),
# reorder=Reorder(percent=5, correlation=0.5),
loss=RandomLoss(percent=0.5, correlation=25),
),
"corrupt-50": TCNetemProfile(
delay=Delay(time=30, jitter=10, correlation=25, distribution="paretonormal"),
corrupt=Corrupt(percent=50, correlation=25),
),
}
def find_testnet_qdiscs() -> list[str]:
qdicsc = runc(["tc", "qdisc", "list"])
qdisc_filtered = []
for line in qdicsc.splitlines():
if "testnet" in line:
qdisc_filtered.append(line)
return qdisc_filtered
def edit_network_quality():
testnet_links = [l for l in list_links() if "testnet" in l]
if not testnet_links:
print("No testnet links found to edit; exiting")
return
qdiscs = find_testnet_qdiscs()
if qdiscs:
print("Existing qdiscs on testnet links:")
for line in qdiscs:
print(f" {line}")
else:
print("No qdiscs on testnet links found.")
print(f"Selecting from {len(testnet_links)} testnet links...")
selected_links = select_with_fzf(
testnet_links, multi=True, prompt="Select links to edit:"
)
if not selected_links:
return
print("Will edit:", ", ".join(selected_links))
print("Selecting profile to apply...")
profile_options = ["none: clear any existing profile applied to selected link(s)"]
profile_options.extend(
[f"{n}: {p.to_netem_string()}" for n, p in TC_PROFILES.items()]
)
selected_profile = select_with_fzf(
profile_options,
prompt="Select profile:",
)
if selected_profile is None:
return
selected_profile_name = selected_profile.split(":")[0]
if selected_profile_name == "none":
selected_profile_netem = None
print(f"Will clear any existing profile applied to {', '.join(selected_links)}")
else:
selected_profile = TC_PROFILES[selected_profile_name]
selected_profile_netem = selected_profile.to_netem_string()
print(
f"Will now apply {selected_profile_name} profile to {', '.join(selected_links)}\nDetails: {selected_profile_netem}"
)
for link in selected_links:
# clear existing profile if any
for entry in qdiscs:
if link in entry:
if "qdisc netem " in entry:
# we have an existing netem profile we need to clear
run(["tc", "qdisc", "del", "dev", link, "root"])
# now set the chosen profile
if selected_profile_netem is not None:
cmd = [
"tc",
"qdisc",
"add",
"dev",
link,
"root",
"netem",
]
for word in selected_profile_netem.split(" "):
cmd.append(word)
run(cmd)
# all done with this link
print(f"✓ {link}")
qdiscs = find_testnet_qdiscs()
print("New qdiscs on testnet links:")
for line in qdiscs:
print(f" {line}")
def main():
parser = argparse.ArgumentParser(description="Setup or teardown test networks.")
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument(
"--up",
type=int,
help="Set up the specified number of test networks.",
)
group.add_argument(
"--down",
action="store_true",
help="Tear down all test networks.",
)
group.add_argument(
"--edit",
action="store_true",
help="Set up loss, delay, etc characteristics of the networks",
)
args = parser.parse_args()
if args.up is not None:
setup_networks(args.up)
elif args.down:
teardown_networks()
elif args.edit:
edit_network_quality()
if __name__ == "__main__":
main()
from typing import NamedTuple, Optional, Tuple, Union
# A 'typed' wrapper around the tc/netem textual dsl
# the syntax for netem rules is fairly simple but there's quite
# a few optional params, and I found it hard to parse at a
# glance visually.
# And no, I didn't type this all out by hand.
class Delay(NamedTuple):
time: int # in milliseconds
jitter: Optional[int] = None # optional jitter in milliseconds
correlation: Optional[float] = None # correlation percentage
distribution: Optional[str] = None # uniform, normal, pareto, or paretonormal
# not modeling the state or gemodel based loss approaches for now
class RandomLoss(NamedTuple):
percent: float
correlation: Optional[float] = None
ecn: Optional[bool] = False
class Corrupt(NamedTuple):
percent: float
correlation: Optional[float] = None # correlation percentage
class Duplicate(NamedTuple):
percent: float
correlation: Optional[float] = None # correlation percentage
class Reorder(NamedTuple):
percent: float
correlation: Optional[float] = None # correlation percentage
gap_distance: Optional[int] = None # gap distance
class Rate(NamedTuple):
rate: int # in kbps
packet_overhead: Optional[int] = None
cell_size: Optional[int] = None
cell_overhead: Optional[int] = None
class Slot(NamedTuple):
min_delay: int
max_delay: Optional[int] = None
distribution: Optional[str] = None
delay: Optional[int] = None # slot delay
jitter: Optional[int] = None
packets: Optional[int] = None
bytes: Optional[int] = None
# https://man.archlinux.org/man/core/iproute2/tc-netem.8.en#EXAMPLES
class TCNetemProfile(NamedTuple):
limit_packets: Optional[int] = None
delay: Optional[Delay] = None
loss: Optional[RandomLoss] = None
corrupt: Optional[Corrupt] = None
duplicate: Optional[Duplicate] = None
reorder: Optional[Reorder] = None
rate: Optional[Rate] = None
slot: Optional[Slot] = None
seed: Optional[int] = None
def to_netem_string(self) -> str:
parts = []
if self.limit_packets is not None:
parts.append(f"limit {self.limit_packets}")
if self.delay is not None:
delay_str = f"delay {self.delay.time}ms"
if self.delay.jitter is not None:
delay_str += f" {self.delay.jitter}ms"
if self.delay.correlation is not None:
delay_str += f" {self.delay.correlation}%"
if self.delay.distribution is not None:
delay_str += f" distribution {self.delay.distribution}"
parts.append(delay_str)
if self.loss is not None:
loss_str = f"loss random {self.loss.percent}%"
if self.loss.correlation is not None:
loss_str += f" {self.loss.correlation}%"
if self.loss.ecn:
loss_str += " ecn"
parts.append(loss_str)
if self.corrupt is not None:
corrupt_str = f"corrupt {self.corrupt.percent}%"
if self.corrupt.correlation is not None:
corrupt_str += f" {self.corrupt.correlation}%"
parts.append(corrupt_str)
if self.duplicate is not None:
duplicate_str = f"duplicate {self.duplicate.percent}%"
if self.duplicate.correlation is not None:
duplicate_str += f" {self.duplicate.correlation}%"
parts.append(duplicate_str)
if self.reorder is not None:
reorder_str = f"reorder {self.reorder.percent}%"
if self.reorder.correlation is not None:
reorder_str += f" {self.reorder.correlation}%"
if self.reorder.gap_distance is not None:
reorder_str = f"gap {self.reorder.gap_distance}"
parts.append(reorder_str)
if self.rate is not None:
rate_str = f"rate {self.rate.rate}kbit"
if self.rate.packet_overhead is not None:
rate_str += f" {self.rate.packet_overhead}"
if self.rate.cell_size is not None:
rate_str += f" {self.rate.cell_size}"
if self.rate.cell_overhead is not None:
rate_str += f" {self.rate.cell_overhead}"
parts.append(rate_str)
if self.slot is not None:
slot_str = f"slot {self.slot.min_delay}"
if self.slot.max_delay is not None:
slot_str += f" {self.slot.max_delay}"
if self.slot.distribution is not None:
slot_str += f" distribution {self.slot.distribution}"
if self.slot.delay is not None:
slot_str += f" delay {self.slot.delay}"
if self.slot.jitter is not None:
slot_str += f" jitter {self.slot.jitter}"
parts.append(slot_str)
if self.seed is not None:
parts.append(f"seed {self.seed}")
return " ".join(parts)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment