Last active
December 25, 2020 03:57
-
-
Save SaveTheRbtz/287511efe881f921d1198bd08cc18f97 to your computer and use it in GitHub Desktop.
affinitization scripts, originally written by @behebot
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# mypy: allow-untyped-defs | |
""" | |
This script is used for applying affinity settings for various hardware devices. | |
Script originally based on Intel's [set_irq_affinity.sh](https://gist.github.com/SaveTheRbtz/8875474). | |
Over the years it was updated with heuristics based on the shape of Dropbox infrastructure. | |
Currently, this script can manage IRQ affinities, RPS, XPS, and RXQS. For the description of | |
these scaling settings please use [kernel's scaling.txt](https://www.kernel.org/doc/Documentation/networking/scaling.txt). | |
""" | |
import argparse | |
import logging | |
import re | |
import sys | |
from argparse import Namespace | |
from multiprocessing import cpu_count as get_cpu_count | |
from lib import ( | |
affinitize, | |
get_network_devices, | |
get_vectors, | |
MODE_NETWORK, | |
MODE_STORAGE, | |
NETWORK_PATTERNS, | |
packet_steering, | |
PS_AUTO, | |
PS_OFF, | |
PS_ON, | |
STORAGE_PATTERNS, | |
) | |
# Values accepted by the --rps / --xps command-line switches.
PS_CHOICES = (PS_ON, PS_OFF, PS_AUTO)
# Tab-separated log line layout: timestamp, level, message.
FORMAT = "%(asctime)s\t%(levelname)s\t%(message)s"
logging.basicConfig(format=FORMAT)
# Root logger; main() raises its level to DEBUG when --debug is given.
LOG = logging.getLogger()
def parse_ranges(string):
    """
    Parse a comma-separated CPU list such as '0,1,2-5' or '4-max' into a set.

    Each item is either a single CPU number or an inclusive 'start-end' range
    whose end may be the literal 'max', meaning the last CPU reported by
    `get_cpu_count()`. Whitespace around an item is ignored.

    Returns the set of selected CPU numbers.

    Raises `argparse.ArgumentTypeError` when an item is malformed, or when a
    range is descending (e.g. '5-2'); such a range would otherwise silently
    select no CPUs at all.
    """
    last_cpu = get_cpu_count() - 1
    parsed_range = set()
    for item in string.split(","):
        item = item.strip()

        single = re.match(r"^(\d+)$", item)
        if single:
            parsed_range.add(int(single.group(1)))
            continue

        span = re.match(r"^(\d+)-(\d+|max)$", item)
        if span is None:
            raise argparse.ArgumentTypeError(
                "'" + string + "' is not a list "
                "of range of numbers. Expected forms like '0,1,2-5' or '3'. "
            )

        start = int(span.group(1))
        end = last_cpu if span.group(2) == "max" else int(span.group(2))
        if start > end:
            # range(start, end + 1) would be empty, quietly dropping the item.
            raise argparse.ArgumentTypeError(
                "'{}' in '{}' is an empty range: start {} is greater than "
                "end {}.".format(item, string, start, end)
            )
        parsed_range.update(range(start, end + 1))
    return parsed_range
def parse_args():
    # type: () -> Namespace
    """
    Build the command-line parser and parse `sys.argv`.

    The global options (--dry-run, --check, --debug, --cpuset) belong to the
    top-level parser; the mode sub-command (network or storage) selects which
    devices are affinitized, and the network mode adds --rps / --xps.

    Returns the populated `argparse.Namespace`.
    """
    parser = argparse.ArgumentParser(
        description="Affinitize all the stuff", epilog=__doc__
    )
    mode_subparser = parser.add_subparsers(
        dest="mode", help="Type of device to affinitize"
    )
    # Python 3 treats sub-commands as optional by default, which would let a
    # missing mode reach main() and fail there with an AssertionError instead
    # of a usage message. Setting the attribute works on every Python version.
    mode_subparser.required = True

    network_parser = mode_subparser.add_parser(MODE_NETWORK, help="affinitize NICs")
    network_parser.add_argument(
        "--rps",
        choices=PS_CHOICES,
        default=PS_AUTO,
        help="enable Receive Packet Steering [default: %(default)s]",
    )
    network_parser.add_argument(
        "--xps",
        choices=PS_CHOICES,
        default=PS_ON,
        help="enable Transmit Packet Steering [default: %(default)s]",
    )
    # TODO(rbtz): Receive Flow Steering
    # TODO(rbtz): NUMA affinity
    mode_subparser.add_parser(MODE_STORAGE, help="affinitize RAIDs")

    parser.add_argument(
        "--dry-run",
        help="perform a dry run. do not apply changes",
        action="store_true",
        default=False,
    )
    parser.add_argument(
        "--check",
        help="check whether any modifications will be applied; "
        "exits with non-zero status 1 if there are any pending changes; "
        "implies dry-run",
        action="store_true",
        default=False,
    )
    parser.add_argument("--debug", help="print debug information", action="store_true")
    # TODO(rbtz): Add filters by device name (e.g. apply tunings only to `eth0` or `em*`)
    parser.add_argument(
        "--cpuset",
        help="use following cpuset to spread interrupts",
        type=parse_ranges,
        # By default interrupts are spread over every CPU of the box.
        default=range(get_cpu_count()),
    )
    return parser.parse_args()
def main(args):
    # type: (Namespace) -> int
    """
    Affinitize the interrupt vectors selected by `args.mode` and, in network
    mode, apply the RPS/XPS settings as well.

    Returns the process exit status: 0 on success (or when the box has a
    single CPU), 1 when --cpuset is invalid or any step raised. With --check
    a dry run is forced; `affinitize` raises when changes are pending, which
    is counted as a failure here.

    Raises `AssertionError` for a mode that is neither storage nor network.
    """
    if args.debug:
        LOG.setLevel(logging.DEBUG)

    cpu_count = get_cpu_count()
    if cpu_count == 1:
        LOG.critical("No need to affinitize, there is only one CPU presented.")
        return 0

    if len(args.cpuset) > cpu_count:
        LOG.critical("More CPUs specified in --cpuset than actually present in box.")
        return 1

    # CPUs are numbered from 0, so the highest valid id is cpu_count - 1.
    unknown_cpu = next((cpu for cpu in args.cpuset if cpu >= cpu_count), None)
    if unknown_cpu is not None:
        LOG.critical(
            "CPU #{} was specified in --cpuset but it doesn't exist.".format(
                unknown_cpu
            )
        )
        return 1

    patterns_by_mode = {
        MODE_STORAGE: STORAGE_PATTERNS,
        MODE_NETWORK: NETWORK_PATTERNS,
    }
    if args.mode not in patterns_by_mode:
        raise AssertionError("Unknown mode: {}".format(args.mode))
    patterns = patterns_by_mode[args.mode]

    if args.check:
        # A check never modifies anything, so it always runs as a dry run.
        args.dry_run = True
        LOG.debug(
            "Preforming check. Will return non-zero if any pending changes are detected."
        )
    if args.dry_run:
        LOG.debug("Performing dry run. No changes will be applied.")

    LOG.debug(
        "{} mode selected. Will consider vectors matching: {}".format(
            args.mode, ", ".join(patterns)
        )
    )

    failed = False
    for pattern in patterns:
        vectors = list(get_vectors(pattern))
        vector_count = len(vectors)
        if vector_count == 0:
            LOG.debug("No vectors matching pattern '{}' found".format(pattern))
            continue
        if vector_count > cpu_count:
            LOG.warning("There are more interrupt vectors matched than CPUs presented")
        LOG.debug(
            "Found {} vectors matching '{}' pattern".format(vector_count, pattern)
        )
        LOG.info("Starting affinitization of '{}' pattern".format(pattern))
        try:
            affinitize(vectors, cpu_count, args.cpuset, args.dry_run, args.check)
        except Exception:
            # Keep going with the remaining patterns; report failure at the end.
            failed = True
            LOG.exception("Failed to affinitize")

    if args.mode == MODE_NETWORK:
        try:
            devices = list(get_network_devices())
            packet_steering(
                devices,
                cpu_count,
                args.cpuset,
                args.rps,
                args.xps,
                args.dry_run,
                args.check,
            )
        except Exception:
            LOG.exception("Failed to apply RPS/XPS rules")
            failed = True

    return 1 if failed else 0
if __name__ == "__main__":
    # Script entry point: the value returned by main() becomes the exit status.
    sys.exit(main(parse_args()))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# mypy: allow-untyped-defs | |
""" | |
Library functions for the affinitize binary | |
""" | |
import itertools | |
import logging | |
import os.path | |
import re | |
from fnmatch import fnmatch | |
from glob import glob | |
from os import listdir | |
from typing import Any, Iterator, List, Optional, Text, Tuple, Union | |
# Regular expressions searched for in /proc/interrupts lines to pick the
# interrupt vectors that belong to network devices.
NETWORK_PATTERNS = [
    "rx",
    "tx",
    "TxRx",
    r"eth\d+-\d+",
    "ioat-msix",
]

# Same idea for storage controllers.
STORAGE_PATTERNS = [
    "megasas",
    "hpsa",
    r"mpt2sas\d+-msix\d+",
    r"mpt3sas\d+-msix\d+",
    "aacraid",
    r"nvme\d+q\d+",
]

# Names of the operating modes.
MODE_NETWORK, MODE_STORAGE = "network", "storage"

# Packet steering switch values.
PS_ON = "on"
PS_OFF = "off"
PS_AUTO = "auto"

# Root logger, shared with whoever configures logging.
LOG = logging.getLogger()
def construct_affinity_vectors(cpuset, vectors):
    """
    Pair every item of `vectors` with a CPU taken round-robin from `cpuset`.

    Returns a list of `(cpu, vector)` tuples, one per vector, with the CPUs
    repeating in `cpuset` iteration order once they run out. The list is
    empty when either argument is empty.
    """
    # `itertools.izip` exists only on Python 2. The builtin `zip` stops at the
    # shorter input on both Python 2 and 3, so the endless cycle is safe.
    return list(zip(itertools.cycle(cpuset), vectors))
def affinitize(vectors, cpu_count, cpuset, dry_run=False, check=False):
    """
    Pin each interrupt vector to a single CPU of `cpuset`, round-robin.

    The approach follows Intel's `set_irq_affinity.sh`:
    https://gist.github.com/SaveTheRbtz/8875474

    For every vector the current /proc/irq/<vector>/smp_affinity value is
    read; the file is rewritten only when it differs from the wanted mask and
    `dry_run` is false. In a dry run the pending change is logged instead.

    Raises `RuntimeError` when `check` is true and at least one vector needed
    a change. Errors from opening, reading or writing the smp_affinity files
    propagate to the caller.
    """
    changes = False
    for cpu_id, vector in construct_affinity_vectors(cpuset, vectors):
        affinity = construct_affinity(cpu_id, cpu_count)
        smp_affinity_file = "/proc/irq/{}/smp_affinity".format(vector)
        with open(smp_affinity_file, "r") as f:
            current_affinity = parse_affinity(f.read())

        # Compare normalised forms: `affinity` carries a comma between 32-bit
        # words (e.g. "1,00000000" for CPU 32) while `current_affinity` has
        # had its commas stripped, so the raw strings never match for
        # CPUs >= 32.
        if current_affinity == parse_affinity(affinity):
            LOG.debug("{} is already set to {}".format(smp_affinity_file, affinity))
            continue

        if not dry_run:
            LOG.debug("Setting {} to {}".format(smp_affinity_file, affinity))
            with open(smp_affinity_file, "w") as f:
                f.write(affinity)
        else:
            LOG.warning(
                "{} will be changed {} -> {}".format(
                    smp_affinity_file, current_affinity, affinity
                )
            )
        changes = True

    if check and changes:
        raise RuntimeError("Affinity changes are pending")
def packet_steering(devices, cpu_count, cpuset, rps, xps, dry_run=False, check=False):
    """
    Applies RPS/XPS tunings based on suggestions from:
    https://www.kernel.org/doc/Documentation/networking/scaling.txt

    For every device, at most `len(cpuset)` rx and tx queues are touched:

    * rps_cpus of each rx queue receives one shared mask: all CPUs of
      `cpuset` when `rps` is "on", otherwise 0 ("auto" behaves like "off").
    * xps_cpus of each tx queue receives a single-CPU mask, CPUs being taken
      round-robin from `cpuset`, when `xps` is "on" or when it is "auto" and
      the device does not have exactly one tx queue; otherwise 0.
    * xps_rxqs, when the file exists, receives the same value as xps_cpus.

    Every failing write is logged and the remaining queues are still
    processed. Raises `RuntimeError` at the end if any write failed; with
    `check` this includes pending changes reported by `write_ps_file`.
    """
    # The RPS mask does not depend on the device, so compute it only once.
    if rps != PS_ON:
        rps_cpu_mask = 0
    elif cpu_count == len(cpuset):
        rps_cpu_mask = (2 ** cpu_count) - 1
    else:
        rps_cpu_mask = sum(2 ** cpu for cpu in cpuset)

    failures = False
    for device in devices:
        rx_queues = get_device_queues(device, "rx")[: len(cpuset)]
        for queue in rx_queues:
            filename = os.path.join(queue, "rps_cpus")
            if not _write_ps_setting(
                "RPS", device, filename, rps_cpu_mask, dry_run, check
            ):
                failures = True

        tx_queues = get_device_queues(device, "tx")[: len(cpuset)]
        xps_enabled = xps == PS_ON or (xps == PS_AUTO and len(tx_queues) != 1)
        # construct_affinity_vectors yields (cpu, queue) pairs.
        for cpu_id, queue in construct_affinity_vectors(cpuset, tx_queues):
            xps_cpu_mask = 2 ** (cpu_id % cpu_count) if xps_enabled else 0

            filename = os.path.join(queue, "xps_cpus")
            if not _write_ps_setting(
                "XPS", device, filename, xps_cpu_mask, dry_run, check
            ):
                failures = True

            # 4.18+ kernels have transmit queue selection based on receive queue
            # NOTE(review): the CPU mask is reused as the receive-queue bitmap;
            # presumably this relies on rx queue N being served by CPU N - confirm.
            xps_rxqs_path = os.path.join(queue, "xps_rxqs")
            if os.path.exists(xps_rxqs_path):
                if not _write_ps_setting(
                    "RXQS", device, xps_rxqs_path, xps_cpu_mask, dry_run, check
                ):
                    failures = True

    if failures:
        raise RuntimeError("Packet steering failed")


def _write_ps_setting(label, device, filename, cpu_mask, dry_run, check):
    """Write one steering mask; log and return False instead of raising."""
    try:
        write_ps_file(filename, cpu_mask, dry_run, check)
    except Exception as e:
        LOG.critical("{} failed: {}: {}: {}".format(label, device, filename, e))
        return False
    return True
def write_ps_file(filename, cpu_mask, dry_run=False, check=False):
    # type: (Union[Text, int], int, bool, bool) -> None
    """
    Converts `cpu_mask` to string that can be parsed by kernel and writes it to the given file if:
    * dry-run is disabled;
    * content of the file is different from value to-be-written.

    In a dry run the pending change is only logged; when `check` is also set
    a `RuntimeError` is raised for it. Errors from opening, reading or
    writing `filename` propagate to the caller.
    """
    constructed_cpu_mask = construct_cpu_mask(cpu_mask)
    with open(filename, "r") as f:
        current_cpu_mask = parse_affinity(f.read())

    # Compare normalised forms: masks wider than 32 bits are constructed with
    # commas between the words, while the value read back has had its commas
    # stripped, so the raw strings would never be equal.
    if current_cpu_mask == parse_affinity(constructed_cpu_mask):
        LOG.debug("{} is already set to {}".format(filename, current_cpu_mask))
        return

    if dry_run:
        LOG.debug(
            "{} will be changed {} -> {}".format(
                filename, current_cpu_mask, constructed_cpu_mask
            )
        )
        if check:
            raise RuntimeError("Packet Steering changes are pending")
        return

    LOG.debug("Setting {} to {}".format(filename, constructed_cpu_mask))
    with open(filename, "w") as f:
        f.write(constructed_cpu_mask)
def get_network_devices(filters=("eth*",)):
    # type: (Tuple[str, ...]) -> List[str]
    """
    Filter network devices based on globs

    Lists /sys/class/net and returns, in listing order, every device whose
    name matches at least one of the `filters` (fnmatch-style patterns). A
    device matching several filters is returned only once.
    """
    return [
        device
        for device in listdir("/sys/class/net")
        if any(fnmatch(device, pattern) for pattern in filters)
    ]
def get_device_queues(device, ps_type):
    # type: (Text, Text) -> List[Any]
    """
    Return the sysfs queue directories of `device` for one direction.

    `ps_type` must be "tx" or "rx". The result lists the paths matching
    /sys/class/net/<device>/queues/<ps_type>-*, ordered by numeric queue
    index so that slicing and round-robin assignment by the caller are
    deterministic. It is empty when nothing matches.
    """
    assert ps_type in ("tx", "rx"), "unknown ps_type: {}".format(ps_type)

    def queue_order(path):
        # "…/tx-10" must sort after "…/tx-2"; non-numeric suffixes go last.
        suffix = path.rsplit("-", 1)[-1]
        if suffix.isdigit():
            return (0, int(suffix), path)
        return (1, 0, path)

    # glob() returns matches in arbitrary order.
    queues = glob("/sys/class/net/{}/queues/{}-*".format(device, ps_type))
    return sorted(queues, key=queue_order)
def construct_cpu_mask(value):
    # type: (int) -> str
    """
    Render `value` as a hexadecimal CPU mask in the kernel's notation.

    The lowercase hex digits are grouped into 8-digit (32-bit) words counted
    from the right and joined with commas; only the leftmost word may be
    shorter than 8 digits.
    """
    hex_digits = format(value, "x")
    # Length of the leftmost word: the remainder, or a full word when the
    # digit count is an exact multiple of 8.
    head_len = len(hex_digits) % 8 or 8
    words = [hex_digits[:head_len]]
    for offset in range(head_len, len(hex_digits), 8):
        words.append(hex_digits[offset : offset + 8])
    return ",".join(words)
def construct_affinity(cpu_id, cpu_count):
    # type: (int, int) -> str
    """
    Build the string to be written to /proc/irq/{}/smp_affinity in order to
    select a single CPU.

    The CPU index is wrapped modulo `cpu_count`, and the resulting one-bit
    mask is rendered by `construct_cpu_mask`, which splits the hex string
    into 32-bit words.
    """
    wrapped_cpu = cpu_id % cpu_count
    single_cpu_mask = 2 ** wrapped_cpu
    return construct_cpu_mask(single_cpu_mask)
def parse_affinity(affinity):
    # type: (str) -> str
    """
    Normalise an irq affinity string to plain hex: drop the commas, then the
    leading zeros, then surrounding whitespace. An all-zero mask yields "0".
    """
    without_commas = "".join(affinity.split(","))
    digits = without_commas.lstrip("0").strip()
    if digits:
        return digits
    return "0"
def parse_vector(pattern, line):
    # type: (str, Text) -> Optional[Text]
    """
    Return the vector of `line` (the text before its first colon, with
    surrounding spaces removed) when the regexp `pattern` is found anywhere
    in the line; return None when it is not.
    """
    if re.search(pattern, line) is None:
        return None
    vector, _colon, _rest = line.partition(":")
    return vector.strip(" ")
def get_vectors(pattern):
    # type: (str) -> Iterator[Text]
    """
    Yield the vector of every /proc/interrupts line that matches `pattern`,
    in file order. Lines for which `parse_vector` returns nothing (or an
    empty string) are skipped.

    TODO(rbtz): in new kernels that can be obtained via:
    /sys/class/$CLASS/$IFACE/device/msi_irqs
    """
    with open("/proc/interrupts") as interrupts:
        for row in interrupts:
            irq = parse_vector(pattern, row)
            if not irq:
                continue
            yield irq
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment