Skip to content

Instantly share code, notes, and snippets.

@CorradoLanera
Created March 10, 2025 20:14
Show Gist options
  • Save CorradoLanera/51525b45f980593d08f140a376587acb to your computer and use it in GitHub Desktop.
Save CorradoLanera/51525b45f980593d08f140a376587acb to your computer and use it in GitHub Desktop.
Monitor tensione e corrente con picchi e allert criticità
#!/usr/bin/env python3
"""
Monitor continuo di vcgencmd pmic_read_adc con aggiornamento, calcolo dei valori live, min/max, totali, potenza e alert.
Questo script esegue periodicamente il comando "vcgencmd pmic_read_adc" per estrarre le letture di voltaggio e corrente
per ciascun rail (sensore) e, per ogni sensore, calcola:
- I valori live di voltaggio, corrente e potenza (calcolata come voltaggio_live * corrente_live).
- Il valore minimo di voltaggio (Voltage min) e il valore massimo di corrente (Max Current) registrati negli ultimi PEAK_PERIOD secondi,
utilizzati per calcolare il "Power max" come il massimo istantaneo (volt * current) registrato nel periodo.
- Un alert basato sul confronto tra il voltaggio live e il valore nominale.
- I totali (somma) per la corrente e la potenza live e max; per le colonne di voltaggio totali, se disponibile, viene usato il valore live di EXT5V.
Le intestazioni vengono stampate su due righe per ottenere colonne compatte.
I parametri (intervallo di aggiornamento, periodo per il calcolo dei valori min/max e precisione dei decimali)
possono essere passati tramite riga di comando.
Esempio d'uso:
python3 monitor_pmic_peak.py --refresh 3 --peak 120 --precision 5
"""
import argparse
import subprocess
import time
import os
from collections import deque
# Parametri default
DEFAULT_REFRESH_INTERVAL = 2 # intervallo (in secondi) tra aggiornamenti
DEFAULT_PEAK_PERIOD = 60 # periodo (in secondi) per il calcolo dei valori min/max
DEFAULT_PRECISION = 4 # numero di cifre decimali da mostrare
# Parsing degli argomenti da linea di comando
parser = argparse.ArgumentParser(
description="Monitor continuo di vcgencmd pmic_read_adc con aggiornamento, calcolo dei min/max, totali e potenza."
)
parser.add_argument("-r", "--refresh", type=float, default=DEFAULT_REFRESH_INTERVAL,
help="Intervallo di aggiornamento in secondi (default: %(default)s)")
parser.add_argument("-p", "--peak", type=float, default=DEFAULT_PEAK_PERIOD,
help="Periodo in secondi per il calcolo dei valori min/max (default: %(default)s)")
parser.add_argument("-c", "--precision", type=int, default=DEFAULT_PRECISION,
help="Numero di cifre decimali da mostrare (default: %(default)s)")
args = parser.parse_args()
REFRESH_INTERVAL = args.refresh
PEAK_PERIOD = args.peak
PRECISION = args.precision
# Mappa dei sensori: chiave = sensor id (senza suffisso _A/_V), valore = descrizione (senza "rail")
descriptions = {
"3V7_WL_SW": "Wireless Switch",
"3V3_SYS": "System 3.3V",
"1V8_SYS": "System 1.8V",
"DDR_VDD2": "DDR VDD2",
"DDR_VDDQ": "DDR VDDQ",
"1V1_SYS": "System 1.1V",
"0V8_SW": "Switch 0.8V",
"VDD_CORE": "Core",
"3V3_DAC": "DAC 3.3V",
"3V3_ADC": "ADC 3.3V",
"0V8_AON": "Always-On 0.8V",
"HDMI": "HDMI",
"EXT5V": "External 5V",
"BATT": "Battery",
}
# Valori nominali (in Volt) per ciascun sensore; se None, non è disponibile
nominals = {
"3V7_WL_SW": 3.7,
"3V3_SYS": 3.3,
"1V8_SYS": 1.8,
"DDR_VDD2": 1.1,
"DDR_VDDQ": 0.6,
"1V1_SYS": 1.1,
"0V8_SW": 0.8,
"VDD_CORE": 0.75,
"3V3_DAC": 3.3,
"3V3_ADC": 3.3,
"0V8_AON": 0.8,
"HDMI": 5.0,
"EXT5V": 5.0,
"BATT": None,
}
# History: per ciascun sensore, memorizza le letture (timestamp, volt, current) degli ultimi PEAK_PERIOD secondi.
history = {}
def get_pmic_data():
"""
Esegue il comando "vcgencmd pmic_read_adc" e parsa l'output per estrarre i valori live di voltaggio e corrente.
Returns:
dict: Dizionario in cui ogni chiave è l'id del sensore e il valore è un dizionario con "volt" e "current".
"""
try:
output = subprocess.check_output(["vcgencmd", "pmic_read_adc"]).decode("utf-8")
except Exception as e:
print("Errore eseguendo vcgencmd:", e)
return {}
sensors = {}
for line in output.splitlines():
line = line.strip()
if not line:
continue
# Esempio: "3V7_WL_SW_A current(0)=0.00000000A" oppure "3V7_WL_SW_V volt(8)=3.74153500V"
parts = line.split()
if len(parts) < 2:
continue
sensor_part = parts[0]
meas_part = parts[1]
if "=" in meas_part:
value_str = meas_part.split("=")[1]
if value_str and value_str[-1] in "AV":
value_str = value_str[:-1]
try:
value = float(value_str)
except Exception:
value = None
else:
value = None
parts2 = sensor_part.split('_')
if parts2[-1] in ("A", "V"):
meas_type = parts2[-1]
sensor_id = "_".join(parts2[:-1])
else:
sensor_id = sensor_part
meas_type = ""
if sensor_id not in sensors:
sensors[sensor_id] = {"volt": None, "current": None}
if meas_type == "A":
sensors[sensor_id]["current"] = value
elif meas_type == "V":
sensors[sensor_id]["volt"] = value
return sensors
def update_history(sensors):
"""
Aggiorna il dizionario 'history' aggiungendo le letture correnti per ciascun sensore e rimuovendo quelle più vecchie di PEAK_PERIOD secondi.
Args:
sensors (dict): Letture live dei sensori.
"""
now = time.time()
for sensor_id, data in sensors.items():
if sensor_id not in history:
history[sensor_id] = deque()
history[sensor_id].append((now, data["volt"], data["current"]))
while history[sensor_id] and now - history[sensor_id][0][0] > PEAK_PERIOD:
history[sensor_id].popleft()
def compute_history(sensor_id):
"""
Per il sensore specificato, calcola:
- Il valore minimo di voltaggio (Voltage min) osservato negli ultimi PEAK_PERIOD secondi.
- Il valore massimo di corrente (Max Current) osservato nello stesso periodo.
Args:
sensor_id (str): Identificativo del sensore.
Returns:
tuple: (min_voltage, max_current) come float o None.
"""
min_voltage = None
max_current = None
if sensor_id in history:
for ts, volt, current in history[sensor_id]:
if volt is not None:
if (min_voltage is None) or (volt < min_voltage):
min_voltage = volt
if current is not None:
if (max_current is None) or (current > max_current):
max_current = current
return min_voltage, max_current
def compute_max_power(sensor_id):
"""
Calcola il massimo valore istantaneo di potenza (volt * current) registrato nel periodo PEAK_PERIOD per il sensore.
Args:
sensor_id (str): Identificativo del sensore.
Returns:
float: Massima potenza registrata oppure None.
"""
max_power = None
if sensor_id in history:
for ts, volt, cur in history[sensor_id]:
if (volt is not None) and (cur is not None):
power = volt * cur
if (max_power is None) or (power > max_power):
max_power = power
return max_power
def compute_totals(sensors):
"""
Calcola i totali per i valori live e per il massimo istantaneo (max power) su tutti i sensori.
Per ogni sensore:
- Somma la corrente live.
- Somma la potenza live (volt_live * current_live) se disponibili.
- Somma la corrente massima (max_current) e la potenza massima (max_power) registrate nel periodo.
Returns:
tuple: (total_current, total_max_current, total_power_live, total_power_max)
"""
total_current = 0.0
total_max_current = 0.0
total_power_live = 0.0
total_power_max = 0.0
for sensor_id, data in sensors.items():
cur = data.get("current")
volt = data.get("volt")
if cur is not None:
total_current += cur
if (volt is not None) and (cur is not None):
total_power_live += volt * cur
_, max_cur = compute_history(sensor_id)
if max_cur is not None:
total_max_current += max_cur
sensor_max_power = compute_max_power(sensor_id)
if sensor_max_power is not None:
total_power_max += sensor_max_power
return total_current, total_max_current, total_power_live, total_power_max
def compute_alert(sensor_id, voltage_live):
"""
Determina l'alert per un sensore basandosi sul rapporto tra il voltaggio live e il valore nominale.
Criteri:
- "OK" se il voltaggio live è >= 95% del nominale.
- "*" se è tra il 90% e il 95%.
- "**" se è inferiore al 90%.
- "N/A" se il nominale o il valore live non sono disponibili.
Args:
sensor_id (str): Identificativo del sensore.
voltage_live (float): Valore live del voltaggio.
Returns:
str: Stato di alert ("OK", "*", "**" o "N/A").
"""
nominal = nominals.get(sensor_id)
if (nominal is None) or (voltage_live is None):
return "N/A"
ratio = voltage_live / nominal
if ratio >= 0.95:
return "OK"
elif ratio >= 0.90:
return "*"
else:
return "**"
def format_voltage(voltage, sensor_id):
"""
Formattta un valore di voltaggio, aggiungendo tra parentesi la percentuale rispetto al valore nominale (se disponibile).
Args:
voltage (float): Valore di voltaggio.
sensor_id (str): Identificativo del sensore.
Returns:
str: Valore formattato (es. "3.3210 (100.3%)") o "N/A" se il valore è None.
"""
if voltage is None:
return "N/A"
nominal = nominals.get(sensor_id)
if (nominal is not None) and (nominal > 0):
perc = voltage / nominal * 100
return f"{voltage:.{PRECISION}f} ({perc:.1f}%)"
else:
return f"{voltage:.{PRECISION}f}"
def print_table(sensors):
"""
Stampa in console una tabella formattata con le misurazioni per ciascun sensore.
Le colonne sono:
- Sensor: Identificativo del sensore.
- Alert: Stato di alert basato sul confronto tra il voltaggio live e il nominale.
- Voltage (V) live: Valore live del voltaggio formattato con percentuale rispetto al nominale.
- Current (A) live: Valore live della corrente.
- Power (W) live: Prodotto di voltaggio_live e corrente_live.
- Voltage min (V): Il minimo valore di voltaggio registrato nel periodo.
- Max Current (A): Il massimo valore di corrente registrato nel periodo.
- Power max (W): Il massimo valore istantaneo di potenza (volt * current) registrato nel periodo.
- Description: Breve descrizione del sensore.
Le intestazioni sono stampate su due righe per ottenere colonne più compatte.
Viene stampata anche una riga "TOTAL" che somma i valori live (per corrente e potenza) e i valori massimi registrati,
mentre per le colonne di voltaggio si usa il valore live del rail EXT5V, se disponibile.
"""
# Definiamo le larghezze dei campi
fmt = "{:<12} {:>6} {:>16} {:>10} {:>10} {:>16} {:>10} {:>10} {:<20}"
header_line1 = fmt.format("Sensor", "Alert", "Voltage", "Current", "Power", "Voltage", "Current", "Power", "Description")
header_line2 = fmt.format("", "", "(V)", "(A)", "(W)", "min (V)", "max (A)", "max (W)", "")
print(header_line1)
print(header_line2)
print("-" * len(header_line1))
# Riga dei totali: per Voltage live e Voltage min, se EXT5V è disponibile, usiamo i suoi valori.
if "EXT5V" in sensors and sensors["EXT5V"]["volt"] is not None:
total_voltage_live = format_voltage(sensors["EXT5V"]["volt"], "EXT5V")
min_v_ext, _ = compute_history("EXT5V")
total_voltage_min = format_voltage(min_v_ext, "EXT5V")
else:
total_voltage_live = "N/A"
total_voltage_min = "N/A"
total_current, total_max_current, total_power_live, total_power_max = compute_totals(sensors)
total_row = fmt.format(
"TOTAL", "N/A", total_voltage_live,
f"{total_current:.{PRECISION}f}",
f"{total_power_live:.{PRECISION}f}",
total_voltage_min,
f"{total_max_current:.{PRECISION}f}",
f"{total_power_max:.{PRECISION}f}",
"Total consumption"
)
print(total_row)
print("-" * len(header_line1))
# Righe per ciascun sensore
for sensor_id, data in sorted(sensors.items()):
voltage_live = data["volt"]
current_live = data["current"]
voltage_str = format_voltage(voltage_live, sensor_id)
current_str = f"{current_live:.{PRECISION}f}" if current_live is not None else "N/A"
power_live = voltage_live * current_live if (voltage_live is not None and current_live is not None) else None
power_live_str = f"{power_live:.{PRECISION}f}" if power_live is not None else "N/A"
# Valori storici: Voltage min e Max Current
min_voltage, max_current = compute_history(sensor_id)
min_voltage_str = format_voltage(min_voltage, sensor_id) if min_voltage is not None else "N/A"
max_current_str = f"{max_current:.{PRECISION}f}" if max_current is not None else "N/A"
# Calcola il max power come il massimo istantaneo di power (volt * current) nel periodo
max_power = compute_max_power(sensor_id)
max_power_str = f"{max_power:.{PRECISION}f}" if max_power is not None else "N/A"
alert = compute_alert(sensor_id, voltage_live)
desc = descriptions.get(sensor_id, sensor_id)
print(fmt.format(
sensor_id, alert, voltage_str, current_str, power_live_str,
min_voltage_str, max_current_str, max_power_str, desc
))
def main():
"""
Funzione principale: esegue in loop il monitoraggio.
In ogni iterazione:
- Legge i dati live tramite get_pmic_data().
- Aggiorna il buffer 'history' con le letture correnti.
- Pulisce lo schermo e stampa la tabella formattata (intestazioni su due righe).
- Attende REFRESH_INTERVAL secondi.
"""
while True:
sensors = get_pmic_data()
update_history(sensors)
os.system('clear' if os.name == 'posix' else 'cls')
print_table(sensors)
time.sleep(REFRESH_INTERVAL)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment