Created
March 10, 2025 20:14
-
-
Save CorradoLanera/51525b45f980593d08f140a376587acb to your computer and use it in GitHub Desktop.
Monitor tensione e corrente con picchi e allert criticità
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Monitor continuo di vcgencmd pmic_read_adc con aggiornamento, calcolo dei valori live, min/max, totali, potenza e alert. | |
Questo script esegue periodicamente il comando "vcgencmd pmic_read_adc" per estrarre le letture di voltaggio e corrente | |
per ciascun rail (sensore) e, per ogni sensore, calcola: | |
- I valori live di voltaggio, corrente e potenza (calcolata come voltaggio_live * corrente_live). | |
- Il valore minimo di voltaggio (Voltage min) e il valore massimo di corrente (Max Current) registrati negli ultimi PEAK_PERIOD secondi, | |
utilizzati per calcolare il "Power max" come il massimo istantaneo (volt * current) registrato nel periodo. | |
- Un alert basato sul confronto tra il voltaggio live e il valore nominale. | |
- I totali (somma) per la corrente e la potenza live e max; per le colonne di voltaggio totali, se disponibile, viene usato il valore live di EXT5V. | |
Le intestazioni vengono stampate su due righe per ottenere colonne compatte. | |
I parametri (intervallo di aggiornamento, periodo per il calcolo dei valori min/max e precisione dei decimali) | |
possono essere passati tramite riga di comando. | |
Esempio d'uso: | |
python3 monitor_pmic_peak.py --refresh 3 --peak 120 --precision 5 | |
""" | |
import argparse | |
import subprocess | |
import time | |
import os | |
from collections import deque | |
# Parametri default | |
DEFAULT_REFRESH_INTERVAL = 2 # intervallo (in secondi) tra aggiornamenti | |
DEFAULT_PEAK_PERIOD = 60 # periodo (in secondi) per il calcolo dei valori min/max | |
DEFAULT_PRECISION = 4 # numero di cifre decimali da mostrare | |
# Parsing degli argomenti da linea di comando | |
parser = argparse.ArgumentParser( | |
description="Monitor continuo di vcgencmd pmic_read_adc con aggiornamento, calcolo dei min/max, totali e potenza." | |
) | |
parser.add_argument("-r", "--refresh", type=float, default=DEFAULT_REFRESH_INTERVAL, | |
help="Intervallo di aggiornamento in secondi (default: %(default)s)") | |
parser.add_argument("-p", "--peak", type=float, default=DEFAULT_PEAK_PERIOD, | |
help="Periodo in secondi per il calcolo dei valori min/max (default: %(default)s)") | |
parser.add_argument("-c", "--precision", type=int, default=DEFAULT_PRECISION, | |
help="Numero di cifre decimali da mostrare (default: %(default)s)") | |
args = parser.parse_args() | |
REFRESH_INTERVAL = args.refresh | |
PEAK_PERIOD = args.peak | |
PRECISION = args.precision | |
# Mappa dei sensori: chiave = sensor id (senza suffisso _A/_V), valore = descrizione (senza "rail") | |
descriptions = { | |
"3V7_WL_SW": "Wireless Switch", | |
"3V3_SYS": "System 3.3V", | |
"1V8_SYS": "System 1.8V", | |
"DDR_VDD2": "DDR VDD2", | |
"DDR_VDDQ": "DDR VDDQ", | |
"1V1_SYS": "System 1.1V", | |
"0V8_SW": "Switch 0.8V", | |
"VDD_CORE": "Core", | |
"3V3_DAC": "DAC 3.3V", | |
"3V3_ADC": "ADC 3.3V", | |
"0V8_AON": "Always-On 0.8V", | |
"HDMI": "HDMI", | |
"EXT5V": "External 5V", | |
"BATT": "Battery", | |
} | |
# Valori nominali (in Volt) per ciascun sensore; se None, non è disponibile | |
nominals = { | |
"3V7_WL_SW": 3.7, | |
"3V3_SYS": 3.3, | |
"1V8_SYS": 1.8, | |
"DDR_VDD2": 1.1, | |
"DDR_VDDQ": 0.6, | |
"1V1_SYS": 1.1, | |
"0V8_SW": 0.8, | |
"VDD_CORE": 0.75, | |
"3V3_DAC": 3.3, | |
"3V3_ADC": 3.3, | |
"0V8_AON": 0.8, | |
"HDMI": 5.0, | |
"EXT5V": 5.0, | |
"BATT": None, | |
} | |
# History: per ciascun sensore, memorizza le letture (timestamp, volt, current) degli ultimi PEAK_PERIOD secondi. | |
history = {} | |
def get_pmic_data(): | |
""" | |
Esegue il comando "vcgencmd pmic_read_adc" e parsa l'output per estrarre i valori live di voltaggio e corrente. | |
Returns: | |
dict: Dizionario in cui ogni chiave è l'id del sensore e il valore è un dizionario con "volt" e "current". | |
""" | |
try: | |
output = subprocess.check_output(["vcgencmd", "pmic_read_adc"]).decode("utf-8") | |
except Exception as e: | |
print("Errore eseguendo vcgencmd:", e) | |
return {} | |
sensors = {} | |
for line in output.splitlines(): | |
line = line.strip() | |
if not line: | |
continue | |
# Esempio: "3V7_WL_SW_A current(0)=0.00000000A" oppure "3V7_WL_SW_V volt(8)=3.74153500V" | |
parts = line.split() | |
if len(parts) < 2: | |
continue | |
sensor_part = parts[0] | |
meas_part = parts[1] | |
if "=" in meas_part: | |
value_str = meas_part.split("=")[1] | |
if value_str and value_str[-1] in "AV": | |
value_str = value_str[:-1] | |
try: | |
value = float(value_str) | |
except Exception: | |
value = None | |
else: | |
value = None | |
parts2 = sensor_part.split('_') | |
if parts2[-1] in ("A", "V"): | |
meas_type = parts2[-1] | |
sensor_id = "_".join(parts2[:-1]) | |
else: | |
sensor_id = sensor_part | |
meas_type = "" | |
if sensor_id not in sensors: | |
sensors[sensor_id] = {"volt": None, "current": None} | |
if meas_type == "A": | |
sensors[sensor_id]["current"] = value | |
elif meas_type == "V": | |
sensors[sensor_id]["volt"] = value | |
return sensors | |
def update_history(sensors): | |
""" | |
Aggiorna il dizionario 'history' aggiungendo le letture correnti per ciascun sensore e rimuovendo quelle più vecchie di PEAK_PERIOD secondi. | |
Args: | |
sensors (dict): Letture live dei sensori. | |
""" | |
now = time.time() | |
for sensor_id, data in sensors.items(): | |
if sensor_id not in history: | |
history[sensor_id] = deque() | |
history[sensor_id].append((now, data["volt"], data["current"])) | |
while history[sensor_id] and now - history[sensor_id][0][0] > PEAK_PERIOD: | |
history[sensor_id].popleft() | |
def compute_history(sensor_id): | |
""" | |
Per il sensore specificato, calcola: | |
- Il valore minimo di voltaggio (Voltage min) osservato negli ultimi PEAK_PERIOD secondi. | |
- Il valore massimo di corrente (Max Current) osservato nello stesso periodo. | |
Args: | |
sensor_id (str): Identificativo del sensore. | |
Returns: | |
tuple: (min_voltage, max_current) come float o None. | |
""" | |
min_voltage = None | |
max_current = None | |
if sensor_id in history: | |
for ts, volt, current in history[sensor_id]: | |
if volt is not None: | |
if (min_voltage is None) or (volt < min_voltage): | |
min_voltage = volt | |
if current is not None: | |
if (max_current is None) or (current > max_current): | |
max_current = current | |
return min_voltage, max_current | |
def compute_max_power(sensor_id): | |
""" | |
Calcola il massimo valore istantaneo di potenza (volt * current) registrato nel periodo PEAK_PERIOD per il sensore. | |
Args: | |
sensor_id (str): Identificativo del sensore. | |
Returns: | |
float: Massima potenza registrata oppure None. | |
""" | |
max_power = None | |
if sensor_id in history: | |
for ts, volt, cur in history[sensor_id]: | |
if (volt is not None) and (cur is not None): | |
power = volt * cur | |
if (max_power is None) or (power > max_power): | |
max_power = power | |
return max_power | |
def compute_totals(sensors): | |
""" | |
Calcola i totali per i valori live e per il massimo istantaneo (max power) su tutti i sensori. | |
Per ogni sensore: | |
- Somma la corrente live. | |
- Somma la potenza live (volt_live * current_live) se disponibili. | |
- Somma la corrente massima (max_current) e la potenza massima (max_power) registrate nel periodo. | |
Returns: | |
tuple: (total_current, total_max_current, total_power_live, total_power_max) | |
""" | |
total_current = 0.0 | |
total_max_current = 0.0 | |
total_power_live = 0.0 | |
total_power_max = 0.0 | |
for sensor_id, data in sensors.items(): | |
cur = data.get("current") | |
volt = data.get("volt") | |
if cur is not None: | |
total_current += cur | |
if (volt is not None) and (cur is not None): | |
total_power_live += volt * cur | |
_, max_cur = compute_history(sensor_id) | |
if max_cur is not None: | |
total_max_current += max_cur | |
sensor_max_power = compute_max_power(sensor_id) | |
if sensor_max_power is not None: | |
total_power_max += sensor_max_power | |
return total_current, total_max_current, total_power_live, total_power_max | |
def compute_alert(sensor_id, voltage_live): | |
""" | |
Determina l'alert per un sensore basandosi sul rapporto tra il voltaggio live e il valore nominale. | |
Criteri: | |
- "OK" se il voltaggio live è >= 95% del nominale. | |
- "*" se è tra il 90% e il 95%. | |
- "**" se è inferiore al 90%. | |
- "N/A" se il nominale o il valore live non sono disponibili. | |
Args: | |
sensor_id (str): Identificativo del sensore. | |
voltage_live (float): Valore live del voltaggio. | |
Returns: | |
str: Stato di alert ("OK", "*", "**" o "N/A"). | |
""" | |
nominal = nominals.get(sensor_id) | |
if (nominal is None) or (voltage_live is None): | |
return "N/A" | |
ratio = voltage_live / nominal | |
if ratio >= 0.95: | |
return "OK" | |
elif ratio >= 0.90: | |
return "*" | |
else: | |
return "**" | |
def format_voltage(voltage, sensor_id): | |
""" | |
Formattta un valore di voltaggio, aggiungendo tra parentesi la percentuale rispetto al valore nominale (se disponibile). | |
Args: | |
voltage (float): Valore di voltaggio. | |
sensor_id (str): Identificativo del sensore. | |
Returns: | |
str: Valore formattato (es. "3.3210 (100.3%)") o "N/A" se il valore è None. | |
""" | |
if voltage is None: | |
return "N/A" | |
nominal = nominals.get(sensor_id) | |
if (nominal is not None) and (nominal > 0): | |
perc = voltage / nominal * 100 | |
return f"{voltage:.{PRECISION}f} ({perc:.1f}%)" | |
else: | |
return f"{voltage:.{PRECISION}f}" | |
def print_table(sensors): | |
""" | |
Stampa in console una tabella formattata con le misurazioni per ciascun sensore. | |
Le colonne sono: | |
- Sensor: Identificativo del sensore. | |
- Alert: Stato di alert basato sul confronto tra il voltaggio live e il nominale. | |
- Voltage (V) live: Valore live del voltaggio formattato con percentuale rispetto al nominale. | |
- Current (A) live: Valore live della corrente. | |
- Power (W) live: Prodotto di voltaggio_live e corrente_live. | |
- Voltage min (V): Il minimo valore di voltaggio registrato nel periodo. | |
- Max Current (A): Il massimo valore di corrente registrato nel periodo. | |
- Power max (W): Il massimo valore istantaneo di potenza (volt * current) registrato nel periodo. | |
- Description: Breve descrizione del sensore. | |
Le intestazioni sono stampate su due righe per ottenere colonne più compatte. | |
Viene stampata anche una riga "TOTAL" che somma i valori live (per corrente e potenza) e i valori massimi registrati, | |
mentre per le colonne di voltaggio si usa il valore live del rail EXT5V, se disponibile. | |
""" | |
# Definiamo le larghezze dei campi | |
fmt = "{:<12} {:>6} {:>16} {:>10} {:>10} {:>16} {:>10} {:>10} {:<20}" | |
header_line1 = fmt.format("Sensor", "Alert", "Voltage", "Current", "Power", "Voltage", "Current", "Power", "Description") | |
header_line2 = fmt.format("", "", "(V)", "(A)", "(W)", "min (V)", "max (A)", "max (W)", "") | |
print(header_line1) | |
print(header_line2) | |
print("-" * len(header_line1)) | |
# Riga dei totali: per Voltage live e Voltage min, se EXT5V è disponibile, usiamo i suoi valori. | |
if "EXT5V" in sensors and sensors["EXT5V"]["volt"] is not None: | |
total_voltage_live = format_voltage(sensors["EXT5V"]["volt"], "EXT5V") | |
min_v_ext, _ = compute_history("EXT5V") | |
total_voltage_min = format_voltage(min_v_ext, "EXT5V") | |
else: | |
total_voltage_live = "N/A" | |
total_voltage_min = "N/A" | |
total_current, total_max_current, total_power_live, total_power_max = compute_totals(sensors) | |
total_row = fmt.format( | |
"TOTAL", "N/A", total_voltage_live, | |
f"{total_current:.{PRECISION}f}", | |
f"{total_power_live:.{PRECISION}f}", | |
total_voltage_min, | |
f"{total_max_current:.{PRECISION}f}", | |
f"{total_power_max:.{PRECISION}f}", | |
"Total consumption" | |
) | |
print(total_row) | |
print("-" * len(header_line1)) | |
# Righe per ciascun sensore | |
for sensor_id, data in sorted(sensors.items()): | |
voltage_live = data["volt"] | |
current_live = data["current"] | |
voltage_str = format_voltage(voltage_live, sensor_id) | |
current_str = f"{current_live:.{PRECISION}f}" if current_live is not None else "N/A" | |
power_live = voltage_live * current_live if (voltage_live is not None and current_live is not None) else None | |
power_live_str = f"{power_live:.{PRECISION}f}" if power_live is not None else "N/A" | |
# Valori storici: Voltage min e Max Current | |
min_voltage, max_current = compute_history(sensor_id) | |
min_voltage_str = format_voltage(min_voltage, sensor_id) if min_voltage is not None else "N/A" | |
max_current_str = f"{max_current:.{PRECISION}f}" if max_current is not None else "N/A" | |
# Calcola il max power come il massimo istantaneo di power (volt * current) nel periodo | |
max_power = compute_max_power(sensor_id) | |
max_power_str = f"{max_power:.{PRECISION}f}" if max_power is not None else "N/A" | |
alert = compute_alert(sensor_id, voltage_live) | |
desc = descriptions.get(sensor_id, sensor_id) | |
print(fmt.format( | |
sensor_id, alert, voltage_str, current_str, power_live_str, | |
min_voltage_str, max_current_str, max_power_str, desc | |
)) | |
def main(): | |
""" | |
Funzione principale: esegue in loop il monitoraggio. | |
In ogni iterazione: | |
- Legge i dati live tramite get_pmic_data(). | |
- Aggiorna il buffer 'history' con le letture correnti. | |
- Pulisce lo schermo e stampa la tabella formattata (intestazioni su due righe). | |
- Attende REFRESH_INTERVAL secondi. | |
""" | |
while True: | |
sensors = get_pmic_data() | |
update_history(sensors) | |
os.system('clear' if os.name == 'posix' else 'cls') | |
print_table(sensors) | |
time.sleep(REFRESH_INTERVAL) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment