Skip to content

Instantly share code, notes, and snippets.

@MarkRobertJohnson
Last active October 17, 2025 05:09
Show Gist options
  • Select an option

  • Save MarkRobertJohnson/8b4bba95dd45a7f04b096df9727ccf8a to your computer and use it in GitHub Desktop.

Select an option

Save MarkRobertJohnson/8b4bba95dd45a7f04b096df9727ccf8a to your computer and use it in GitHub Desktop.
CPU Usage History Grapher for a Selected Process
#!/usr/bin/env python3
"""
CPU Usage History Grapher for a Selected Process
Features:
- Select process by PID or exact name (case-insensitive).
- Progress indicator with elapsed time, remaining time (ETA), and % complete.
- Warm-up to avoid inflated first sample.
- Optional normalization to 0–100% of total system CPU (across all logical CPUs).
- Optional EMA (Exponential Moving Average) smoothing to tame spikes (applies to the plotted/summary values).
- Computes Average, Min, and Max CPU usage over the monitoring period.
- Saves a PNG chart, CSV data, and prints a textual summary.
- Can load sample data from CSV instead of monitoring.
Usage examples:
python app.py 1234 --duration 45 --interval 0.5 --ema 5
python app.py myservice.exe --no-normalize
python app.py --csv-in data.csv --out graph.png --ema 10
"""
import argparse
import math
import time
import sys
from typing import Optional, Tuple, List, Dict
import psutil
import matplotlib.pyplot as plt
from datetime import datetime
import csv
# ------------------------- Process Resolution ------------------------- #
def resolve_pid(name_or_pid: str) -> Optional[int]:
    """
    Resolve a user-supplied process selector to a PID.

    A string consisting solely of decimal digits is interpreted as the PID
    itself and returned as an int; no check is made that such a process
    exists. Any other string is compared case-insensitively against the names
    of the running processes, and the PID of the first exact match is returned.

    Returns:
        The PID, or None when no running process has the given name.
    """
    # isdecimal() accepts exactly the characters int() can parse. isdigit()
    # is also true for e.g. superscript digits ("²"), which int() rejects
    # with ValueError.
    if name_or_pid.isdecimal():
        return int(name_or_pid)
    target = name_or_pid.lower()
    for proc in psutil.process_iter(['pid', 'name']):
        try:
            name = proc.info['name']
            if name and name.lower() == target:
                return proc.info['pid']
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # Process vanished or is not inspectable; try the next one.
            continue
    return None
# ------------------------- Progress Indicator ------------------------- #
def format_hms(seconds: float) -> str:
    """
    Format a duration given in seconds as zero-padded ``HH:MM:SS``.

    The value is rounded to a whole number of seconds first; negative
    durations are rendered as ``00:00:00``.
    """
    whole = int(round(seconds))
    if whole < 0:
        whole = 0
    total_minutes, secs = divmod(whole, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d}"
def print_progress(elapsed: float, total: float, bar_width: int = 36):
    """
    Redraw the one-line progress bar on stdout.

    The line starts with a carriage return so each call overwrites the
    previous one. It shows a bar of `bar_width` cells, the percentage
    complete, the elapsed and total time, and the remaining time (ETA).
    A non-positive `total` is displayed as 0% complete.
    """
    if total <= 0:
        fraction = 0.0
    else:
        fraction = min(1.0, max(0.0, elapsed / total))
    filled = int(fraction * bar_width)
    bar = "█" * filled + "─" * (bar_width - filled)
    percent = int(round(fraction * 100))
    eta = max(0.0, total - elapsed)
    line = "".join((
        f"\r[{bar}] {percent:3d}% ",
        f"elapsed {format_hms(elapsed)} / {format_hms(total)} ",
        f"ETA {format_hms(eta)}",
    ))
    sys.stdout.write(line)
    # Flush immediately: the line has no trailing newline to trigger it.
    sys.stdout.flush()
# ------------------------- Monitoring Logic ------------------------- #
def monitor_cpu_usage(
    pid: int,
    duration: float = 60.0,
    interval: float = 1.0,
    normalize_total_100: bool = True,
    ema_window: int = 0,
    show_progress: bool = True
) -> Tuple[List[float], List[float], Dict]:
    """
    Monitor per-process CPU usage over time.

    Args:
        pid: PID of the process to sample.
        duration: Total monitoring time in seconds (finite, >= 0).
        interval: Seconds each blocking sample lasts (finite, > 0).
        normalize_total_100: If True, divide the per-process figure by the
            logical CPU count and clamp it to [0, 100] (share of total system
            CPU). If False, values may exceed 100 on multicore systems (sum
            across cores).
        ema_window: When > 1, apply an exponential moving average with
            alpha = 2 / (ema_window + 1). The smoothed values are what gets
            returned and what the summary statistics are computed from.
        show_progress: If True, render a progress bar with elapsed/ETA.

    Returns:
        (timestamps, values, meta): timestamps are seconds since monitoring
        started (capped at `duration`), values are the matching CPU readings,
        and meta is a dict holding the process name, sample count, settings,
        and average/min/max of `values`. If the process exits during
        collection, the samples gathered so far are returned.

    Raises:
        ValueError: If `interval` or `duration` is out of range.
        SystemExit: If the process does not exist or cannot be inspected.
    """
    # Reject bad timing values up front. A non-positive interval would make
    # every cpu_percent() call return immediately and turn the sampling loop
    # into a busy spin collecting meaningless readings.
    if not (math.isfinite(interval) and interval > 0):
        raise ValueError(f"interval must be a positive number of seconds, got {interval!r}")
    if not (math.isfinite(duration) and duration >= 0):
        raise ValueError(f"duration must be a non-negative number of seconds, got {duration!r}")

    ncpu = psutil.cpu_count(logical=True) or 1
    try:
        proc = psutil.Process(pid)
        pname = proc.name()
        # Warm-up: the first call establishes the baseline so subsequent
        # percentages are meaningful. Kept inside the try so a process that
        # exits right here is reported cleanly as well.
        proc.cpu_percent(None)
    except psutil.NoSuchProcess:
        raise SystemExit(f"Process not found (PID {pid})")
    except psutil.AccessDenied:
        raise SystemExit(f"Access denied to process (PID {pid})")

    total_samples = max(1, math.ceil(duration / interval))
    timestamps: List[float] = []
    values: List[float] = []
    # Monotonic clock: elapsed time must not be affected by wall-clock changes.
    start = time.monotonic()
    alpha = 2 / (ema_window + 1) if ema_window and ema_window > 1 else None
    ema: Optional[float] = None

    if show_progress:
        print_progress(0.0, duration)

    for _ in range(total_samples):
        try:
            # Blocking sample for approximately `interval` seconds
            raw = proc.cpu_percent(interval=interval)
        except psutil.NoSuchProcess:
            # Process ended during collection; stop early
            break

        if normalize_total_100:
            # Scale to a share of total system CPU and clamp to [0, 100].
            val = max(0.0, min(100.0, raw / ncpu))
        else:
            val = raw

        # EMA smoothing (applies to plotted & summary values)
        if alpha is not None:
            ema = val if ema is None else (alpha * val + (1 - alpha) * ema)
            plot_val = ema
        else:
            plot_val = val

        elapsed = time.monotonic() - start
        timestamps.append(min(elapsed, duration))
        values.append(plot_val)

        if show_progress:
            print_progress(elapsed, duration)
        # Blocking samples can overrun; never exceed the requested duration.
        if elapsed >= duration:
            break

    if show_progress:
        # Terminate the carriage-return progress line.
        sys.stdout.write("\n")
        sys.stdout.flush()

    # Stats over the plotted signal (post-normalization and smoothing, if enabled)
    if values:
        avg_cpu = sum(values) / len(values)
        min_cpu = min(values)
        max_cpu = max(values)
        dur_actual = timestamps[-1]
    else:
        avg_cpu = min_cpu = max_cpu = 0.0
        dur_actual = 0.0

    meta = {
        "pid": pid,
        "process_name": pname,
        "duration_actual": dur_actual,
        "samples": len(timestamps),
        "normalized_0_100": normalize_total_100,
        "interval": interval,
        "average_cpu": avg_cpu,
        "min_cpu": min_cpu,
        "max_cpu": max_cpu,
        "ema_window": ema_window,
    }
    return timestamps, values, meta
# ------------------------- CSV Handling ------------------------- #
def load_from_csv(filepath: str, ema_window: int = 0) -> Tuple[List[float], List[float], Dict]:
    """
    Load timestamps and CPU values from a CSV file and derive summary metadata.

    The file must have a header row containing 'Time' and 'CPU_Usage' columns,
    both holding numeric values. The sampling interval is inferred as the mean
    gap between consecutive timestamps (1.0 when there is a single row) and
    the duration as the largest timestamp.

    Args:
        filepath: Path of the CSV file to read.
        ema_window: When > 1, smooth the values with an exponential moving
            average (alpha = 2 / (ema_window + 1)) before computing stats.

    Returns:
        (timestamps, values, meta). For a file without data rows this is
        ([], [], {}). Loaded data is assumed to be normalized to 0-100, and
        meta uses pid 0 with the process name "Loaded from CSV".

    Raises:
        SystemExit: If the file is missing, a required column is absent, or a
            cell cannot be parsed as a number.
    """
    timestamps: List[float] = []
    values: List[float] = []
    try:
        # newline='' lets the csv module handle line endings itself.
        with open(filepath, 'r', newline='') as f:
            reader = csv.DictReader(f)
            for row in reader:
                try:
                    stamp = float(row['Time'])
                    usage = float(row['CPU_Usage'])
                except (TypeError, ValueError):
                    # TypeError: short row (DictReader fills the gap with None).
                    # ValueError: text that is not a number.
                    raise SystemExit(
                        f"Invalid numeric value in CSV near line {reader.line_num}: {row}"
                    )
                timestamps.append(stamp)
                values.append(usage)
    except FileNotFoundError:
        raise SystemExit(f"CSV file not found: {filepath}")
    except KeyError:
        raise SystemExit("CSV must have 'Time' and 'CPU_Usage' columns")

    if not timestamps:
        return [], [], {}

    # Infer interval
    if len(timestamps) > 1:
        diffs = [later - earlier for earlier, later in zip(timestamps, timestamps[1:])]
        interval = sum(diffs) / len(diffs)
    else:
        interval = 1.0
    duration = max(timestamps)

    # Apply EMA if requested
    if ema_window > 1:
        alpha = 2 / (ema_window + 1)
        ema = None
        smoothed = []
        for val in values:
            ema = val if ema is None else (alpha * val + (1 - alpha) * ema)
            smoothed.append(ema)
        values = smoothed

    meta = {
        "pid": 0,
        "process_name": "Loaded from CSV",
        "duration_actual": duration,
        "samples": len(timestamps),
        "normalized_0_100": True,  # Assume normalized
        "interval": interval,
        "average_cpu": sum(values) / len(values),
        "min_cpu": min(values),
        "max_cpu": max(values),
        "ema_window": ema_window if ema_window > 1 else 0,
    }
    return timestamps, values, meta
def write_to_csv(timestamps: List[float], values: List[float], filepath: str):
    """
    Save the samples to `filepath` as CSV with a 'Time','CPU_Usage' header.

    Timestamps are written with four decimal places and CPU values with two.
    Rows are paired positionally; surplus items in the longer list are ignored.
    """
    with open(filepath, 'w', newline='') as out:
        sheet = csv.writer(out)
        sheet.writerow(['Time', 'CPU_Usage'])
        sheet.writerows(
            [f"{stamp:.4f}", f"{usage:.2f}"]
            for stamp, usage in zip(timestamps, values)
        )
# ------------------------- Plotting ------------------------- #
def plot_cpu_usage(
    timestamps: List[float],
    values: List[float],
    meta: Dict,
    outfile: str = "cpu_usage_history.png"
):
    """
    Plot CPU usage over time, save the chart to `outfile`, and display it.

    Args:
        timestamps: X values, seconds.
        values: Y values, CPU usage in percent.
        meta: Summary dict. 'pid' is required. 'process_name', 'average_cpu',
            'ema_window' and 'normalized_0_100' are optional and default to
            '', 0.0, 0 and True respectively.
        outfile: Path the figure is saved to.

    When the data is normalized the y-axis is fixed to 0-100. A dashed red
    line marks the average.
    """
    normalized = meta.get("normalized_0_100", True)
    title_mode = "Total 0–100%" if normalized else "Per-core sum"
    ema_part = f" with EMA {meta['ema_window']}" if meta.get('ema_window', 0) > 1 else ""
    title = f"CPU Usage: PID {meta['pid']} ({meta.get('process_name','')}) — {title_mode}{ema_part}"

    fig = plt.figure(figsize=(10, 6))
    plt.plot(timestamps, values, marker='o', markersize=0, linewidth=1.5, label="CPU Usage")

    # Average line. A single '%' is correct here: '%%' is only an escape in
    # %-style formatting, so in an f-string it rendered as a literal "%%".
    avg = meta.get("average_cpu", 0.0)
    plt.axhline(y=avg, color='r', linestyle='--', linewidth=1.2, label=f"Avg: {avg:.2f}%")

    plt.title(title)
    plt.xlabel("Time (s)")
    ylabel = "CPU Usage (%)" if normalized else "CPU Usage (% of one core)"
    plt.ylabel(ylabel)
    if normalized:
        plt.ylim(0, 100)
    plt.grid(True, alpha=0.3)
    plt.legend()
    plt.tight_layout()
    # Save before show(): the figure may be gone once its window is closed.
    plt.savefig(outfile)
    plt.show()
    # Release the figure so repeated calls do not accumulate open figures.
    plt.close(fig)
    print(f"Saved graph to: {outfile}")
# ------------------------- CLI Entry ------------------------- #
def main():
    """
    Command-line entry point.

    Either monitors a live process (positional `target`: PID or exact name)
    or loads previously recorded samples (--csv-in), then writes a PNG chart
    and prints a summary. In monitoring mode the samples are also written to
    a CSV file next to the PNG. When --out is left at its default, a
    descriptive file name is generated from the process name, PID, duration,
    interval, EMA window and current time.

    Exits via SystemExit with a message on user errors (no target, process
    not found, empty data) and via argparse's error handling for invalid
    --interval / --duration values.
    """
    import os  # local import: only needed here to derive the CSV output path

    parser = argparse.ArgumentParser(description="Plot CPU usage history for a process.")
    parser.add_argument("target", nargs='?', help="Process PID or exact name (e.g., 'python.exe' or 'python'). Required unless --csv-in is used.")
    parser.add_argument("--duration", type=float, default=60.0, help="Seconds to monitor.")
    parser.add_argument("--interval", type=float, default=1.0, help="Sampling interval (seconds).")
    # '%%' is required here: argparse applies %-formatting to help strings.
    parser.add_argument("--no-normalize", action="store_true",
                        help="Do not normalize by logical CPU count (values may exceed 100%%).")
    parser.add_argument("--ema", type=int, default=0,
                        help="EMA (Exponential Moving Average) window length (>1 to enable smoothing of CPU values to reduce spikes).")
    parser.add_argument("--out", default="cpu_usage_history.png", help="Output PNG path.")
    parser.add_argument("--no-progress", action="store_true", help="Disable progress indicator.")
    parser.add_argument("--csv-in", help="Input CSV path to load sample data instead of monitoring.")

    if len(sys.argv) == 1:
        parser.print_help()
        sys.exit(0)
    args = parser.parse_args()

    if args.csv_in:
        # Load from CSV
        ts, vals, meta = load_from_csv(args.csv_in, args.ema)
        if not ts:
            raise SystemExit("No data loaded from CSV.")
        # If target PID provided, get process name for title/filename
        if args.target:
            pid = resolve_pid(args.target)
            # Compare against None explicitly so PID 0 is not treated as missing.
            if pid is not None:
                try:
                    proc = psutil.Process(pid)
                    meta['process_name'] = proc.name()
                    meta['pid'] = pid
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    # Best effort only: keep the CSV placeholder name/PID.
                    pass
        # Set duration and interval for filename generation
        args.duration = meta['duration_actual']
        args.interval = meta['interval']
    else:
        # Monitor process
        if not args.target:
            raise SystemExit("Must provide target process or --csv-in")
        # A non-positive interval would make sampling non-blocking and spin.
        if not (math.isfinite(args.interval) and args.interval > 0):
            parser.error("--interval must be a positive number of seconds")
        if not (math.isfinite(args.duration) and args.duration >= 0):
            parser.error("--duration must be a non-negative number of seconds")
        pid = resolve_pid(args.target)
        if pid is None:
            raise SystemExit(f"Process not found: {args.target}")
        ts, vals, meta = monitor_cpu_usage(
            pid=pid,
            duration=args.duration,
            interval=args.interval,
            normalize_total_100=not args.no_normalize,
            ema_window=args.ema,
            show_progress=not args.no_progress
        )
        if not ts:
            raise SystemExit("No samples collected (process may have exited).")

    # Generate descriptive filename if default
    if args.out == "cpu_usage_history.png":
        dt_str = datetime.now().strftime("%Y%m%d_%H%M%S")
        pname = meta['process_name'].replace(' ', '_').replace('/', '_').replace('\\', '_')
        pid = meta['pid']
        dur = args.duration
        interv = args.interval
        ema_part = f"_ema{meta['ema_window']}" if meta.get('ema_window', 0) > 1 else ""
        args.out = f"{pname}_{pid}_{dur:.1f}s_{interv:.4f}s{ema_part}_{dt_str}.png"

    if not args.csv_in:
        # Write CSV with same base name as PNG. splitext only strips an
        # extension from the final path component, so a dot in a directory
        # name (e.g. "./out/graph") no longer mangles the path.
        csv_out = os.path.splitext(args.out)[0] + '.csv'
        if csv_out == args.out:
            # --out itself ends in .csv; avoid the PNG overwriting the data.
            csv_out = args.out + '.csv'
        write_to_csv(ts, vals, csv_out)
        print(f"Saved sample data to: {csv_out}")

    plot_cpu_usage(ts, vals, meta, outfile=args.out)

    # Console summary. Single '%' throughout: '%%' is not an escape in
    # f-strings or plain strings and was printed literally.
    ema_str = f" with EMA {meta['ema_window']}" if (meta.get('ema_window', 0) and meta['ema_window'] > 1) else ''
    print(
        f"Summary over {meta['duration_actual']:.1f}s "
        f"({'normalized 0–100%' if meta['normalized_0_100'] else 'per-core sum'}){ema_str}:"
    )
    print(f" Samples : {meta['samples']}")
    print(f" Average : {meta['average_cpu']:.2f}%")
    print(f" Min : {meta['min_cpu']:.2f}%")
    print(f" Max : {meta['max_cpu']:.2f}%")
    print(f" Interval : {meta['interval']}s")
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment