Skip to content

Instantly share code, notes, and snippets.

@Nexarian
Created August 5, 2025 03:06
Show Gist options
  • Save Nexarian/6f016e8b10d29a432eed0a825bd9a66e to your computer and use it in GitHub Desktop.
Save Nexarian/6f016e8b10d29a432eed0a825bd9a66e to your computer and use it in GitHub Desktop.
import subprocess
import threading
import time
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime
from datetime import time as dt_time
from enum import Enum
from typing import Callable, Deque, Dict, Optional, Set, Tuple
from zoneinfo import ZoneInfo
import requests
from pyroute2 import IPRoute
# === CONFIGURATION ===
# Every tunable of the monitor lives in this mapping. All durations are seconds.
CONFIG = {
    # (start, end) time-of-day pairs; recovery actions are skipped inside them.
    "quiet_windows": [
        (dt_time(5, 0), dt_time(8, 0)),
    ],
    # Monitored interface name -> the SSID it is reconnected to and the base
    # URL of the Shelly relay that is power-cycled for it.
    "interfaces": {
        "wlan0": {
            "ssid": "TEG-1JG",
            "shelly": "http://shelly1g4-7c2c677ea234.local",
        },
        "wlan1": {
            "ssid": "TEG-2N1",
            "shelly": "http://shelly1g4-a085e3ca4a5c.local",
        },
    },
    "timings": {
        "cycle_duration": 60,      # gap between the Shelly "off" and "on" requests
        "check_interval": 0.5,     # pause in the main loop after an event error
        "down_debounce": 5,        # default delay before a scheduled recovery runs
        "up_stability": 30,        # UP time required before a drop is acted on
        "stuck_timeout": 120,      # time in a bad state before it counts as stuck
        "recovery_cooldown": 150,  # minimum spacing between stuck recoveries
        "health_check": 30,        # period of the stuck-interface sweep
    },
    # An interface is flapping once `threshold` state changes fall within
    # the last `window` seconds.
    "flapping": {
        "window": 60,
        "threshold": 10,
    },
    # Value sent as "id" in the Shelly Switch.Set request.
    "shelly_switch_id": 0,
}
def log(msg: str) -> None:
    """Print ``msg`` to stdout, prefixed with the local time in brackets."""
    stamp = time.strftime("%Y-%m-%d %H:%M:%S")
    print(f"[{stamp}] {msg}")
class InterfaceOperState(Enum):
    """Operational states an interface can report; each value equals its name."""

    UP = "UP"
    DOWN = "DOWN"
    DORMANT = "DORMANT"
    LOWERLAYERDOWN = "LOWERLAYERDOWN"
    TESTING = "TESTING"
    NOTPRESENT = "NOTPRESENT"
    UNKNOWN = "UNKNOWN"

    @classmethod
    def from_string(cls, state: Optional[str] = None) -> 'InterfaceOperState':
        """Map a state string (any letter case) onto a member.

        Empty or missing input yields ``UNKNOWN`` silently. Input that is
        unrecognised, or that literally spells "unknown", is logged and also
        yields ``UNKNOWN``.
        """
        if not state:
            return cls.UNKNOWN
        try:
            parsed = cls(state.upper())
        except (ValueError, AttributeError):
            # ValueError: no member has that value.
            # AttributeError: the input has no .upper(), i.e. it is not a string.
            parsed = None
        if parsed is None or parsed == InterfaceOperState.UNKNOWN:
            log(f"Unknown network state: {state}")
            return cls.UNKNOWN
        return parsed

    @property
    def is_bad(self) -> bool:
        """True for the states the monitor treats as "link not usable"."""
        kind = type(self)
        unhealthy = (kind.DOWN, kind.DORMANT, kind.UNKNOWN, kind.LOWERLAYERDOWN)
        return self in unhealthy
class RecoveryMethod(Enum):
    """Recovery actions available to the monitor.

    Each member's value is a ``(label, callable)`` pair, which the Enum
    machinery unpacks into ``__init__``. The callable is invoked as
    ``execute(monitor, ifname, ssid)`` and forwards to the matching private
    method on the monitor object.
    """

    SHELLY = (
        "shelly",
        lambda monitor, ifname, ssid: monitor._cycle_shelly(ifname, ssid),
    )
    NMCLI_RESTART = (
        "nmcli_reconnect",
        lambda monitor, ifname, ssid: monitor._restart_wpa(ifname, ssid),
    )
    INTERFACE_CYCLE = (
        "interface_cycle",
        # The SSID is accepted for a uniform call signature but is not used.
        lambda monitor, ifname, _ssid: monitor._cycle_interface(ifname),
    )

    def __init__(self, name: str, func: Callable):
        # Both are plain instance attributes, so ``member.execute(...)`` calls
        # the lambda as-is; the member is not bound as a first argument.
        self.method_name, self.execute = name, func
# Order in which recovery methods are tried on successive attempts: one Shelly
# power cycle, then the (NetworkManager reconnect, interface bounce) pair twice.
# InterfaceState.get_recovery_method indexes this modulo its length, so the
# sequence wraps around after the last entry.
RECOVERY_PLAYBOOK: Tuple[RecoveryMethod, ...] = (
    (RecoveryMethod.SHELLY,)
    + (RecoveryMethod.NMCLI_RESTART, RecoveryMethod.INTERFACE_CYCLE) * 2
)
@dataclass
class InterfaceState:
    """Mutable bookkeeping the monitor keeps for one network interface."""

    name: str
    # Most recently recorded operational state.
    state: InterfaceOperState = InterfaceOperState.UNKNOWN
    # When ``state`` last changed (naive local time).
    last_change: datetime = field(default_factory=datetime.now)
    # When a recovery was last performed; ``datetime.min`` means "never".
    last_recovery: datetime = datetime.min
    # Recoveries performed since the interface was last seen UP.
    recovery_attempts: int = 0
    # Debounce timer for a scheduled recovery, if one is outstanding.
    pending_timer: Optional[threading.Timer] = None
    # Timestamps of the most recent state changes (at most 20 are kept).
    state_history: Deque[datetime] = field(default_factory=lambda: deque(maxlen=20))

    def update_state(self, new_state: InterfaceOperState) -> bool:
        """Record ``new_state``; return False when it equals the current state.

        A real change stamps ``last_change``, appends to the history and, when
        the new state is UP, resets the recovery attempt counter.
        """
        if new_state == self.state:
            return False
        self.state = new_state
        changed_at = datetime.now()
        self.last_change = changed_at
        self.state_history.append(changed_at)
        if new_state == InterfaceOperState.UP:
            self.recovery_attempts = 0
        return True

    def cancel_timer(self):
        """Cancel and forget the pending recovery timer, if there is one."""
        if self.pending_timer is None:
            return
        self.pending_timer.cancel()
        self.pending_timer = None

    @property
    def time_in_state(self) -> float:
        """Seconds elapsed since ``last_change``."""
        elapsed = datetime.now() - self.last_change
        return elapsed.total_seconds()

    @property
    def is_stuck(self) -> bool:
        """True when in a bad state for longer than the stuck timeout."""
        if not self.state.is_bad:
            return False
        return self.time_in_state > CONFIG["timings"]["stuck_timeout"]

    @property
    def is_flapping(self) -> bool:
        """True when enough recent state changes fall inside the flapping window."""
        settings = CONFIG["flapping"]
        window = settings["window"]
        threshold = settings["threshold"]
        now = datetime.now()
        recent = [
            stamp
            for stamp in self.state_history
            if (now - stamp).total_seconds() < window
        ]
        return len(recent) >= threshold

    @property
    def needs_recovery(self) -> bool:
        """True once the recovery cooldown has elapsed since ``last_recovery``."""
        since_last = (datetime.now() - self.last_recovery).total_seconds()
        return since_last > CONFIG["timings"]["recovery_cooldown"]

    def get_recovery_method(self) -> RecoveryMethod:
        """Pick the playbook entry for the current attempt count, wrapping around."""
        position = self.recovery_attempts % len(RECOVERY_PLAYBOOK)
        return RECOVERY_PLAYBOOK[position]
class WiFiMonitor:
    """Netlink-driven WiFi watchdog.

    Link events for the interfaces listed in ``CONFIG["interfaces"]`` are read
    from a bound ``IPRoute`` socket. When an interface drops, gets stuck in a
    bad state, or flaps, a recovery action (Shelly power cycle, NetworkManager
    reconnect, or interface bounce) is executed unless the current time falls
    inside a configured quiet window.
    """

    def __init__(self):
        self.ip = IPRoute()
        # Per-interface bookkeeping, keyed by interface name.
        self.states: Dict[str, InterfaceState] = {}
        # SSIDs that currently have a Shelly power cycle in flight.
        self.active_cycles: Set[str] = set()
        # Guards ``active_cycles``, which is also touched by worker threads.
        self.lock = threading.Lock()
        # Cleared on shutdown so the background loops stop.
        self.running = True

    @property
    def in_quiet_window(self) -> bool:
        """Return True when the current America/New_York time is in a quiet window.

        Windows whose start is later than their end are treated as wrapping
        past midnight.
        """
        now = datetime.now(ZoneInfo("America/New_York")).time()
        for start, end in CONFIG["quiet_windows"]:
            if start <= end:
                if start <= now <= end:
                    return True
            elif now >= start or now <= end:
                return True
        return False

    def get_interface_state(self, ifname: str) -> InterfaceOperState:
        """Query the kernel for ``ifname``'s operational state.

        Returns ``UNKNOWN`` when the link is not found or the query fails.
        """
        # NOTE(review): this reuses the event socket and is also called from
        # timer threads - confirm pyroute2 supports that while run() is
        # blocked in get().
        try:
            links = self.ip.get_links(ifname=ifname)
            if links:
                attrs = dict(links[0].get("attrs", []))
                return InterfaceOperState.from_string(attrs.get("IFLA_OPERSTATE"))
        except Exception as e:
            log(f"Error reading {ifname}: {e}")
        return InterfaceOperState.UNKNOWN

    def _cycle_shelly(self, ifname: str, ssid: str) -> bool:
        """Power-cycle the Shelly relay configured for ``ifname`` in a worker thread.

        Returns False when no relay is configured or a cycle for ``ssid`` is
        already running; True once the worker thread has been started.
        """
        config = CONFIG["interfaces"].get(ifname, {})
        shelly_url = config.get("shelly")
        if not shelly_url:
            return False
        with self.lock:
            if ssid in self.active_cycles:
                return False
            self.active_cycles.add(ssid)

        def cycle():
            url = f"{shelly_url}/rpc/Switch.Set"
            switch_id = CONFIG["shelly_switch_id"]
            try:
                try:
                    requests.post(url, json={"id": switch_id, "on": False}, timeout=5)
                    time.sleep(CONFIG["timings"]["cycle_duration"])
                finally:
                    # Always try to restore power, even if the "off" request or
                    # the wait raised; otherwise the relay could be left off.
                    requests.post(url, json={"id": switch_id, "on": True}, timeout=5)
                log(f"[{ifname}] Shelly cycle complete")
            except Exception as e:
                log(f"[{ifname}] Shelly error: {e}")
            finally:
                with self.lock:
                    self.active_cycles.discard(ssid)

        threading.Thread(target=cycle, daemon=True).start()
        return True

    def _restart_wpa(self, ifname: str, ssid: str) -> bool:
        """Rescan WiFi and bring the ``ssid`` connection up via ``nmcli``.

        Exit codes of the ``nmcli`` calls are not inspected; False is returned
        only when launching a command raises (for example on timeout).
        """
        try:
            # Rescan for networks.
            subprocess.run(["nmcli", "device", "wifi", "rescan"],
                           capture_output=True, timeout=30)
            time.sleep(2)
            # Reconnect to the specific SSID.
            subprocess.run(["nmcli", "connection", "up", "id", f"{ssid}"],
                           capture_output=True, timeout=30)
            log(f"[{ifname}] NetworkManager rescan and reconnect complete")
            return True
        except Exception as e:
            log(f"[{ifname}] NetworkManager error: {e}")
            return False

    def _cycle_interface(self, ifname: str) -> bool:
        """Bounce ``ifname`` with ``sudo ip link set ... down`` then ``up``.

        Exit codes are not inspected; False is returned only when launching a
        command raises (for example on timeout).
        """
        try:
            subprocess.run(["sudo", "ip", "link", "set", ifname, "down"],
                           capture_output=True, timeout=5)
            time.sleep(2)
            subprocess.run(["sudo", "ip", "link", "set", ifname, "up"],
                           capture_output=True, timeout=5)
            return True
        except Exception as e:
            # Report the failure instead of swallowing it silently.
            log(f"[{ifname}] Interface cycle error: {e}")
            return False

    def perform_recovery(self, state: InterfaceState, method: Optional[RecoveryMethod] = None):
        """Run one recovery action for ``state`` unless in a quiet window.

        When ``method`` is omitted, the next entry of the interface's playbook
        is used. The attempt is recorded (timestamp and counter) regardless of
        what the action returns.
        """
        if not method:
            method = state.get_recovery_method()
        # No recovery actions during quiet windows.
        if self.in_quiet_window:
            return
        ssid = CONFIG["interfaces"].get(state.name, {}).get("ssid", "")
        log(f"[{state.name}] Attempting {method.method_name} recovery")
        method.execute(self, state.name, ssid)
        state.last_recovery = datetime.now()
        state.recovery_attempts += 1

    def schedule_recovery(self, state: InterfaceState, delay: float = CONFIG["timings"]["down_debounce"]):
        """Start a timer that recovers ``state`` if it is still bad after ``delay`` s.

        Any previously pending timer for the interface is cancelled first.
        """
        def check_and_recover():
            # Re-read the kernel state so a link that came back is left alone.
            current = self.get_interface_state(state.name)
            if current.is_bad:
                log(f"[{state.name}] Still down after {delay}s")
                self.perform_recovery(state)
            state.pending_timer = None

        state.cancel_timer()
        timer = threading.Timer(delay, check_and_recover)
        timer.daemon = True
        timer.start()
        state.pending_timer = timer

    def handle_state_change(self, state: InterfaceState, new_state: InterfaceOperState):
        """React to a reported operational state for one interface.

        ``UNKNOWN`` reports and reports equal to the current state are ignored.
        A drop from UP schedules a debounced recovery when the interface had
        been UP for at least ``up_stability`` seconds; a return to UP cancels
        any pending recovery. Flapping triggers an interface bounce followed,
        15 seconds later, by a NetworkManager reconnect.
        """
        if new_state == InterfaceOperState.UNKNOWN:
            # Not useful.
            return
        old_state = state.state
        # Must be read before update_state(): that call resets last_change,
        # after which time_in_state would always be about zero.
        time_in_old_state = state.time_in_state
        if not state.update_state(new_state):
            return  # No change
        log(f"[{state.name}] {old_state.value} → {new_state.value}")
        if old_state == InterfaceOperState.UP and new_state.is_bad:
            # Going down - schedule recovery only if UP had been stable.
            if time_in_old_state >= CONFIG["timings"]["up_stability"]:
                self.schedule_recovery(state)
        elif old_state.is_bad and new_state == InterfaceOperState.UP:
            # Coming up - cancel pending recovery.
            state.cancel_timer()
        if state.is_flapping:
            log(f"[{state.name}] Flapping detected")
            # Start counting afresh so the same burst is not acted on twice.
            state.state_history.clear()
            self.perform_recovery(state, RecoveryMethod.INTERFACE_CYCLE)
            # This sleep blocks the calling (event-processing) thread.
            time.sleep(15)
            self.perform_recovery(state, RecoveryMethod.NMCLI_RESTART)

    def process_netlink_event(self, msg: dict):
        """Dispatch one netlink message about a configured interface.

        Messages for interfaces not listed in ``CONFIG["interfaces"]`` are
        ignored. An ``RTM_DELLINK`` event triggers a recovery and drops the
        interface's tracked state; any other event is treated as a state report.
        """
        attrs = dict(msg.get("attrs", []))
        ifname = attrs.get("IFLA_IFNAME")
        if not ifname or ifname not in CONFIG["interfaces"]:
            return
        # Get or create state.
        if ifname not in self.states:
            self.states[ifname] = InterfaceState(name=ifname)
        state = self.states[ifname]
        # Handle interface removal.
        if msg.get("event") == "RTM_DELLINK":
            log(f"[{ifname}] Interface removed")
            state.cancel_timer()
            self.perform_recovery(state)
            del self.states[ifname]
            return
        # Handle state change.
        state_string = attrs.get("IFLA_OPERSTATE")
        new_state = InterfaceOperState.from_string(state_string)
        self.handle_state_change(state, new_state)

    def health_check(self):
        """Periodically recover interfaces that are stuck and past the cooldown.

        Runs until ``self.running`` is cleared; intended as a thread target.
        """
        while self.running:
            time.sleep(CONFIG["timings"]["health_check"])
            # Iterate over a snapshot: the event loop may add or remove entries.
            for state in list(self.states.values()):
                if state.is_stuck and state.needs_recovery:
                    log(f"[{state.name}] Stuck for {state.time_in_state:.0f}s")
                    self.perform_recovery(state)

    def initialize(self):
        """Bind the netlink socket and seed state for every configured interface.

        Returns False when binding fails, True otherwise. Interfaces that start
        out in a bad state get an immediate recovery attempt outside quiet
        windows.
        """
        log("Initializing WiFi monitor...")
        try:
            self.ip.bind()
        except Exception as e:
            log(f"Failed to bind netlink: {e}")
            return False
        # Check initial states and gather interfaces.
        for ifname in CONFIG["interfaces"]:
            current = self.get_interface_state(ifname)
            self.states[ifname] = InterfaceState(name=ifname, state=current)
            log(f"[{ifname}] Initial state: {current.name}")
        # Process interfaces.
        for ifname, tracked in self.states.items():
            # Only trigger recovery if actually in a bad state AND not in quiet window.
            if tracked.state.is_bad and not self.in_quiet_window:
                log(f"[{ifname}] Initial state is BAD — scheduling recovery")
                self.perform_recovery(tracked)
        return True

    def run(self):
        """Run the monitor until interrupted.

        Starts the health-check thread, then processes netlink messages in a
        loop. On exit, pending timers are cancelled and the socket is closed.
        """
        if not self.initialize():
            return
        # Start health check thread.
        threading.Thread(target=self.health_check, daemon=True).start()
        try:
            while self.running:
                try:
                    for msg in self.ip.get():
                        self.process_netlink_event(msg)
                except Exception as e:
                    log(f"Error processing events: {e}")
                    # Back off briefly so a persistent error does not spin.
                    time.sleep(CONFIG["timings"]["check_interval"])
        except KeyboardInterrupt:
            log("Shutting down...")
        finally:
            self.running = False
            for state in self.states.values():
                state.cancel_timer()
            self.ip.close()
def main():
    """Script entry point: build a WiFiMonitor and run it until it exits."""
    WiFiMonitor().run()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment