Created
August 5, 2025 03:06
-
-
Save Nexarian/6f016e8b10d29a432eed0a825bd9a66e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import subprocess | |
import threading | |
import time | |
from collections import deque | |
from dataclasses import dataclass, field | |
from datetime import datetime | |
from datetime import time as dt_time | |
from enum import Enum | |
from typing import Callable, Deque, Dict, Optional, Set, Tuple | |
from zoneinfo import ZoneInfo | |
import requests | |
from pyroute2 import IPRoute | |
# === CONFIGURATION ===
# Single mapping holding every tunable of the daemon. All timing values are seconds.
CONFIG = dict(
    # (start, end) time-of-day pairs; recovery actions are skipped inside them.
    quiet_windows=[(dt_time(5, 0), dt_time(8, 0))],
    # Monitored interface name -> the SSID it should be on and its Shelly base URL.
    interfaces=dict(
        wlan0=dict(ssid="TEG-1JG", shelly="http://shelly1g4-7c2c677ea234.local"),
        wlan1=dict(ssid="TEG-2N1", shelly="http://shelly1g4-a085e3ca4a5c.local"),
    ),
    timings=dict(
        cycle_duration=60,      # Shelly off time
        check_interval=0.5,     # Main loop sleep
        down_debounce=5,        # Wait before acting on DOWN
        up_stability=30,        # Required UP time before trusting
        stuck_timeout=120,      # Time before considering stuck
        recovery_cooldown=150,  # Time between recovery attempts
        health_check=30,        # Periodic health check interval
    ),
    # An interface is "flapping" after `threshold` state changes within `window` seconds.
    flapping=dict(window=60, threshold=10),
    # Switch component id sent in the Shelly Switch.Set request.
    shelly_switch_id=0,
)
def log(msg: str) -> None:
    """Print *msg* to stdout behind a local ``[YYYY-MM-DD HH:MM:SS]`` timestamp.

    The line is flushed right away: when stdout is a pipe or a file (the usual
    situation for a long-running daemon) Python block-buffers it, and without
    the flush log lines would show up long after the event they describe.
    """
    stamp = time.strftime("%Y-%m-%d %H:%M:%S")
    print(f"[{stamp}] {msg}", flush=True)
class InterfaceOperState(Enum):
    """Network interface operational states"""

    UP = "UP"
    DOWN = "DOWN"
    DORMANT = "DORMANT"
    LOWERLAYERDOWN = "LOWERLAYERDOWN"
    TESTING = "TESTING"
    NOTPRESENT = "NOTPRESENT"
    UNKNOWN = "UNKNOWN"

    @classmethod
    def from_string(cls, state: Optional[str] = None) -> "InterfaceOperState":
        """Convert a state name (any letter case) to a member.

        Empty/``None`` input yields ``UNKNOWN`` silently. A name that does not
        match any member, a non-string value, or the literal ``"UNKNOWN"``
        yields ``UNKNOWN`` and is logged.
        """
        if not state:
            return cls.UNKNOWN
        try:
            parsed = cls(state.upper())
        except (ValueError, AttributeError):
            # ValueError: no such member; AttributeError: value has no .upper().
            log(f"Unknown network state: {state}")
            return cls.UNKNOWN
        if parsed is cls.UNKNOWN:
            log(f"Unknown network state: {state}")
        return parsed

    @property
    def is_bad(self) -> bool:
        """True for DOWN, DORMANT, UNKNOWN and LOWERLAYERDOWN."""
        # Look the members up on the class, not through `self`: reaching one
        # member via another is discouraged by the enum documentation.
        cls = type(self)
        return self in (cls.DOWN, cls.DORMANT, cls.UNKNOWN, cls.LOWERLAYERDOWN)
class RecoveryMethod(Enum):
    """Recovery methods in escalating order.

    Each member's value is a ``(label, callable)`` pair. The callable is
    invoked as ``execute(monitor, ifname, ssid)`` and forwards to the matching
    private method of the monitor object.
    """

    SHELLY = (
        "shelly",
        lambda monitor, ifname, ssid: monitor._cycle_shelly(ifname, ssid),
    )
    NMCLI_RESTART = (
        "nmcli_reconnect",
        lambda monitor, ifname, ssid: monitor._restart_wpa(ifname, ssid),
    )
    INTERFACE_CYCLE = (
        "interface_cycle",
        # The SSID is accepted for a uniform call signature but not used here.
        lambda monitor, ifname, _ssid: monitor._cycle_interface(ifname),
    )

    def __init__(self, name: str, func: Callable):
        # Enum unpacks the member's tuple value into these two arguments.
        self.method_name, self.execute = name, func
# Sequence of recovery methods to try on successive attempts: one Shelly power
# cycle, then two rounds of (nmcli reconnect, interface cycle).
# InterfaceState.get_recovery_method() indexes it modulo its length.
RECOVERY_PLAYBOOK: Tuple[RecoveryMethod, ...] = (
    RecoveryMethod.SHELLY,
) + (
    RecoveryMethod.NMCLI_RESTART,
    RecoveryMethod.INTERFACE_CYCLE,
) * 2
@dataclass
class InterfaceState:
    """Mutable bookkeeping for one monitored network interface."""

    name: str
    state: InterfaceOperState = InterfaceOperState.UNKNOWN
    # When `state` last changed (naive local time).
    last_change: datetime = field(default_factory=datetime.now)
    # When recovery was last attempted; datetime.min means "never".
    last_recovery: datetime = field(default_factory=lambda: datetime.min)
    # Recovery attempts since the interface was last seen UP.
    recovery_attempts: int = 0
    # Debounce timer of a scheduled recovery, if one is outstanding.
    pending_timer: Optional[threading.Timer] = None
    # Timestamps of the most recent state changes (at most 20 are kept).
    state_history: Deque[datetime] = field(default_factory=lambda: deque(maxlen=20))

    def update_state(self, new_state: InterfaceOperState) -> bool:
        """Record *new_state*; return True only if it differs from the current one.

        A change stamps ``last_change``, appends to ``state_history`` and, when
        the new state is UP, resets ``recovery_attempts``.
        """
        changed = self.state != new_state
        if changed:
            stamp = datetime.now()
            self.state = new_state
            self.last_change = stamp
            self.state_history.append(stamp)
            if new_state == InterfaceOperState.UP:
                self.recovery_attempts = 0
        return changed

    def cancel_timer(self):
        """Cancel the pending timer, if any, and forget it."""
        timer, self.pending_timer = self.pending_timer, None
        if timer:
            timer.cancel()

    @property
    def time_in_state(self) -> float:
        """Seconds elapsed since ``last_change``."""
        elapsed = datetime.now() - self.last_change
        return elapsed.total_seconds()

    @property
    def is_stuck(self) -> bool:
        """True when in a bad state for longer than the ``stuck_timeout`` setting."""
        if not self.state.is_bad:
            return False
        return self.time_in_state > CONFIG["timings"]["stuck_timeout"]

    @property
    def is_flapping(self) -> bool:
        """True when at least ``threshold`` state changes fall inside the flapping window."""
        settings = CONFIG["flapping"]
        span = settings["window"]
        now = datetime.now()
        recent_changes = [
            stamp for stamp in self.state_history
            if (now - stamp).total_seconds() < span
        ]
        return len(recent_changes) >= settings["threshold"]

    @property
    def needs_recovery(self) -> bool:
        """True once more than ``recovery_cooldown`` seconds have passed since ``last_recovery``."""
        since_last = datetime.now() - self.last_recovery
        return since_last.total_seconds() > CONFIG["timings"]["recovery_cooldown"]

    def get_recovery_method(self) -> RecoveryMethod:
        """Return the playbook entry for the current attempt count (wrapping around)."""
        index = self.recovery_attempts % len(RECOVERY_PLAYBOOK)
        return RECOVERY_PLAYBOOK[index]
class WiFiMonitor:
    """Watch the configured WiFi interfaces over netlink and run recovery actions.

    Link events are consumed in :meth:`run`; a background thread
    (:meth:`health_check`) additionally retries recovery for interfaces that
    stay in a bad state.
    """

    # How often, and how far apart (seconds), to retry switching a Shelly back on.
    _SHELLY_ON_ATTEMPTS = 3
    _SHELLY_ON_RETRY_DELAY = 5

    def __init__(self):
        self.ip = IPRoute()
        # Interface name -> tracked state.
        self.states: Dict[str, InterfaceState] = {}
        # SSIDs that currently have a Shelly power cycle in flight.
        self.active_cycles: Set[str] = set()
        # Guards active_cycles, which is touched from the Shelly worker threads.
        self.lock = threading.Lock()
        self.running = True

    @property
    def in_quiet_window(self) -> bool:
        """True when the current America/New_York time is inside a quiet window.

        A window whose start is later than its end is treated as wrapping past
        midnight.
        """
        now = datetime.now(ZoneInfo("America/New_York")).time()
        for start, end in CONFIG["quiet_windows"]:
            if start <= end and start <= now <= end:
                return True
            elif start > end and (now >= start or now <= end):
                return True
        return False

    def get_interface_state(self, ifname: str) -> InterfaceOperState:
        """Read *ifname*'s IFLA_OPERSTATE through the IPRoute handle.

        Returns ``UNKNOWN`` when no link is returned or the lookup raises.
        """
        try:
            links = self.ip.get_links(ifname=ifname)
            if links:
                attrs = dict(links[0].get("attrs", []))
                return InterfaceOperState.from_string(attrs.get("IFLA_OPERSTATE"))
        except Exception as e:
            log(f"Error reading {ifname}: {e}")
        return InterfaceOperState.UNKNOWN

    def _run_command(self, ifname: str, argv: list, timeout: float) -> bool:
        """Run *argv* and return True only when it exits with status 0.

        A non-zero exit is logged together with the command's stderr (or
        stdout). ``subprocess.TimeoutExpired`` and ``OSError`` propagate to the
        caller.
        """
        result = subprocess.run(argv, capture_output=True, text=True, timeout=timeout)
        if result.returncode == 0:
            return True
        detail = (result.stderr or result.stdout or "").strip()
        log(f"[{ifname}] {' '.join(argv)} exited with {result.returncode}: {detail}")
        return False

    def _cycle_shelly(self, ifname: str, ssid: str) -> bool:
        """Power-cycle the Shelly switch configured for *ifname* in a background thread.

        Returns True when a cycle was started, False when the interface has no
        Shelly URL or a cycle for *ssid* is already running.
        """
        shelly_url = CONFIG["interfaces"].get(ifname, {}).get("shelly")
        if not shelly_url:
            return False
        with self.lock:
            if ssid in self.active_cycles:
                return False
            self.active_cycles.add(ssid)

        url = f"{shelly_url}/rpc/Switch.Set"
        switch_id = CONFIG["shelly_switch_id"]
        attempts = self._SHELLY_ON_ATTEMPTS

        def set_switch(on: bool) -> None:
            response = requests.post(url, json={"id": switch_id, "on": on}, timeout=5)
            # An HTTP error status means the request was not accepted.
            response.raise_for_status()

        def cycle():
            try:
                try:
                    set_switch(False)
                except Exception as e:
                    # The off request was not confirmed, so do not wait or restore.
                    log(f"[{ifname}] Shelly error: {e}")
                    return
                time.sleep(CONFIG["timings"]["cycle_duration"])
                # The switch is off now: a single failed request must not leave
                # it off for good, so retry the power-on a few times.
                for attempt in range(1, attempts + 1):
                    try:
                        set_switch(True)
                    except Exception as e:
                        log(f"[{ifname}] Shelly error: {e} "
                            f"(power-on attempt {attempt}/{attempts})")
                        if attempt < attempts:
                            time.sleep(self._SHELLY_ON_RETRY_DELAY)
                    else:
                        log(f"[{ifname}] Shelly cycle complete")
                        return
                log(f"[{ifname}] Shelly switch may still be OFF after "
                    f"{attempts} failed power-on attempts")
            finally:
                with self.lock:
                    self.active_cycles.discard(ssid)

        threading.Thread(target=cycle, daemon=True).start()
        return True

    def _restart_wpa(self, ifname: str, ssid: str) -> bool:
        """Rescan WiFi and bring up the NetworkManager connection named *ssid*.

        Returns True only when ``nmcli connection up`` exits successfully. A
        failed rescan is logged but does not stop the reconnect attempt.
        """
        try:
            # Rescan for networks
            self._run_command(ifname, ["nmcli", "device", "wifi", "rescan"], 30)
            time.sleep(2)
            # Reconnect to the specific SSID
            connected = self._run_command(
                ifname, ["nmcli", "connection", "up", "id", ssid], 30)
        except Exception as e:
            log(f"[{ifname}] NetworkManager error: {e}")
            return False
        if connected:
            log(f"[{ifname}] NetworkManager rescan and reconnect complete")
        return connected

    def _cycle_interface(self, ifname: str) -> bool:
        """Set *ifname* down, wait two seconds, set it up again (via ``sudo ip link``).

        Returns True only when both commands exit successfully. The "up"
        command is issued even if "down" reported a failure.
        """
        try:
            went_down = self._run_command(
                ifname, ["sudo", "ip", "link", "set", ifname, "down"], 5)
            time.sleep(2)
            came_up = self._run_command(
                ifname, ["sudo", "ip", "link", "set", ifname, "up"], 5)
        except Exception as e:
            log(f"[{ifname}] Interface cycle error: {e}")
            return False
        return went_down and came_up

    def perform_recovery(self, state: InterfaceState, method: Optional[RecoveryMethod] = None):
        """Run one recovery action for *state* unless a quiet window is active.

        Without an explicit *method* the next playbook entry is used. The
        attempt counter and ``last_recovery`` are updated whether or not the
        action reports success.
        """
        if method is None:
            method = state.get_recovery_method()
        # Skip quiet window
        if self.in_quiet_window:
            return
        ssid = CONFIG["interfaces"].get(state.name, {}).get("ssid", "")
        log(f"[{state.name}] Attempting {method.method_name} recovery")
        method.execute(self, state.name, ssid)
        state.last_recovery = datetime.now()
        state.recovery_attempts += 1

    def schedule_recovery(self, state: InterfaceState, delay: float = CONFIG["timings"]["down_debounce"]):
        """Re-check the interface after *delay* seconds and recover it if still bad.

        Any previously pending timer for *state* is cancelled first.
        """
        def check_and_recover():
            try:
                current = self.get_interface_state(state.name)
                if current.is_bad:
                    log(f"[{state.name}] Still down after {delay}s")
                    self.perform_recovery(state)
            finally:
                # Only clear our own handle: a newer timer may have been
                # scheduled while this callback was running.
                if state.pending_timer is timer:
                    state.pending_timer = None

        state.cancel_timer()
        timer = threading.Timer(delay, check_and_recover)
        timer.daemon = True
        timer.start()
        state.pending_timer = timer

    def handle_state_change(self, state: InterfaceState, new_state: InterfaceOperState):
        """Record a state transition and react to it.

        UP -> bad schedules a debounced recovery if the interface had been UP
        for at least ``up_stability`` seconds; bad -> UP cancels a pending
        recovery. Flapping triggers an interface cycle followed by an nmcli
        reconnect.
        """
        if new_state == InterfaceOperState.UNKNOWN:
            # Not useful.
            return
        old_state = state.state
        # Must be read BEFORE update_state(): that call resets last_change, so
        # reading it afterwards would always yield ~0 seconds.
        time_in_old_state = state.time_in_state
        if not state.update_state(new_state):
            return  # No change
        log(f"[{state.name}] {old_state.value} → {new_state.value}")
        # Handle transitions
        if old_state == InterfaceOperState.UP and new_state.is_bad:
            # Going down - schedule recovery if was stable
            if time_in_old_state >= CONFIG["timings"]["up_stability"]:
                self.schedule_recovery(state)
        elif old_state.is_bad and new_state == InterfaceOperState.UP:
            # Coming up - cancel pending recovery
            state.cancel_timer()
        # Check for flapping
        if state.is_flapping:
            log(f"[{state.name}] Flapping detected")
            state.state_history.clear()
            self.perform_recovery(state, RecoveryMethod.INTERFACE_CYCLE)
            # NOTE: this sleep runs on the caller's thread (the event loop in run()).
            time.sleep(15)
            self.perform_recovery(state, RecoveryMethod.NMCLI_RESTART)

    def process_netlink_event(self, msg: dict):
        """Process a single netlink message; interfaces not in CONFIG are ignored."""
        attrs = dict(msg.get("attrs", []))
        ifname = attrs.get("IFLA_IFNAME")
        if not ifname or ifname not in CONFIG["interfaces"]:
            return
        # Get or create state
        if ifname not in self.states:
            self.states[ifname] = InterfaceState(name=ifname)
        state = self.states[ifname]
        # Handle interface removal
        if msg.get("event") == "RTM_DELLINK":
            log(f"[{ifname}] Interface removed")
            state.cancel_timer()
            self.perform_recovery(state)
            del self.states[ifname]
            return
        # Handle state change
        state_string = attrs.get("IFLA_OPERSTATE")
        new_state = InterfaceOperState.from_string(state_string)
        self.handle_state_change(state, new_state)

    def health_check(self):
        """Periodically retry recovery for interfaces that are stuck and past the cooldown."""
        while self.running:
            time.sleep(CONFIG["timings"]["health_check"])
            # Iterate over a snapshot: the event loop may add/remove entries meanwhile.
            for state in list(self.states.values()):
                if state.is_stuck and state.needs_recovery:
                    log(f"[{state.name}] Stuck for {state.time_in_state:.0f}s")
                    self.perform_recovery(state)

    def initialize(self):
        """Bind the netlink socket and record every configured interface's state.

        Interfaces that start out in a bad state get an immediate recovery
        attempt (outside quiet windows). Returns False if binding fails.
        """
        log("Initializing WiFi monitor...")
        try:
            self.ip.bind()
        except Exception as e:
            log(f"Failed to bind netlink: {e}")
            return False
        # Check initial states and gather interfaces
        for ifname in CONFIG["interfaces"]:
            current = self.get_interface_state(ifname)
            state = InterfaceState(name=ifname, state=current)
            self.states[ifname] = state
            log(f"[{ifname}] Initial state: {current.name}")
        # Process interfaces
        for ifname, current in self.states.items():
            # Only trigger recovery if actually in a bad state AND not in quiet window
            if current.state.is_bad and not self.in_quiet_window:
                log(f"[{ifname}] Initial state is BAD — scheduling recovery")
                self.perform_recovery(current)
        return True

    def run(self):
        """Main monitoring loop: initialize, start the health thread, consume netlink events."""
        if not self.initialize():
            return
        # Start health check thread
        threading.Thread(target=self.health_check, daemon=True).start()
        try:
            while self.running:
                try:
                    for msg in self.ip.get():
                        self.process_netlink_event(msg)
                except Exception as e:
                    log(f"Error processing events: {e}")
                time.sleep(CONFIG["timings"]["check_interval"])
        except KeyboardInterrupt:
            log("Shutting down...")
        finally:
            self.running = False
            for state in self.states.values():
                state.cancel_timer()
            self.ip.close()
def main():
    """Create a WiFiMonitor and run it until its loop ends."""
    WiFiMonitor().run()


# Script entry point: only start the daemon when executed directly.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment