WLAN client to Prometheus exporter (inspired by WLAN-Pi)
#!/usr/bin/env python3
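#
# Prometheus exporter for WLAN client metrics (inspired by WLAN-Pi):
# background threads collect WLAN link/scan statistics, ICMP/DNS/HTTP(S)
# reachability figures and (optionally) iperf3 throughput/MOS results, and
# expose them via an HTTP endpoint for Prometheus to scrape.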
import time
import os
import sys
import subprocess # nosec: B404
import socket
import re
import http.client
import json
from urllib.request import build_opener, AbstractHTTPHandler
from urllib.error import URLError, HTTPError
import dns.name
import dns.query
import dns.dnssec
import dns.message
import dns.resolver
import dns.rdatatype
import netaddr
from threading import Thread
from prometheus_client import Gauge, Summary, Info, start_http_server
from prometheus_client.registry import REGISTRY
from prometheus_client.exposition import MetricsHandler
# Debian bullseye has _ThreadingSimpleServer renamed to ThreadingWSGIServer
try:
from prometheus_client.exposition import (
ThreadingWSGIServer as _ThreadingSimpleServer,
)
except ImportError:
from prometheus_client.exposition import _ThreadingSimpleServer
import argparse
import logging
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument(
"--listen-port", type=int, help="The port the exporter will listen on", default=9423
)
parser.add_argument(
"--delay",
type=int,
help="The refresh delay the exporter will wait between runs",
default=15,
)
parser.add_argument(
"--wlan-delay",
type=int,
help="The refresh delay the exporter will wait between WLAN runs",
default=10,
)
parser.add_argument(
"--ping-interval",
type=int,
help="The delay between two pings in milliseconds",
default=500,
)
parser.add_argument(
"--ping-count", type=int, help="The amount of pings sent to a target", default=10
)
parser.add_argument(
"--ping-size",
type=int,
help="The size of pings sent to a target in bytes",
default=56,
)
parser.add_argument(
"--ping-target-v4",
type=str,
help="The target of pings sent when IPv4 is available",
default="8.8.8.8",
)
parser.add_argument(
"--ping-target-v6",
type=str,
help="The target of pings sent when IPv6 is available",
default="2001:4860:4860::8888",
)
parser.add_argument(
"--http-target",
type=str,
help="The target of http requests sent",
default="youtube.com",
)
parser.add_argument(
"--https-target",
type=str,
help="The target of https requests sent",
default="youtu.be",
)
parser.add_argument(
"--https-ca",
type=str,
help="The https request CA file to use",
default="/etc/ssl/certs/ca-certificates.crt",
)
parser.add_argument(
"--dns-lookup",
type=str,
help="The DNS target to lookup if network is available (must be DNSSEC enabled!)",
default="www.ripe.net",
)
parser.add_argument(
"--iperf",
dest="run_iperf",
action="store_true",
help="Run iPerf tests",
default=False,
)
parser.add_argument(
"--iperf-delay",
type=int,
help="The refresh delay the exporter will wait between iperf runs",
default=300,
)
parser.add_argument(
"--iperf-target",
type=str,
help="The target of iperf3 MOS measurements",
default="must.bring.your.own",
)
parser.add_argument(
"--iperf-port", type=int, help="The iperf3 port of the iperf3 server ", default=5201
)
if sys.platform == "linux":
wlan_dev = "wlan0"
else:
wlan_dev = "en0"
parser.add_argument(
"--wlan-dev",
type=str,
help="The wireless LAN interface to measure",
default=wlan_dev,
)
parser.add_argument(
"--bssid-timeout",
type=int,
help="Timeout to count wireless LAN BSSIDs that have been visited",
default=300,
)
parser.add_argument(
"--debug",
"-d",
dest="debug",
action="store_true",
help="Run debug mode",
default=False,
)
args = parser.parse_args()
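# Example invocation (illustrative; "iperf.example.net" and the script name are
# placeholders, not defaults from this gist):
#   python3 wlan_exporter.py --wlan-dev wlan0 --listen-port 9423 \
#       --iperf --iperf-target iperf.example.net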
SYSTEMD_FIRST_SOCKET_FD = 3
CONTENT_TYPE_LATEST = str("text/plain; version=0.0.4; charset=utf-8")
"""Content type of the latest text format"""
logger = logging.getLogger(__name__)
debug = args.debug
# create console handler and set level to debug
ch = logging.StreamHandler()
if debug:
logger.setLevel(logging.DEBUG)
ch.setLevel(logging.DEBUG)
# create formatter
formatter = logging.Formatter(
"%(asctime)s - %(name)s/%(threadName)s - %(levelname)s - %(message)s"
)
# add formatter to ch
ch.setFormatter(formatter)
# add ch to logger
logger.addHandler(ch)
bssids = {}
channels = {}
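# bssids/channels (above) map a collection timestamp to the BSSID/channel seen
# at that time and are pruned to the last --bssid-timeout seconds for roam
# counting. The regexes below parse the output of `ip route`, fping,
# `iw ... link/info/station dump/scan dump` (Linux) and `airport -I` (macOS).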
default_v4_match = re.compile(r"^default\s+\S+\s+([\d\.]+)\s+.*$")
default_v6_match = re.compile(r"^default\s+\S+\s+([0-9a-fA-F:]+)\s+.*$")
ping_match = re.compile(
r"^.* = (\d+)\/(\d+)\/(\d+)%.* = ([\d\.]+)/([\d\.]+)/([\d\.]+)$"
)
linux_iw_link = re.compile(r"^\s+([^:]*):\s+(.*)$")
linux_iw_link_signal = re.compile(r"^\s*(-\d+)\s*dBm")
linux_iw_link_bssid = re.compile(r"^Connected to\s+([a-fA-F0-9:]+)\s+.*$")
linux_iw_info_split = re.compile(r"^\s+channel\s+(\d+)\s+.*width:\s+(\d+)\s+MHz.*$")
linux_iw_info_txpwr = re.compile(r"^\s+txpower\s+([\d\.]+)\s+dBm$")
linux_iw_station_dump = re.compile(r"^\s*([^:]+)\s*:\s+(.+)$")
linux_iw_station_mcs_rate = re.compile(r"^.*\sMCS\s+(\d+)\s.*$")
linux_iw_station_beacon_signal = re.compile(r"^(-\s*\d+)\s*dBm\s*$")
linux_iw_connected_time = re.compile(r"^(\d+)\s*seconds")
linux_iw_scan_dump = re.compile(r"^[\s*]*([^:]+)\s*:\s+(.+)$")
linux_iw_chan_utilization = re.compile(r"^(\d+)\/(\d+)$")
linux_assoc_bssid = re.compile(r"^\s*([^:]+)\s*:\s*(.*)$")
linux_is_assoc = re.compile(r"^.*\s+--\s+associated$")
re_macos_wlan = re.compile(r"^\s*([^:]+)\s*:\s+(.+)$")
re_first_number = re.compile(r"^(\d+)\s+")
logger.debug("Defining metrics")
# Define Metrics
PING4_collect = Summary(
"ping4_collect", "IPv4 poller details collecting and processing ICMP and DNS stats"
)
IPv4_ping_avg = Gauge(
"ping_ipv4_avg", "ICMP IPv4 RTT average [ms]", labelnames=["target"]
)
IPv4_ping_min = Gauge("ping_ipv4_min", "ICMP IPv4 RTT min [ms]", labelnames=["target"])
IPv4_ping_max = Gauge("ping_ipv4_max", "ICMP IPv4 RTT max [ms]", labelnames=["target"])
IPv4_ping_loss = Gauge("ping_ipv4_loss", "ICMP IPv4 loss %", labelnames=["target"])
IPv4_DNS_UDP_delay = Gauge(
"dns_udp_ipv4_delay", "DNS RTT over IPv4 (UDP)", labelnames=["target"]
)
IPv4_DNS_TCP_delay = Gauge(
"dns_tcp_ipv4_delay", "DNS RTT over IPv4 (TCP)", labelnames=["target"]
)
IPv4_DNSSEC_delay = Gauge(
"dnssec_ipv4_delay", "DNSSEC RTT over IPv4", labelnames=["target"]
)
IPv4Up = Gauge("net_ipv4_up", "Network connection is up (IPv4)")
DNSSEC_IPv4 = Gauge(
"dnssec_ipv4_up", "Network recursor is DNSSEC capable (IPv4)", labelnames=["target"]
)
IPv4_http_delay = Gauge(
"http_ipv4_delay",
"HTTP load time to %s over IPv4" % args.http_target,
labelnames=["target"],
)
IPv4_http_status_code = Gauge(
"http_ipv4_status_code",
"HTTP status code to %s over IPv4" % args.http_target,
labelnames=["target"],
)
IPv4_https_delay = Gauge(
"https_ipv4_delay",
"HTTPS load time to %s over IPv4" % args.https_target,
labelnames=["target"],
)
IPv4_https_status_code = Gauge(
"https_ipv4_status_code",
"HTTPS status code to %s over IPv4" % args.https_target,
labelnames=["target"],
)
PING6_collect = Summary(
"ping6_collect", "IPv6 poller details collecting and processing ICMP and DNS stats"
)
IPv6_ping_avg = Gauge(
"ping_ipv6_avg", "ICMP IPv6 RTT average [ms]", labelnames=["target"]
)
IPv6_ping_min = Gauge("ping_ipv6_min", "ICMP IPv6 RTT min [ms]", labelnames=["target"])
IPv6_ping_max = Gauge("ping_ipv6_max", "ICMP IPv6 RTT max [ms]", labelnames=["target"])
IPv6_ping_loss = Gauge("ping_ipv6_loss", "ICMP IPv6 loss %", labelnames=["target"])
IPv6_DNS_UDP_delay = Gauge(
"dns_udp_ipv6_delay", "DNS RTT over IPv6 (UDP)", labelnames=["target"]
)
IPv6_DNS_TCP_delay = Gauge(
"dns_tcp_ipv6_delay", "DNS RTT over IPv6 (TCP)", labelnames=["target"]
)
IPv6_DNSSEC_delay = Gauge(
"dnssec_ipv6_delay", "DNSSEC RTT over IPv6", labelnames=["target"]
)
IPv6Up = Gauge("net_ipv6_up", "Network connection is up (IPv6)")
DNSSEC_IPv6 = Gauge(
"dnssec_ipv6_up", "Network recursor is DNSSEC capable (IPv6)", labelnames=["target"]
)
IPv6_http_delay = Gauge(
"http_ipv6_delay",
"HTTP load time to %s over IPv6" % args.http_target,
labelnames=["target"],
)
IPv6_http_status_code = Gauge(
"http_ipv6_status_code",
"HTTP status code to %s over IPv6" % args.http_target,
labelnames=["target"],
)
IPv6_https_delay = Gauge(
"https_ipv6_delay",
"HTTPS load time to %s over IPv6" % args.https_target,
labelnames=["target"],
)
IPv6_https_status_code = Gauge(
"https_ipv6_status_code",
"HTTPS status code to %s over IPv6" % args.https_target,
labelnames=["target"],
)
IPv4_iperf_mbps = Gauge(
"iperf3_ipv4_mbps",
"iPerf3 IPv4 MBps",
labelnames=["target", "protocol", "direction"],
)
IPv4_iperf_jitter = Gauge(
"iperf3_ipv4_jitter", "iPerf3 IPv4 jitter [ms]", labelnames=["target", "direction"]
)
IPv4_iperf_packets = Gauge(
"iperf3_ipv4_packets", "iPerf3 IPv4 packets", labelnames=["target", "direction"]
)
IPv4_iperf_lost_packets = Gauge(
"iperf3_ipv4_lost_packets",
"iPerf3 IPv4 lost packets",
labelnames=["target", "direction"],
)
IPv4_iperf_lost_percent = Gauge(
"iperf3_ipv4_lost_percent",
"iPerf3 IPv4 lost percent",
labelnames=["target", "direction"],
)
IPv4_iperf_mos_score = Gauge(
"iperf3_ipv4_mos_score",
"iPerf3 IPv4 calculated MOS score over UDP",
labelnames=["target", "direction"],
)
IPv6_iperf_mbps = Gauge(
"iperf3_ipv6_mbps",
"iPerf3 IPv6 MBps",
labelnames=["target", "protocol", "direction"],
)
IPv6_iperf_jitter = Gauge(
"iperf3_ipv6_jitter", "iPerf3 IPv6 jitter [ms]", labelnames=["target", "direction"]
)
IPv6_iperf_packets = Gauge(
"iperf3_ipv6_packets", "iPerf3 IPv6 packets", labelnames=["target", "direction"]
)
IPv6_iperf_lost_packets = Gauge(
"iperf3_ipv6_lost_packets",
"iPerf3 IPv6 lost packets",
labelnames=["target", "direction"],
)
IPv6_iperf_lost_percent = Gauge(
"iperf3_ipv6_lost_percent",
"iPerf3 IPv6 lost percent",
labelnames=["target", "direction"],
)
IPv6_iperf_mos_score = Gauge(
"iperf3_ipv6_mos_score",
"iPerf3 IPv6 calculated MOS score",
labelnames=["target", "direction"],
)
WLAN_collect = Summary(
"wlan_collect", "Poller details collecting and processing WLAN stats"
)
SSID = Info("ssid", "SSID of currently connected WLAN")
Chan = Gauge("channel", "Network wireless channel", labelnames=["ssid"])
Chan_width = Gauge(
"channel_width", "Network wireless channel width", labelnames=["ssid"]
)
RSSI = Gauge("rssi", "Network wireless RSSI", labelnames=["ssid"])
BSSID = Info("current_bssid", "Currently connected BSSID")
BSSIDS_visited = Gauge(
"bssids_visited", "BSSIDs visited within the last %d s" % args.bssid_timeout
)
Band_changes = Gauge(
"band_changes", "Band changes within the last %d s" % args.bssid_timeout
)
Channel_changes = Gauge(
"channel_changes", "Channel changes within the last %d s" % args.bssid_timeout
)
if sys.platform == "linux":
MCS_rx = Gauge("mcs_rx", "Network wireless MCS in RX direction")
RX_rate = Gauge("rx_bitrate", "Network wireless bitrate in RX direction")
MCS_tx = Gauge("mcs_tx", "Network wireless MCS in TX direction")
TX_rate = Gauge("tx_bitrate", "Network wireless bitrate in TX direction")
TX_pwr = Gauge("tx_pwr", "Network wireless power level in TX direction")
RX_bytes = Gauge("rx_bytes", "Network wireless received bytes")
TX_bytes = Gauge("tx_bytes", "Network wireless transmitted bytes")
RX_pkts = Gauge("rx_pkts", "Network wireless received packets")
TX_pkts = Gauge("tx_pkts", "Network wireless transmitted packets")
TX_failed = Gauge("tx_failed", "Network wireless failed transmitted packets")
Beacon_loss = Gauge("beacon_loss", "Network wireless beacons lost in the air")
Beacon_rx = Gauge("beacon_rx", "Network wireless beacons RX level")
RX_drop_misc = Gauge("rx_drop_misc", "Network wireless misc dropped RX packets")
Beacon_signal_avg = Gauge(
"beacon_signal_avg", "Network wireless beacon average signal level"
)
Last_ack_signal = Gauge("last_ack_signal", "Network wireless last ACK signal level")
Authorized = Gauge("authorized", "Network wireless STA authorized to BSSID")
Authenticated = Gauge("authenticated", "Network wireless STA authenticated to WDS")
Associated = Gauge("associated", "Network wireless STA associated to AP")
Beacon_interval = Gauge("beacon_interval", "Network wireless beacon interval")
Connected_time = Gauge("connected_time", "Network wireless connection uptime")
Last_seen = Gauge(
"last_seen", "Wireless AP last seen scan result (in milliseconds)"
)
Station_count = Gauge(
"station_count", "Wireless AP connected/associated station count"
)
Channel_utilization = Gauge("channel_utilization", "Wireless channel utilization")
Available_capacity = Gauge("available_capacity", "Available admission capacity")
else:
MCS = Gauge("mcs", "Network wireless MCS", labelnames=["ssid"])
logger.debug("Finished defining metrics")
def ping(host, interval, count, size, source):
if sys.platform == "linux":
filepath = "/usr/bin/fping"
else:
filepath = "/usr/local/bin/fping"
# let's give some time for the ping to complete
_delay = interval
if _delay < 1:
_delay = 1
# allow 1s timeout and give 10s offset
timeout = int(count * _delay / 1000 * 2 + 10)
logger.debug("Setting command execution timeout to: %d" % timeout)
ping_command = [
"/usr/bin/timeout",
str(timeout),
filepath,
"-b",
str(size),
"-i",
"1",
"-p",
str(interval),
"-q",
"-c",
str(count),
]
# Using source address?
if source:
ping_command += ["-s", source]
output = {}
ping_command += [host]
# Execute the ping
logger.debug("Running ping: %s" % " ".join(ping_command))
cmd_output = None
try:
cmd_output = subprocess.run( # nosec: B603
ping_command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
timeout=timeout,
)
except subprocess.TimeoutExpired:
logger.error(
'Execution of "%s" failed and was aborted due to timeout.' % ping_command
)
return output
except Exception as e:
logger.error("Unhandled exception caught: %s" % e)
return
# Parse the fping output
# 127.0.0.1 : xmt/rcv/%loss = 2/2/0%, min/avg/max = 0.07/0.12/0.18\n
# Prepare the metric
output["avg"] = 0
output["max"] = 0
output["min"] = 0
output["loss"] = 100
if cmd_output.returncode != 0:
logger.debug("Received error code %d - printing stderr" % cmd_output.returncode)
logger.debug(cmd_output.stderr)
return output
if not cmd_output.stdout:
if not cmd_output.stderr:
logger.error("No output received!")
return output
else:
cmd_output = cmd_output.stderr
else:
cmd_output = cmd_output.stdout
cmd_output = cmd_output.splitlines()
for line in cmd_output:
match = ping_match.match(line)
if match:
output["loss"] = match.group(3)
output["min"] = match.group(4)
output["avg"] = match.group(5)
output["max"] = match.group(6)
logger.debug(
"Got match: %s/%s/%s, loss: %s"
% (output["min"], output["avg"], output["max"], output["loss"])
)
# first match suffices
break
else:
logger.debug('Error parsing output of "%s"' % line)
return output
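# lookup(): time a DNS query for `target` against the first system nameserver
# of the requested address family. proto selects UDP or TCP; with dnssec=True a
# DNSKEY query is sent over TCP and the answer is validated against its RRSIG.
# Returns the elapsed time in seconds, or None on failure.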
def lookup(target, af=4, proto="udp", dnssec=False, resolver=None):
start_time = time.time()
# get system nameservers
if resolver:
_resolver = resolver
else:
_resolver = dns.resolver.Resolver()
if not _resolver.nameservers:
return
nsaddr = None
for ns in _resolver.nameservers:
# select desired AF and just use first one
if af == 6:
if netaddr.valid_ipv6(ns):
nsaddr = ns
break
else:
if netaddr.valid_ipv4(ns):
nsaddr = ns
break
logger.debug("Using nameserver: %s" % nsaddr)
response = None
if proto == "tcp" or dnssec:
if dnssec:
# We are _really, really_ sure this exists, no matter what pylint believes
rdatatype = dns.rdatatype.DNSKEY # pylint: disable=no-member
else:
# We are _really, really_ sure this exists, no matter what pylint believes
rdatatype = dns.rdatatype.SOA # pylint: disable=no-member
request = dns.message.make_query(target, rdatatype, want_dnssec=dnssec)
try:
response = dns.query.tcp(request, nsaddr, timeout=3)
except Exception as e:
logger.error("DNS lookup error: %s" % e)
return
else:
# We are _really, really_ sure this rdatatype exists, no matter what pylint believes
rdatatype = dns.rdatatype.SOA # pylint: disable=no-member
request = dns.message.make_query(target, rdatatype)
try:
response = dns.query.udp(request, nsaddr, timeout=3)
except Exception as e:
logger.error("DNS lookup error: %s" % e)
return
if response.rcode() != 0:
logger.error("Invalid response trying to lookup %s over IPv%d!" % (target, af))
else:
# valid response
answer = response.answer
if not dnssec:
# XXX: we might also want to check the answer details, if we really cared...
return time.time() - start_time
else:
if len(answer) == 2:
# answer should contain two RRSET: DNSKEY and RRSIG(DNSKEY)
logger.debug("Got answer for DNS query, now validating DNSSEC:")
name = dns.name.from_text(target)
try:
dns.dnssec.validate(answer[0], answer[1], {name: answer[0]})
except dns.dnssec.ValidationFailure:
logger.error("Invalid DNSSEC")
return
else:
return time.time() - start_time
else:
if len(answer) > 1:
return time.time() - start_time
else:
return
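# The HTTP/HTTPS helpers below pin the TCP connection to a pre-resolved address
# while keeping the original host name in the URL, so the Host header (and TLS
# SNI) still refer to the target while the measurement hits one specific
# address family.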
def create_connection_with(*, resolved_address):
if not resolved_address:
return socket.create_connection
def create_connection(host_and_port, timeout, source_address):
host, port = host_and_port
logger.debug(f"Connection to fixed address {resolved_address}")
return socket.create_connection(
(resolved_address, port), timeout, source_address
)
return create_connection
class HTTPConnection(http.client.HTTPConnection):
def __init__(self, *args, resolved_address=None, **kwargs):
super().__init__(*args, **kwargs)
self._create_connection = create_connection_with(
resolved_address=resolved_address
)
class HTTPSConnection(http.client.HTTPSConnection):
def __init__(self, *args, resolved_address=None, **kwargs):
super().__init__(*args, **kwargs)
self._create_connection = create_connection_with(
resolved_address=resolved_address
)
class HTTPHandler(AbstractHTTPHandler):
handler_order = 499 # before default http handlers
def __init__(self, *args, resolved_address=None, context=None, **kwargs):
super().__init__(*args, **kwargs)
self._resolved_address = resolved_address
self._context = context
def http_open(self, req):
return self.do_open(
HTTPConnection, req, resolved_address=self._resolved_address
)
def https_open(self, req):
return self.do_open(
HTTPSConnection,
req,
context=self._context,
resolved_address=self._resolved_address,
)
def httpopen(*args, resolved_address=None, context=None, **kwargs):
http_handler = HTTPHandler(resolved_address=resolved_address, context=context)
logger.debug(f"Connection to URL {args}")
return build_opener(http_handler).open(*args, **kwargs)
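# get_http(): resolve `target` for the requested address family, fetch
# http(s)://target/ from the first matching address and return a dict with the
# HTTP "status" code and the load-time "delay" in seconds (keys may be missing
# when the request fails).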
def get_http(target, af=6, ca=None):
output = {}
logger.debug("Fetching target: %s" % target)
headers = {}
headers[
"User-Agent"
] = "Mozilla/5.0 (X11; Ubuntu; Linux i686; rv:48.0) Gecko/20100101 Firefox/48.0"
headers["Host"] = target
if ca:
prefix = "https"
port = 443
else:
prefix = "http"
port = 80
try:
addrs = socket.getaddrinfo(target, port)
if addrs:
used = set()
if af == 6:
addrs = [
addr[4][0]
for addr in addrs
if (
(addr[0] == socket.AF_INET6)
and (addr[0] not in used)
and (used.add(addr[0]) or True)
)
]
if not addrs:
logger.debug("DNS lookup error")
return output
else:
addrs = [
addr[4][0]
for addr in addrs
if (
(addr[0] == socket.AF_INET)
and (addr[0] not in used)
and (used.add(addr[0]) or True)
)
]
if not addrs:
logger.debug("DNS lookup error")
return output
for _a in addrs:
try:
start_time = time.time()
response = httpopen(
"%s://%s:%d/" % (prefix, target, port), resolved_address=_a
)
except HTTPError as e:
output["status"] = e.code
logger.debug(
"The server couldn't fulfill the request. Error code: %s"
% e.code
)
logger.error("Error getting stats", exc_info=False)
except URLError as e:
logger.debug("We failed to reach a server. Reason: %s" % e.reason)
logger.error("Error getting stats", exc_info=False)
else:
logger.debug("built connection to: %s" % target)
# just ensure that we actually get all of the reply and not only the header
_res = response.read()
logger.debug("got %d bytes reply" % len(_res))
logger.debug("finished read connection to: %s" % target)
output["status"] = response.status
output["delay"] = time.time() - start_time
logger.debug(
"completed connection with status %d after %d sec"
% (response.status, output["delay"])
)
response.close()
logger.debug("closed connection to: %s" % target)
# we're happy with first successful attempt in round-robin DNS
break
else:
logger.error(
"Error getting stats - no addresses returned from DNS!", exc_info=False
)
except socket.gaierror as e:
logger.error("DNS lookup error: %s" % e, exc_info=False)
except Exception as e:
logger.error("Error getting http target DNS lookup: %s" % e, exc_info=True)
logger.debug("returning output for connection to: %s" % target)
return output
# imported from https://github.com/wifinigel/wiperf_poller/blob/master/wiperf_poller/testers/iperf3tester.py
def calculate_mos(rtt_avg_ms, jitter_ms, lost_percent):
"""
Calculation of approximate MOS score
(This was kindly contributed by Mario Gingras, based on this
article: https://netbeez.net/blog/impact-of-packet-loss-jitter-and-latency-on-voip/)
Returns:
MOS value -- float (1.0 to 4.5)
As the codec is assumed to be G.711 - which by design cannot reach up to 5.0
"""
# effective_latency=(rtt_avg_ms/2*jitter_ms)+40
effective_latency = (rtt_avg_ms / 2) + (2 * jitter_ms) + 10.0
if effective_latency < 160:
R = 93.2 - (effective_latency / 40)
else:
R = 93.2 - ((effective_latency - 120) / 10)
R = R - 2.5 * lost_percent
if R < 0:
mos_score = 1.0
elif R < 100:
mos_score = 1 + 0.035 * R + 0.000_007 * R * (R - 60) * (100 - R)
else:
mos_score = 4.5
return mos_score
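# Illustrative example of the formula above (values chosen for this comment,
# not taken from a real measurement): rtt_avg_ms=20, jitter_ms=2,
# lost_percent=0 gives effective_latency = 20/2 + 2*2 + 10 = 24 ms (< 160),
# R = 93.2 - 24/40 = 92.6 and
# MOS = 1 + 0.035*92.6 + 0.000007*92.6*(92.6-60)*(100-92.6) ≈ 4.4.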
def iperf_client_test(
server_hostname=args.iperf_target,
udp=True,
af=6,
tx=True,
duration=10,
port=args.iperf_port,
bandwidth=10_000_000,
timeout=2000,
debug=False,
):
iperf = "/usr/bin/iperf3"
_timeout = int(duration + timeout / 1000 + 10)
iperf_cmd = [
"/usr/bin/timeout",
str(_timeout),
iperf,
"-{}".format(af),
"-c",
server_hostname,
"-t",
str(duration),
"-p",
str(port),
"-J",
"--connect-timeout",
str(timeout),
]
protocol = "TCP"
if udp:
protocol = "UDP"
for _e in ["-u", "-b", str(bandwidth)]:
iperf_cmd.append(_e)
if not tx:
iperf_cmd.append("-R")
iperf_cmd_string = " ".join(iperf_cmd)
logger.debug("iperf server test command: %s" % iperf_cmd_string)
# run the test
try:
output = subprocess.check_output( # nosec: B603
iperf_cmd, stderr=subprocess.STDOUT, timeout=_timeout
).decode()
except subprocess.CalledProcessError as exc:
try:
iperf_json = json.loads(exc.output.decode())
err_msg = iperf_json["error"]
logger.error(
"iperf {} test error ({}:{}): {}".format(
protocol, server_hostname, port, err_msg
)
)
except json.decoder.JSONDecodeError:
logger.error("Invalid JSON returned by iperf command!")
return False
except subprocess.TimeoutExpired:
logger.error(
"iperf {} test error ({}:{}): timeout!".format(
protocol, server_hostname, port
)
)
return False
if output:
try:
iperf_json = json.loads(output)
except Exception as e:
logger.error("Failed to parse iperf JSON output due to: %s" % e)
return False
# extract data
_sum = "sum"
if not udp:
if tx:
_sum = "sum_sent"
else:
_sum = "sum_received"
bytes = iperf_json["end"][_sum]["bytes"]
bps = iperf_json["end"][_sum]["bits_per_second"]
kbps = bps / 1000
Mbps = kbps / 1000
# kB_s = bps / (8 * 1024)
# MB_s = kB_s / 1024
# seconds = iperf_json['end'][_sum]['seconds']
if udp:
packets = iperf_json["end"][_sum]["packets"]
lost_packets = iperf_json["end"][_sum]["lost_packets"]
lost_percent = iperf_json["end"][_sum]["lost_percent"]
jitter_ms = iperf_json["end"][_sum]["jitter_ms"]
result = {}
result["bytes"] = bytes
result["mbps"] = Mbps
if udp:
result["packets"] = packets
result["lost_packets"] = lost_packets
result["lost_percent"] = lost_percent
result["jitter_ms"] = jitter_ms
return result
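# run_iperf_test(): resolve the iperf3 target, send one throw-away ping plus a
# short 5-packet ping run to obtain the RTT needed for the MOS calculation,
# then run iperf_client_test() and collate the results. For UDP runs a MOS
# score is added and implausible jitter values (> 2000 ms) are dropped.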
def run_iperf_test(udp=True, af=6, tx=True):
server_hostname = args.iperf_target
port = args.iperf_port
protocol = "TCP"
if udp:
protocol = "UDP"
direction = "c->s"
if not tx:
direction = "s->c"
logger.debug(
"Starting iperf3 {} test over IPv{} ({}:{}) direction: {}...".format(
protocol, af, server_hostname, str(port), direction
)
)
# Run a ping to the iperf server to get an rtt to feed in to MOS score calc
try:
addrs = socket.getaddrinfo(server_hostname, port)
except Exception as e:
logger.error("DNS lookup error: %s" % e)
return False
ping_result = {}
if addrs:
used = set()
if af == 6:
addrs = [
addr[4][0]
for addr in addrs
if addr[0] == socket.AF_INET6
and addr[0] not in used
and (used.add(addr[0]) or True)
]
if not addrs:
logger.error("DNS lookup error")
return False
else:
addrs = [
addr[4][0]
for addr in addrs
if addr[0] == socket.AF_INET
and addr[0] not in used
and (used.add(addr[0]) or True)
]
if not addrs:
logger.error("DNS lookup error")
return False
for _a in addrs:
# one ping to seed...
# FIXME: iperf servers aren't round robin anyway, don't add multi-IP checks here...
ping_result = ping(_a, 1, 1, 50, "")
ping_result = ping(_a, 1, 5, 50, "")
else:
logger.debug("DNS lookup error")
return False
# ping results
if ping_result:
rtt_avg_ms = round(float(ping_result["avg"]), 2)
else:
rtt_avg_ms = 0
# Run the iperf test
result = iperf_client_test(
server_hostname, udp=udp, port=port, debug=False, af=af, tx=tx
)
if result is not False:
logger.debug("iperf3 complete:")
logger.debug("bytes: %s" % result["bytes"])
logger.debug("mbps: %s" % result["mbps"])
results_dict = {}
results_dict["time"] = int(time.time())
results_dict["bytes"] = result["bytes"]
results_dict["mbps"] = round(result["mbps"], 1)
if udp:
logger.debug("lost_packets: %s" % result["lost_packets"])
logger.debug("lost_percent: %s" % result["lost_percent"])
results_dict["packets"] = result["packets"]
results_dict["lost_packets"] = result["lost_packets"]
results_dict["lost_percent"] = float(result["lost_percent"])
results_dict["jitter_ms"] = round(result["jitter_ms"], 1)
results_dict["mos_score"] = calculate_mos(
rtt_avg_ms,
round(result["jitter_ms"], 1),
round(result["lost_percent"], 1),
)
logger.debug("mos_score: %s:" % results_dict["mos_score"])
# workaround for crazy jitter figures sometimes seen
if results_dict["jitter_ms"] > 2000:
logger.error(
"Received very high jitter value({}), set to none".format(
results_dict["jitter_ms"]
)
)
results_dict["jitter_ms"] = None
logger.debug("jitter_ms: %s" % results_dict["jitter_ms"])
return results_dict
else:
logger.error("Error with iperf3 test, check logs")
return False
def system_call(command):
logger.debug("Running system call: %s" % command)
# the command may be a shell pipeline, so it has to go through the shell
stdout, _stderr = subprocess.Popen( # nosec: B602
command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
).communicate()
# callers expect only the (stripped) stdout, e.g. a gateway address
return stdout.strip()
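# The default-route lookups below serve as a cheap "is the network up at all"
# check: on Linux they parse `ip -4/-6 route list default`, elsewhere they fall
# back to `route -n get ...` piped through awk.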
def get_gateway_address_v4():
if sys.platform == "linux":
# print('Checking Linux IPv4 default route...')
default_route = subprocess.run( # nosec: B603, B607
["/usr/sbin/ip", "-4", "route", "list", "default"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
timeout=10,
).stdout.splitlines()
if not default_route:
logger.error("No IPv4 default route present")
return
match = default_v4_match.match(default_route[0])
if match:
return match.group(1)
else:
logger.error("No IPv4 default route present")
return False
else:
# right now we only care about macOS/OpenBSD
# print('Checking BSD IPv4 default route...')
return system_call("route -n get -inet default | awk '/gateway/{print $2}'")
def get_gateway_address_v6():
if sys.platform == "linux":
# print('Checking Linux IPv6 default route...')
default_route = subprocess.run( # nosec: B603, B607
["/usr/sbin/ip", "-6", "route", "list", "default"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
timeout=10,
).stdout.splitlines()
if not default_route:
logger.error("No IPv6 default route present")
return
match = default_v6_match.match(default_route[0])
if match:
return match.group(1)
else:
logger.error("No IPv6 default route present")
return False
else:
# right now we only care about macOS/OpenBSD
# print('Checking BSD IPv6 default route...')
return system_call("route -n get -inet6 default | awk '/gateway/{print $2}'")
def ping_ipv4():
result = ping(
args.ping_target_v4, args.ping_interval, args.ping_count, args.ping_size, ""
)
if result:
logger.debug(
"Got IPv4 PING result: %s/%s%s, loss %s %%"
% (result["min"], result["avg"], result["max"], result["loss"])
)
IPv4_ping_avg.labels(target=args.ping_target_v4).set(result["avg"])
IPv4_ping_min.labels(target=args.ping_target_v4).set(result["min"])
IPv4_ping_max.labels(target=args.ping_target_v4).set(result["max"])
IPv4_ping_loss.labels(target=args.ping_target_v4).set(result["loss"])
else:
logger.error("Failed to get IPv4 ping results!")
def ping_ipv6():
result = ping(
args.ping_target_v6, args.ping_interval, args.ping_count, args.ping_size, ""
)
if result:
logger.debug(
"Got IPv6 PING result: %s/%s%s, loss %s %%"
% (result["min"], result["avg"], result["max"], result["loss"])
)
IPv6_ping_avg.labels(target=args.ping_target_v6).set(result["avg"])
IPv6_ping_min.labels(target=args.ping_target_v6).set(result["min"])
IPv6_ping_max.labels(target=args.ping_target_v6).set(result["max"])
IPv6_ping_loss.labels(target=args.ping_target_v6).set(result["loss"])
else:
logger.error("Failed to get IPv6 ping results!")
def check_wlan():
output = {}
global bssids
global channels
start_time = time.time()
logger.debug("Checking WLAN stats")
if sys.platform == "linux":
bssid = "00:00:00:00:00:00"
check_cmd = ["/usr/sbin/iw", args.wlan_dev, "link"]
logger.debug('Running command: "%s"' % check_cmd)
cmd_out = subprocess.run( # nosec: B603
check_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
timeout=10,
).stdout
for line in cmd_out.split("\n"):
# logger.debug('Got line: >>> %s <<<' % line)
match = linux_iw_link.match(line)
if match:
key = match.group(1)
value = match.group(2)
if key == "signal":
line_match = linux_iw_link_signal.match(value)
if line_match:
rssi = line_match.group(1)
logger.debug('Got RSSI: "%s"' % rssi)
output["rssi"] = rssi
if key == "SSID":
logger.debug('Got SSID: "%s"' % value)
output["ssid"] = value
else:
match = linux_iw_link_bssid.match(line)
if match:
logger.debug(
'Got BSSID: "%s" - adding to array for histogram'
% match.group(1)
)
bssid = match.group(1)
bssids[start_time] = bssid
BSSID.info({"bssid": bssid})
logger.debug('Finished running "%s"' % check_cmd)
iw_info = ["/usr/sbin/iw", args.wlan_dev, "info"]
logger.debug('Running command: "%s"' % iw_info)
iw_info_out = subprocess.run( # nosec: B603
iw_info,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
timeout=10,
).stdout
for line in iw_info_out.split("\n"):
# logger.debug('Got line: >>> %s <<<' % line)
match = linux_iw_info_split.match(line)
if match:
logger.debug('Got channel: "%s"' % match.group(1))
output["channel"] = match.group(1)
logger.debug('Got channel width: "%s"' % match.group(2))
output["channel_width"] = match.group(2)
else:
match = linux_iw_info_txpwr.match(line)
if match:
logger.debug('Got TX power: "%s"' % match.group(1))
output["tx_pwr"] = match.group(1)
logger.debug('Finished running "%s"' % iw_info)
iw_sta_dump = ["/usr/sbin/iw", args.wlan_dev, "station", "dump"]
logger.debug('Running command: "%s"' % iw_sta_dump)
iw_sta_dump_out = subprocess.run( # nosec: B603
iw_sta_dump,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
timeout=10,
).stdout
for line in iw_sta_dump_out.split("\n"):
# logger.debug('Got line: >>> %s <<<' % line)
match = linux_iw_station_dump.match(line)
if match:
key = match.group(1)
value = match.group(2)
# logger.debug('Got key: "%s" == value: "%s"' % (key, value))
if key == "rx bitrate":
output["rx_rate"] = value.split(" ")[0]
line_match = linux_iw_station_mcs_rate.match(value)
if line_match:
output["mcs_rx"] = line_match.group(1)
logger.debug('Got MCS RX: "%s"' % output["mcs_rx"])
if key == "tx bitrate":
output["tx_rate"] = value.split(" ")[0]
line_match = linux_iw_station_mcs_rate.match(value)
if line_match:
output["mcs_tx"] = line_match.group(1)
logger.debug('Got MCS TX: "%s"' % output["mcs_tx"])
# FIXME: all of these still needs Gauges...
if key == "rx bytes":
logger.debug('Got RX bytes: "%s"' % value)
output["rx_bytes"] = value
if key == "tx bytes":
logger.debug('Got TX bytes: "%s"' % value)
output["tx_bytes"] = value
if key == "rx packets":
logger.debug('Got RX packets: "%s"' % value)
output["rx_packets"] = value
if key == "tx packets":
logger.debug('Got TX packets: "%s"' % value)
output["tx_packets"] = value
if key == "tx failed":
logger.debug('Got TX failed: "%s"' % value)
output["tx_failed"] = value
if key == "beacon loss":
logger.debug('Got beacon loss: "%s"' % value)
output["beacon_loss"] = value
if key == "beacon rx":
logger.debug('Got beacon RX: "%s"' % value)
output["beacon_rx"] = value
if key == "rx drop misc":
logger.debug('Got RX drop misc: "%s"' % value)
output["rx_drop_misc"] = value
if key == "beacon signal avg":
line_match = linux_iw_station_beacon_signal.match(value)
if line_match:
output["beacon_signal_avg"] = line_match.group(1)
logger.debug(
'Got beacon signal avg: "%s"' % output["beacon_signal_avg"]
)
if key == "last_ack_signal":
logger.debug('Got last ack signal: "%s"' % value)
output["last_ack_signal"] = value
if key == "authorized":
logger.debug('Got authorized: "%s"' % value)
output["authorized"] = value
if key == "authenticated":
logger.debug('Got authenticated: "%s"' % value)
output["authenticated"] = value
if key == "associated":
logger.debug('Got associated: "%s"' % value)
output["associated"] = value
if key == "beacon interval":
logger.debug('Got beacon interval: "%s"' % value)
output["beacon_interval"] = value
if key == "connected time":
line_match = linux_iw_connected_time.match(value)
if line_match:
output["connected_time"] = line_match.group(1)
logger.debug(
'Got connected time: "%s"' % output["connected_time"]
)
iw_scan_dump = ["/usr/sbin/iw", args.wlan_dev, "scan", "dump"]
logger.debug('Running command: "%s"' % iw_scan_dump)
iw_scan_dump_out = subprocess.run( # nosec: B603
iw_scan_dump,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
timeout=10,
).stdout
found_assoc = False
found_load = False
scan_complete = False
for line in iw_scan_dump_out.split("\n"):
# logger.debug('Got line: >>> %s <<<' % line)
if found_load:
if not scan_complete:
logger.debug("BSS Load not fully parsed - looking for values...")
match = linux_iw_scan_dump.match(line)
if match:
key = match.group(1)
value = match.group(2)
# logger.debug('Got key: "%s" == value: "%s"' % (key, value))
if key == "station count":
output["station_count"] = value
if key == "channel utilisation":
line_match = linux_iw_chan_utilization.match(value)
if line_match:
# convert the "used/total" pair to a utilization ratio (0..1)
output["channel_utilization"] = int(
line_match.group(1)
) / int(line_match.group(2))
logger.debug(
'Got channel utilization: "%s"'
% output["channel_utilization"]
)
if key == "available admission capacity":
line_match = re_first_number.match(value)
if line_match:
output["available_capacity"] = line_match.group(1)
logger.debug(
'Got available admission capacity: "%s"'
% output["available_capacity"]
)
# we don't want any more outputs from here
scan_complete = True
elif found_assoc:
# logger.debug('Associated BSS found - looking for values...')
match = linux_assoc_bssid.match(line)
if match:
key = match.group(1)
value = match.group(2)
# logger.debug('Got key: "%s" == value: "%s"' % (key, value))
if key == "BSS Load":
# logger.debug('Found BSS Load block')
found_load = True
if key == "last seen":
# logger.debug('Found last seen')
line_match = re_first_number.match(value)
if line_match:
# extract the leading number (last seen, in milliseconds)
output["last_seen"] = line_match.group(1)
logger.debug('Got last seen: "%s"' % output["last_seen"])
else:
match = linux_is_assoc.match(line)
if match:
# logger.debug('Found associated!')
found_assoc = True
# common matching
if output.get("mcs_rx"):
logger.debug('Setting MCS RX: "%s"' % output["mcs_rx"])
MCS_rx.set(output["mcs_rx"])
if output.get("rx_rate"):
logger.debug('Setting RX bitrate: "%s"' % output["rx_rate"])
RX_rate.set(output["rx_rate"])
if output.get("mcs_tx"):
logger.debug('Setting MCS TX: "%s"' % output["mcs_tx"])
MCS_tx.set(output["mcs_tx"])
if output.get("tx_rate"):
logger.debug('Setting TX bitrate: "%s"' % output["tx_rate"])
TX_rate.set(output["tx_rate"])
if output.get("tx_pwr"):
logger.debug('Setting TX power: "%s"' % output["tx_pwr"])
TX_pwr.set(output["tx_pwr"])
if output.get("rx_bytes"):
logger.debug('Setting RX bytes: "%s"' % output["rx_bytes"])
RX_bytes.set(output["rx_bytes"])
if output.get("tx_bytes"):
logger.debug('Setting TX bytes: "%s"' % output["tx_bytes"])
TX_bytes.set(output["tx_bytes"])
if output.get("rx_packets"):
logger.debug('Setting RX packets: "%s"' % output["rx_packets"])
RX_pkts.set(output["rx_packets"])
if output.get("tx_packets"):
logger.debug('Setting TX packets: "%s"' % output["tx_packets"])
TX_pkts.set(output["tx_packets"])
if output.get("tx_failed"):
logger.debug('Setting TX failed: "%s"' % output["tx_failed"])
TX_failed.set(output["tx_failed"])
if output.get("beacon_loss"):
logger.debug('Setting beacon loss: "%s"' % output["beacon_loss"])
Beacon_loss.set(output["beacon_loss"])
if output.get("beacon_rx"):
logger.debug('Setting beacon RX: "%s"' % output["beacon_rx"])
Beacon_rx.set(output["beacon_rx"])
if output.get("rx_drop_misc"):
logger.debug('Setting RX drop misc: "%s"' % output["rx_drop_misc"])
RX_drop_misc.set(output["rx_drop_misc"])
if output.get("beacon_signal_avg"):
logger.debug(
'Setting beacon signal average: "%s"' % output["beacon_signal_avg"]
)
Beacon_signal_avg.set(output["beacon_signal_avg"])
if output.get("last_ack_signal"):
logger.debug('Setting last ACK signal: "%s"' % output["last_ack_signal"])
Last_ack_signal.set(output["last_ack_signal"])
if output.get("authorized") == "yes":
logger.debug("Setting authorized to TRUE")
Authorized.set(1)
elif output.get("authorized") == "no":
logger.debug("Setting authorized to FALSE")
Authorized.set(0)
if output.get("authenticated") == "yes":
logger.debug("Setting authenticated to TRUE")
Authenticated.set(1)
elif output.get("authenticated") == "no":
logger.debug("Setting authenticated to FALSE")
Authenticated.set(0)
if output.get("associated") == "yes":
logger.debug("Setting associated to TRUE")
Associated.set(1)
elif output.get("associated") == "no":
logger.debug("Setting associated to FALSE")
Associated.set(0)
if output.get("beacon_interval"):
logger.debug('Setting beacon interval: "%s"' % output["beacon_interval"])
Beacon_interval.set(output["beacon_interval"])
if output.get("connected_time"):
logger.debug('Setting connected time: "%s"' % output["connected_time"])
Connected_time.set(output["connected_time"])
if output.get("last_seen"):
logger.debug('Setting last seen: "%s"' % output["last_seen"])
Last_seen.set(output["last_seen"])
if output.get("station_count"):
logger.debug('Setting station count: "%s"' % output["station_count"])
Station_count.set(output["station_count"])
if output.get("channel_utilization"):
logger.debug(
'Setting channel utilization: "%s"' % output["channel_utilization"]
)
Channel_utilization.set(output["channel_utilization"])
if output.get("available_capacity"):
logger.debug(
'Setting available capacity: "%s"' % output["available_capacity"]
)
Available_capacity.set(output["available_capacity"])
else:
# macOS handling..
check_cmd = [
"/System/Library/PrivateFrameworks/Apple80211.framework/Versions/Current/Resources/airport",
args.wlan_dev,
" -I",
]
logger.debug('Running command: "%s"' % check_cmd)
cmd_out = subprocess.run( # nosec: B603
check_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
timeout=10,
).stdout
for line in cmd_out.split("\n"):
# logger.debug('Got line: >>> %s <<<' % line)
match = re_macos_wlan.match(line)
if match:
key = match.group(1)
value = match.group(2)
logger.debug('Got key: "%s" == value: "%s"' % (key, value))
if key == "agrCtlRSSI":
logger.debug('Got RSSI: "%s"' % value)
output["rssi"] = value
if key == "MCS":
logger.debug('Got MCS: "%s"' % value)
output["mcs"] = value
if key == "SSID":
logger.debug('Got SSID: "%s"' % value)
output["ssid"] = value
if key == "channel":
output["channel"] = value.split(",")[0]
output["channel_width"] = value.split(",")[1]
logger.debug('Got channel: "%s"' % output["channel"])
logger.debug('Got channel width: "%s"' % output["channel_width"])
if key == "BSSID":
logger.debug('Adding BSSID to dict: "%s"' % value)
bssids[start_time] = value
BSSID.info({"bssid": value})
output["bssid"] = value
if output.get("mcs"):
logger.debug('Setting MCS to: "%s"' % output["mcs"])
if output.get("ssid"):
MCS.labels(ssid=output["ssid"]).set(output["mcs"])
else:
# no SSID parsed - fall back to an empty ssid label (a bare .set() fails on a labelled Gauge)
MCS.labels(ssid="").set(output["mcs"])
# common Gauges..
if output.get("ssid") and output.get("bssid"):
logger.debug('Setting SSID to: "%s"' % output["ssid"])
SSID.info({"ssid": output["ssid"], "bssid": output["bssid"]})
if output.get("rssi"):
logger.debug('Setting RSSI to: "%s"' % output["rssi"])
if output.get("ssid"):
RSSI.labels(ssid=output["ssid"]).set(output["rssi"])
else:
# no SSID parsed - fall back to an empty ssid label (a bare .set() fails on a labelled Gauge)
RSSI.labels(ssid="").set(output["rssi"])
if output.get("channel"):
logger.debug('Setting channel to: "%s"' % output["channel"])
if output.get("ssid"):
Chan.labels(ssid=output["ssid"]).set(output["channel"])
else:
# no SSID parsed - fall back to an empty ssid label (a bare .set() fails on a labelled Gauge)
Chan.labels(ssid="").set(output["channel"])
channels[start_time] = int(output["channel"])
if output.get("channel_width"):
logger.debug('Setting channel width to: "%s"' % output["channel_width"])
if output.get("ssid"):
Chan_width.labels(ssid=output["ssid"]).set(output["channel_width"])
else:
# no SSID parsed - fall back to an empty ssid label (a bare .set() fails on a labelled Gauge)
Chan_width.labels(ssid="").set(output["channel_width"])
time_cutoff = start_time - args.bssid_timeout
if len(bssids):
logger.debug("Cleaning up BSSID dict")
bssids = {k: v for k, v in bssids.items() if k > time_cutoff}
bssid_count = {}
logger.debug("Counting BSSID dict")
for key, val in bssids.items():
bssid_count[val] = 1
logger.debug("Setting BSSID visited to %d" % len(bssid_count))
BSSIDS_visited.set(len(bssid_count))
if len(channels):
logger.debug("Cleaning up channels visited dict")
channels = {k: v for k, v in channels.items() if k > time_cutoff}
channel_changes = 0
last_channel = 0
last_band_5GHz = True # doesn't matter, will be reset upon first use..
band_changes = 0
for key, chan in channels.items():
if last_channel == 0:
logger.debug("Initializing last channel data")
last_band_5GHz = int(chan) > 14
last_channel = int(chan)
if chan != last_channel:
logger.debug("Found new channel: %d" % chan)
last_channel = chan
channel_changes = channel_changes + 1
logger.debug("channel changes is now: %d" % channel_changes)
if last_band_5GHz and int(chan) < 36:
last_band_5GHz = False
band_changes = band_changes + 1
logger.debug(
"STA has roamed to 2.4 GHz! Band changes now: %d" % band_changes
)
elif not last_band_5GHz and int(chan) > 14:
last_band_5GHz = True
band_changes = band_changes + 1
logger.debug(
"STA has roamed to 5 GHz! Band changes now: %d" % band_changes
)
logger.debug("Reporting channel changes as: %d" % channel_changes)
Channel_changes.set(channel_changes)
logger.debug("Reporting band changes as: %d" % band_changes)
Band_changes.set(band_changes)
# add metric for performance monitoring
logger.debug("Reporting observation of collection time")
WLAN_collect.observe(time.time() - start_time)
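# The gatherer threads below all follow the same pattern: collect, update the
# Prometheus metrics, sleep for the configured delay, repeat forever.
# Exceptions are logged and otherwise ignored so one failed run does not kill
# the thread.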
class WLANGatherer(Thread):
"""Periodically retrieve data from WLAN in a separate thread,"""
def __init__(self):
Thread.__init__(self)
self.name = "WLANGatherer"
def run(self):
logger.debug("Starting WLAN data gather thread")
while True:
try:
logger.debug("Running check_wlan in thread")
check_wlan()
logger.debug("Done: Running check_wlan in thread")
except Exception:
# Ignore failures, we will try again after the refresh delay.
# Most of them are temporary, i.e. connectivity problems
logger.error("Error getting stats", exc_info=True)
logger.debug("Sleeping in WLAN thread for %d s" % args.wlan_delay)
time.sleep(args.wlan_delay)
class PING4Gatherer(Thread):
"""Periodically retrieve data from IPv4 ICMP and DNS data in a separate thread,"""
def __init__(self):
Thread.__init__(self)
self.name = "PING4Gatherer"
self.resolver = dns.resolver.Resolver()
def run(self):
logger.debug("Starting IPv4 PING data gather thread")
start_time = time.time()
while True:
try:
logger.debug("Running IPv4 ICMP checks.")
if get_gateway_address_v4():
logger.debug("Got IPv4 default route")
IPv4Up.set(1)
if not self.resolver.nameservers:
logger.error(
"Got empty resolvers list - refreshing from resolv.conf"
)
self.resolver = dns.resolver.Resolver()
if not self.resolver.nameservers:
logger.error("Still no resolv.conf - aborting")
break
# run ICMP check
logger.debug("Running IPv4 ping test...")
ping_ipv4()
# run DNS checks
logger.debug("Running IPv4 DNS test with DNSSEC...")
dnssec = lookup(args.dns_lookup, 4, "tcp", True, self.resolver)
logger.debug("Got IPv4 DNSSEC delay %s" % dnssec)
if dnssec:
DNSSEC_IPv4.labels(target=args.dns_lookup).set(1)
IPv4_DNSSEC_delay.labels(target=args.dns_lookup).set(dnssec)
else:
DNSSEC_IPv4.labels(target=args.dns_lookup).set(0)
logger.debug("Running IPv4 DNS test over TCP without DNSSEC...")
dns_tcp = lookup(args.dns_lookup, 4, "tcp", False, self.resolver)
if dns_tcp:
logger.debug("Got IPv4 DNS TCP delay %s" % dns_tcp)
IPv4_DNS_TCP_delay.labels(target=args.dns_lookup).set(dns_tcp)
logger.debug("Running IPv4 DNS test with UDP...")
dns_udp = lookup(args.dns_lookup, 4, "udp", False, self.resolver)
if dns_udp:
logger.debug("Got IPv4 DNS UDP delay %s" % dns_udp)
IPv4_DNS_UDP_delay.labels(target=args.dns_lookup).set(dns_udp)
# check http
logger.debug("Running IPv4 http test ...")
http_check = get_http(args.http_target, af=4, ca=None)
if http_check:
IPv4_http_delay.labels(target=args.http_target).set(
http_check["delay"]
)
IPv4_http_status_code.labels(target=args.http_target).set(
http_check["status"]
)
# check https
logger.debug("Running IPv4 https test ...")
http_check = get_http(args.https_target, af=4, ca=args.https_ca)
if http_check:
IPv4_https_delay.labels(target=args.https_target).set(
http_check["delay"]
)
IPv4_https_status_code.labels(target=args.https_target).set(
http_check["status"]
)
else:
logger.debug("Got no IPv4 default route")
IPv4Up.set(0)
except Exception as e:
logger.error("FAILED in ICMP/DNS check!", exc_info=True)
IPv4Up.set(0)
IPv4_ping_loss.labels(target=args.ping_target_v4).set(100)
logger.debug("Caught exception!")
logger.error(e)
PING4_collect.observe(time.time() - start_time)
logger.debug("Sleeping for %d s" % args.delay)
time.sleep(args.delay)
class PING6Gatherer(Thread):
"""Periodically retrieve IPv6 data from ICMP and DNS data in a separate thread,"""
def __init__(self):
Thread.__init__(self)
self.name = "PING6Gatherer"
self.resolver = dns.resolver.Resolver()
def run(self):
logger.debug("Starting IPv6 PING data gather thread")
start_time = time.time()
while True:
try:
logger.debug("Running IPv6 ICMP checks.")
if get_gateway_address_v6():
logger.debug("Got IPv6 default route")
IPv6Up.set(1)
if not self.resolver.nameservers:
logger.error(
"Got empty resolvers list - refreshing from resolv.conf"
)
self.resolver = dns.resolver.Resolver()
if not self.resolver.nameservers:
logger.error("Still no resolv.conf - aborting")
break
logger.debug("Running IPv6 ping test...")
ping_ipv6()
dnssec = lookup(args.dns_lookup, 6, "tcp", True, self.resolver)
logger.debug("Got IPv6 DNSSEC TCP delay %s" % dnssec)
if dnssec:
DNSSEC_IPv6.labels(target=args.dns_lookup).set(1)
IPv6_DNSSEC_delay.labels(target=args.dns_lookup).set(dnssec)
else:
DNSSEC_IPv6.labels(target=args.dns_lookup).set(0)
dns_tcp = lookup(args.dns_lookup, 6, "tcp", False, self.resolver)
if dns_tcp:
logger.debug("Got IPv6 DNS TCP delay %s" % dns_tcp)
IPv6_DNS_TCP_delay.labels(target=args.dns_lookup).set(dns_tcp)
dns_udp = lookup(args.dns_lookup, 6, "udp", False, self.resolver)
if dns_udp:
logger.debug("Got IPv6 DNS UDP delay %s" % dns_udp)
IPv6_DNS_UDP_delay.labels(target=args.dns_lookup).set(dns_udp)
# check http
logger.debug("Running IPv6 http test ...")
http_check = get_http(args.http_target, af=6, ca=None)
if http_check:
IPv6_http_delay.labels(target=args.http_target).set(
http_check["delay"]
)
IPv6_http_status_code.labels(target=args.http_target).set(
http_check["status"]
)
# check https
logger.debug("Running IPv6 https test ...")
http_check = get_http(args.https_target, af=6, ca=args.https_ca)
if http_check:
IPv6_https_delay.labels(target=args.https_target).set(
http_check["delay"]
)
IPv6_https_status_code.labels(target=args.https_target).set(
http_check["status"]
)
else:
logger.debug("Got no IPv6 default route")
IPv6Up.set(0)
except Exception as e:
logger.error("FAILED in ICMP/DNS check!", exc_info=True)
IPv6Up.set(0)
IPv6_ping_loss.labels(target=args.ping_target_v6).set(100)
logger.error(e)
PING6_collect.observe(time.time() - start_time)
logger.debug("Sleeping for %d s" % args.delay)
time.sleep(args.delay)
class IPerfGatherer(Thread):
"""Periodically retrieve IPv4 and IPv6 data from iPerf3 server in a separate thread"""
def __init__(self):
Thread.__init__(self)
self.name = "iPerf3_Gatherer"
def run(self):
logger.debug("Starting iPerf3 data gather thread")
while True:
# IPv4
try:
logger.debug("Running IPv4 iPerf3 checks.")
if get_gateway_address_v4():
logger.debug("Got IPv4 default route")
IPv4Up.set(1)
# check iperf3
for _p in ["udp", "tcp"]:
for _d in ["tx", "rx"]:
logger.debug(
"Running iperf3 %s %s test over IPv4 ..." % (_p, _d)
)
udp = False
if _p == "udp":
udp = True
tx = False
if _d == "tx":
tx = True
iperf_result = run_iperf_test(af=4, tx=tx, udp=udp)
if iperf_result:
IPv4_iperf_mbps.labels(
target=args.iperf_target, direction=_d, protocol=_p
).set(iperf_result["mbps"])
if udp:
IPv4_iperf_packets.labels(
target=args.iperf_target, direction=_d
).set(iperf_result["packets"])
IPv4_iperf_lost_packets.labels(
target=args.iperf_target, direction=_d
).set(iperf_result["lost_packets"])
IPv4_iperf_lost_percent.labels(
target=args.iperf_target, direction=_d
).set(iperf_result["lost_percent"])
IPv4_iperf_jitter.labels(
target=args.iperf_target, direction=_d
).set(iperf_result["jitter_ms"])
IPv4_iperf_mos_score.labels(
target=args.iperf_target, direction=_d
).set(iperf_result["mos_score"])
else:
logger.debug("Got no IPv4 default route")
IPv4Up.set(0)
except Exception as e:
logger.error("FAILED in IPv4 iPerf3 check!", exc_info=True)
IPv4Up.set(0)
logger.error(e)
# IPv6
try:
logger.debug("Running IPv6 iPerf3 checks.")
if get_gateway_address_v6():
logger.debug("Got IPv6 default route")
IPv6Up.set(1)
# check iperf3
for _p in ["udp", "tcp"]:
for _d in ["tx", "rx"]:
logger.debug(
"Running iperf3 %s %s test over IPv6 ..." % (_p, _d)
)
udp = False
if _p == "udp":
udp = True
tx = False
if _d == "tx":
tx = True
iperf_result = run_iperf_test(af=6, tx=tx, udp=udp)
if iperf_result:
IPv6_iperf_mbps.labels(
target=args.iperf_target, direction=_d, protocol=_p
).set(iperf_result["mbps"])
if udp:
IPv6_iperf_packets.labels(
target=args.iperf_target, direction=_d
).set(iperf_result["packets"])
IPv6_iperf_lost_packets.labels(
target=args.iperf_target, direction=_d
).set(iperf_result["lost_packets"])
IPv6_iperf_lost_percent.labels(
target=args.iperf_target, direction=_d
).set(iperf_result["lost_percent"])
IPv6_iperf_jitter.labels(
target=args.iperf_target, direction=_d
).set(iperf_result["jitter_ms"])
IPv6_iperf_mos_score.labels(
target=args.iperf_target, direction=_d
).set(iperf_result["mos_score"])
else:
logger.debug("Got no IPv6 default route")
IPv6Up.set(0)
except Exception as e:
logger.error("FAILED in IPv6 iPerf3 check!", exc_info=True)
IPv6Up.set(0)
logger.error(e)
logger.debug("Sleeping for %d s" % args.iperf_delay)
time.sleep(args.iperf_delay)
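# SocketInheritingHTTPServer: variant of the prometheus_client HTTP server that
# adopts an already-bound listening socket handed over by systemd (socket
# activation) instead of binding one itself.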
class SocketInheritingHTTPServer(_ThreadingSimpleServer):
"""A HttpServer subclass that takes over an inherited socket from systemd"""
def __init__(self, address_info, handler, fd, bind_and_activate=True):
_ThreadingSimpleServer.__init__(
self, address_info, handler, bind_and_activate=False
)
logger.debug("http server init complete - passing socket")
self.socket = socket.fromfd(fd, self.address_family, self.socket_type)
if bind_and_activate:
# NOTE: systemd provides ready-bound sockets, so we only need to activate:
logger.debug("http server activating")
self.server_activate()
else:
logger.debug("http server NOT activated")
if __name__ == "__main__":
# collect WLAN in the background
logger.debug("Starting WLAN gatherer thread")
wlan_gatherer = WLANGatherer()
wlan_gatherer.start()
# collect PING and DNS stats in the background
logger.debug("Starting IPv4 ICMP/DNS gatherer thread")
ping4_gatherer = PING4Gatherer()
ping4_gatherer.start()
logger.debug("Starting IPv6 ICMP/DNS gatherer thread")
ping6_gatherer = PING6Gatherer()
ping6_gatherer.start()
if args.run_iperf:
logger.debug("Starting iPerf3 gatherer thread")
iperf_gatherer = IPerfGatherer()
iperf_gatherer.start()
# ...and now serve the registry contents so that we can consume it..
if os.environ.get("LISTEN_PID", None) == str(os.getpid()):
# systemd socket activation will need that httpd is waiting for socket
# to be passed - while collection still updates in the background
# inherit the socket
logger.debug("Starting systemd socket activation http server")
CustomMetricsHandler = MetricsHandler.factory(REGISTRY)
server_args = [("localhost", args.listen_port), CustomMetricsHandler]
httpd = SocketInheritingHTTPServer(*server_args, fd=SYSTEMD_FIRST_SOCKET_FD)
logging.info(
"wlan_exporter started for socket activation on fd %s"
% (SYSTEMD_FIRST_SOCKET_FD,)
)
try:
logging.info(
"wlan_exporter httpd running on socket fd %s"
% (SYSTEMD_FIRST_SOCKET_FD,)
)
httpd.serve_forever()
except KeyboardInterrupt:
httpd.socket.close()
else:
# start the server normally
# Start up the server to expose the metrics.
logger.debug("Starting http server")
start_http_server(args.listen_port)