Last active
October 17, 2025 05:09
-
-
Save MarkRobertJohnson/8b4bba95dd45a7f04b096df9727ccf8a to your computer and use it in GitHub Desktop.
CPU Usage History Grapher for a Selected Process
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| CPU Usage History Grapher for a Selected Process | |
| Features: | |
| - Select process by PID or exact name (case-insensitive). | |
| - Progress indicator with elapsed time, remaining time (ETA), and % complete. | |
| - Warm-up to avoid inflated first sample. | |
| - Optional normalization to 0–100% of total system CPU (across all logical CPUs). | |
| - Optional EMA (Exponential Moving Average) smoothing to tame spikes (applies to the plotted/summary values). | |
| - Computes Average, Min, and Max CPU usage over the monitoring period. | |
| - Saves a PNG chart, CSV data, and prints a textual summary. | |
| - Can load sample data from CSV instead of monitoring. | |
| Usage examples: | |
| python app.py 1234 --duration 45 --interval 0.5 --ema 5 | |
| python app.py myservice.exe --no-normalize | |
| python app.py --csv-in data.csv --out graph.png --ema 10 | |
| """ | |
| import argparse | |
| import math | |
| import time | |
| import sys | |
| from typing import Optional, Tuple, List, Dict | |
| import psutil | |
| import matplotlib.pyplot as plt | |
| from datetime import datetime | |
| import csv | |
| # ------------------------- Process Resolution ------------------------- # | |
| def resolve_pid(name_or_pid: str) -> Optional[int]: | |
| """ | |
| Return PID from a numeric input or the first process matching an exact name (case-insensitive). | |
| """ | |
| if name_or_pid.isdigit(): | |
| return int(name_or_pid) | |
| target = name_or_pid.lower() | |
| for proc in psutil.process_iter(['pid', 'name']): | |
| try: | |
| if proc.info['name'] and proc.info['name'].lower() == target: | |
| return proc.info['pid'] | |
| except (psutil.NoSuchProcess, psutil.AccessDenied): | |
| continue | |
| return None | |
| # ------------------------- Progress Indicator ------------------------- # | |
| def format_hms(seconds: float) -> str: | |
| seconds = max(0, int(round(seconds))) | |
| h, rem = divmod(seconds, 3600) | |
| m, s = divmod(rem, 60) | |
| return f"{h:02d}:{m:02d}:{s:02d}" | |
| def print_progress(elapsed: float, total: float, bar_width: int = 36): | |
| """ | |
| Render a single-line progress bar with elapsed, remaining (ETA), and % complete. | |
| """ | |
| frac = 0.0 if total <= 0 else min(1.0, max(0.0, elapsed / total)) | |
| done = int(frac * bar_width) | |
| remaining = max(0.0, total - elapsed) | |
| bar = "█" * done + "─" * (bar_width - done) | |
| pct = int(round(frac * 100)) | |
| msg = ( | |
| f"\r[{bar}] {pct:3d}% " | |
| f"elapsed {format_hms(elapsed)} / {format_hms(total)} " | |
| f"ETA {format_hms(remaining)}" | |
| ) | |
| sys.stdout.write(msg) | |
| sys.stdout.flush() | |
| # ------------------------- Monitoring Logic ------------------------- # | |
| def monitor_cpu_usage( | |
| pid: int, | |
| duration: float = 60.0, | |
| interval: float = 1.0, | |
| normalize_total_100: bool = True, | |
| ema_window: int = 0, | |
| show_progress: bool = True | |
| ) -> Tuple[List[float], List[float], Dict]: | |
| """ | |
| Monitor per-process CPU over time. | |
| - normalize_total_100=True: divide per-process CPU by logical CPU count so the scale is 0–100 (total system CPU). | |
| If False, values may exceed 100 on multicore systems (sum across cores). | |
| - ema_window>1: apply exponential moving average to values (affects plotted values AND summary stats). | |
| - show_progress: shows progress bar with elapsed/ETA. | |
| Returns: (timestamps, values, meta) | |
| """ | |
| ncpu = psutil.cpu_count(logical=True) or 1 | |
| try: | |
| proc = psutil.Process(pid) | |
| pname = proc.name() | |
| except psutil.NoSuchProcess: | |
| raise SystemExit(f"Process not found (PID {pid})") | |
| # Warm-up: the first call establishes baseline so subsequent % is meaningful | |
| proc.cpu_percent(None) | |
| total_samples = max(1, math.ceil(duration / max(interval, 1e-6))) | |
| timestamps: List[float] = [] | |
| values: List[float] = [] | |
| start = time.time() | |
| alpha = 2 / (ema_window + 1) if ema_window and ema_window > 1 else None | |
| ema: Optional[float] = None | |
| if show_progress: | |
| print_progress(0.0, duration) | |
| for _ in range(total_samples): | |
| try: | |
| # Blocking sample for approximately `interval` seconds | |
| raw = proc.cpu_percent(interval=max(0.0, interval)) | |
| except psutil.NoSuchProcess: | |
| # Process ended during collection; stop early | |
| break | |
| # Normalize to 0–100 if requested | |
| val = raw / ncpu if normalize_total_100 else raw | |
| # Clamp normalized values to [0, 100] | |
| if normalize_total_100: | |
| val = max(0.0, min(100.0, val)) | |
| # EMA smoothing (applies to plotted & summary values) | |
| if alpha: | |
| ema = val if ema is None else (alpha * val + (1 - alpha) * ema) | |
| plot_val = ema | |
| else: | |
| plot_val = val | |
| now = time.time() | |
| elapsed = now - start | |
| timestamps.append(min(elapsed, duration)) | |
| values.append(plot_val) | |
| if show_progress: | |
| print_progress(elapsed, duration) | |
| if elapsed >= duration: | |
| break | |
| if show_progress: | |
| sys.stdout.write("\n") | |
| sys.stdout.flush() | |
| # Stats over the plotted signal (post-normalization and smoothing, if enabled) | |
| if values: | |
| avg_cpu = sum(values) / len(values) | |
| min_cpu = min(values) | |
| max_cpu = max(values) | |
| dur_actual = timestamps[-1] | |
| else: | |
| avg_cpu = min_cpu = max_cpu = 0.0 | |
| dur_actual = 0.0 | |
| meta = { | |
| "pid": pid, | |
| "process_name": pname, | |
| "duration_actual": dur_actual, | |
| "samples": len(timestamps), | |
| "normalized_0_100": normalize_total_100, | |
| "interval": interval, | |
| "average_cpu": avg_cpu, | |
| "min_cpu": min_cpu, | |
| "max_cpu": max_cpu, | |
| "ema_window": ema_window, | |
| } | |
| return timestamps, values, meta | |
| # ------------------------- CSV Handling ------------------------- # | |
| def load_from_csv(filepath: str, ema_window: int = 0) -> Tuple[List[float], List[float], Dict]: | |
| """ | |
| Load timestamps and CPU values from CSV, infer interval and duration. | |
| Optionally apply EMA smoothing. | |
| """ | |
| timestamps = [] | |
| values = [] | |
| try: | |
| with open(filepath, 'r') as f: | |
| reader = csv.DictReader(f) | |
| for row in reader: | |
| timestamps.append(float(row['Time'])) | |
| values.append(float(row['CPU_Usage'])) | |
| except FileNotFoundError: | |
| raise SystemExit(f"CSV file not found: {filepath}") | |
| except KeyError: | |
| raise SystemExit("CSV must have 'Time' and 'CPU_Usage' columns") | |
| if not timestamps: | |
| return [], [], {} | |
| # Infer interval | |
| if len(timestamps) > 1: | |
| diffs = [timestamps[i+1] - timestamps[i] for i in range(len(timestamps)-1)] | |
| interval = sum(diffs) / len(diffs) | |
| else: | |
| interval = 1.0 | |
| duration = max(timestamps) | |
| # Apply EMA if requested | |
| if ema_window > 1: | |
| alpha = 2 / (ema_window + 1) | |
| ema = None | |
| smoothed = [] | |
| for val in values: | |
| ema = val if ema is None else (alpha * val + (1 - alpha) * ema) | |
| smoothed.append(ema) | |
| values = smoothed | |
| avg_cpu = sum(values) / len(values) | |
| min_cpu = min(values) | |
| max_cpu = max(values) | |
| meta = { | |
| "pid": 0, | |
| "process_name": "Loaded from CSV", | |
| "duration_actual": duration, | |
| "samples": len(timestamps), | |
| "normalized_0_100": True, # Assume normalized | |
| "interval": interval, | |
| "average_cpu": avg_cpu, | |
| "min_cpu": min_cpu, | |
| "max_cpu": max_cpu, | |
| "ema_window": ema_window if ema_window > 1 else 0, | |
| } | |
| return timestamps, values, meta | |
| def write_to_csv(timestamps: List[float], values: List[float], filepath: str): | |
| """ | |
| Write timestamps and CPU values to CSV. | |
| """ | |
| with open(filepath, 'w', newline='') as f: | |
| writer = csv.writer(f) | |
| writer.writerow(['Time', 'CPU_Usage']) | |
| for t, v in zip(timestamps, values): | |
| writer.writerow([f"{t:.4f}", f"{v:.2f}"]) | |
| # ------------------------- Plotting ------------------------- # | |
| def plot_cpu_usage( | |
| timestamps: List[float], | |
| values: List[float], | |
| meta: Dict, | |
| outfile: str = "cpu_usage_history.png" | |
| ): | |
| title_mode = "Total 0–100%" if meta.get("normalized_0_100", True) else "Per-core sum" | |
| ema_part = f" with EMA {meta['ema_window']}" if meta.get('ema_window', 0) > 1 else "" | |
| title = f"CPU Usage: PID {meta['pid']} ({meta.get('process_name','')}) — {title_mode}{ema_part}" | |
| plt.figure(figsize=(10, 6)) | |
| plt.plot(timestamps, values, marker='o', markersize=0, linewidth=1.5, label="CPU Usage") | |
| # Average line | |
| avg = meta.get("average_cpu", 0.0) | |
| plt.axhline(y=avg, color='r', linestyle='--', linewidth=1.2, label=f"Avg: {avg:.2f}%%") | |
| plt.title(title) | |
| plt.xlabel("Time (s)") | |
| ylabel = "CPU Usage (%%)" if meta.get("normalized_0_100", True) else "CPU Usage (%% of one core)" | |
| plt.ylabel(ylabel) | |
| if meta.get("normalized_0_100", True): | |
| plt.ylim(0, 100) | |
| plt.grid(True, alpha=0.3) | |
| plt.legend() | |
| plt.tight_layout() | |
| plt.savefig(outfile) | |
| plt.show() | |
| print(f"Saved graph to: {outfile}") | |
| # ------------------------- CLI Entry ------------------------- # | |
| def main(): | |
| parser = argparse.ArgumentParser(description="Plot CPU usage history for a process.") | |
| parser.add_argument("target", nargs='?', help="Process PID or exact name (e.g., 'python.exe' or 'python'). Required unless --csv-in is used.") | |
| parser.add_argument("--duration", type=float, default=60.0, help="Seconds to monitor.") | |
| parser.add_argument("--interval", type=float, default=1.0, help="Sampling interval (seconds).") | |
| parser.add_argument("--no-normalize", action="store_true", | |
| help="Do not normalize by logical CPU count (values may exceed 100%%).") | |
| parser.add_argument("--ema", type=int, default=0, | |
| help="EMA (Exponential Moving Average) window length (>1 to enable smoothing of CPU values to reduce spikes).") | |
| parser.add_argument("--out", default="cpu_usage_history.png", help="Output PNG path.") | |
| parser.add_argument("--no-progress", action="store_true", help="Disable progress indicator.") | |
| parser.add_argument("--csv-in", help="Input CSV path to load sample data instead of monitoring.") | |
| if len(sys.argv) == 1: | |
| parser.print_help() | |
| sys.exit(0) | |
| args = parser.parse_args() | |
| if args.csv_in: | |
| # Load from CSV | |
| ts, vals, meta = load_from_csv(args.csv_in, args.ema) | |
| if not ts: | |
| raise SystemExit("No data loaded from CSV.") | |
| # If target PID provided, get process name for title/filename | |
| if args.target: | |
| pid = resolve_pid(args.target) | |
| if pid: | |
| try: | |
| proc = psutil.Process(pid) | |
| meta['process_name'] = proc.name() | |
| meta['pid'] = pid | |
| except psutil.NoSuchProcess: | |
| pass | |
| # Set duration and interval for filename generation | |
| args.duration = meta['duration_actual'] | |
| args.interval = meta['interval'] | |
| else: | |
| # Monitor process | |
| if not args.target: | |
| raise SystemExit("Must provide target process or --csv-in") | |
| pid = resolve_pid(args.target) | |
| if pid is None: | |
| raise SystemExit(f"Process not found: {args.target}") | |
| ts, vals, meta = monitor_cpu_usage( | |
| pid=pid, | |
| duration=args.duration, | |
| interval=args.interval, | |
| normalize_total_100=not args.no_normalize, | |
| ema_window=args.ema, | |
| show_progress=not args.no_progress | |
| ) | |
| if not ts: | |
| raise SystemExit("No samples collected (process may have exited).") | |
| # Generate descriptive filename if default | |
| if args.out == "cpu_usage_history.png": | |
| dt_str = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| pname = meta['process_name'].replace(' ', '_').replace('/', '_').replace('\\', '_') | |
| pid = meta['pid'] | |
| dur = args.duration | |
| interv = args.interval | |
| ema_part = f"_ema{meta['ema_window']}" if meta.get('ema_window', 0) > 1 else "" | |
| args.out = f"{pname}_{pid}_{dur:.1f}s_{interv:.4f}s{ema_part}_{dt_str}.png" | |
| if not args.csv_in: | |
| # Write CSV with same base name as PNG | |
| csv_out = args.out.rsplit('.', 1)[0] + '.csv' | |
| write_to_csv(ts, vals, csv_out) | |
| print(f"Saved sample data to: {csv_out}") | |
| plot_cpu_usage(ts, vals, meta, outfile=args.out) | |
| # Console summary | |
| ema_str = f" with EMA {meta['ema_window']}" if (meta.get('ema_window', 0) and meta['ema_window'] > 1) else '' | |
| print( | |
| f"Summary over {meta['duration_actual']:.1f}s " | |
| f"({'normalized 0–100%%' if meta['normalized_0_100'] else 'per-core sum'}){ema_str}:" | |
| ) | |
| print(f" Samples : {meta['samples']}") | |
| print(f" Average : {meta['average_cpu']:.2f}%%") | |
| print(f" Min : {meta['min_cpu']:.2f}%%") | |
| print(f" Max : {meta['max_cpu']:.2f}%%") | |
| print(f" Interval : {meta['interval']}s") | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment