Terminal process monitoring in Python
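A quick usage sketch, assuming the file is saved as monitor.py (as the __main__ comment suggests) and that psutil, pexpect, rich, and nvsmi are installed; the training command shown is hypothetical. The script wraps an arbitrary command, shows a live dashboard of CPU/GPU usage alongside the command's output, and writes a log file plus a JSON summary next to it:

# from a shell, put the command to monitor after the script name, e.g.
#   python monitor.py python train.py --epochs 3
# or call it from Python
from monitor import monitor

monitor(["python", "train.py", "--epochs", "3"], details={"batch_size": 8})

The full script follows.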
from collections import Counter, deque
from copy import copy
from dataclasses import dataclass, asdict
import datetime
from itertools import islice
import json
import os
from pathlib import Path
import sys
import time
from typing import Optional

import nvsmi
import pexpect
from pexpect import popen_spawn
import psutil
from rich import box
from rich.console import Console
from rich.layout import Layout
from rich.live import Live
from rich.markdown import Markdown
from rich.panel import Panel
from rich.progress import Progress
from rich.table import Table


@dataclass
class ProcessObservation:
    """A single snapshot of a process's resource usage."""

    pid: int
    command: str
    cpu_pct: float
    memory: int
    mem_pct: float
    start_time: datetime.datetime
    time_since_start: datetime.timedelta
    thread_count: int
    state: str

    @property
    def memory_str(self) -> str:
        # human-readable memory size (decimal units)
        if self.memory > 1e9:
            return f"{int(self.memory / 1e9)}G"
        if self.memory > 1e6:
            return f"{int(self.memory / 1e6)}M"
        if self.memory > 1e3:
            return f"{int(self.memory / 1e3)}K"
        return str(self.memory)

    @classmethod
    def from_psutil(cls, proc):
        """Build an observation from a psutil.Process inside a single oneshot() block."""
        with proc.oneshot():
            start_time = datetime.datetime.fromtimestamp(proc.create_time())
            return cls(
                pid=proc.pid,
                command=proc.name(),
                cpu_pct=proc.cpu_percent(),
                memory=proc.memory_info().rss,
                mem_pct=proc.memory_percent(),
                start_time=start_time,
                time_since_start=datetime.datetime.now() - start_time,
                thread_count=proc.num_threads(),
                state=proc.status(),
            )

    @property
    def table_headers(self):
        return (
            "PID", "Command", "CPU %", "Memory", "Mem %", "Time", "Thread #", "State"
        )

    @property
    def table_row(self) -> tuple:
        return (
            str(self.pid),
            self.command,
            f"{self.cpu_pct:.1f}",
            self.memory_str,
            f"{self.mem_pct:.1f}",
            str(self.time_since_start),
            str(self.thread_count),
            self.state,
        )


@dataclass
class ProcessSummary:
    """Aggregate statistics for a monitored process, serializable to JSON."""

    logfile: Optional[Path]
    command: str
    max_cpu_pct: float
    max_memory: int
    max_mem_pct: float
    start_time: datetime.datetime
    duration: datetime.timedelta
    max_thread_count: int
    exitcode: int
    oom_detected: bool
    oom_cuda_detected: bool

    def json(self, filepath=None):
        """Serialize the summary to a JSON string; also write it to filepath if given."""
        if filepath:
            filepath = Path(filepath)
        data = asdict(self)
        # encode as JSON-compatible types for serialization
        data["start_time"] = data["start_time"].isoformat()  # ISO 8601 string
        data["duration"] = data["duration"].total_seconds()  # total seconds as float
        if self.logfile and filepath:
            # logfile relative to the JSON file if we are writing it out
            data["logfile"] = str(data["logfile"].resolve().relative_to(filepath.parent.resolve()))
        elif self.logfile:
            # absolute path by default
            data["logfile"] = str(data["logfile"].resolve())
        json_string = json.dumps(data)
        if filepath:
            filepath.write_text(json_string)
        return json_string


def update_process_table(observations, maxs, height: int) -> Table:
    """Render the observation history as a rich Table with the maximums pinned on top."""
    table = Table(*maxs.table_headers, box=box.SIMPLE)
    # put maximums on top
    table.add_row(
        " --- ",
        "MAXIMUMS",
        f"{maxs.cpu_pct:.1f}",
        maxs.memory_str,
        f"{maxs.mem_pct:.1f}",
        str(maxs.time_since_start),
        str(maxs.thread_count),
        " --- ",
        style="bold",
        end_section=True,
    )
    # write out each observation
    for process in observations:
        table.add_row(*process.table_row)
    return table


def update_gpu_tables(maxs) -> Table:
    """Render current per-GPU utilization (via nvsmi) as a rich Table, updating maximums in `maxs`."""
    headers = [
        "ID",
        "util",
        "processes",
        "Mem %",
        "Mem total",
    ]
    observation = list(nvsmi.get_gpus())
    n_proc_per_gpu = Counter(x.gpu_id for x in nvsmi.get_gpu_processes())
    rows = []
    for gpu in observation:
        gpu.n_proc = n_proc_per_gpu[gpu.id]
        for field in ["gpu_util", "mem_util", "n_proc", "mem_total"]:
            maxs[field] = max(maxs.get(field, 0), getattr(gpu, field))
        rows.append((
            str(gpu.id),
            f"{gpu.gpu_util:.1f}%",
            str(gpu.n_proc),
            f"{gpu.mem_util:.1f}%",
            f"{gpu.mem_total}",
        ))
    table = Table(*headers, box=box.SIMPLE)
    # put maximums on top
    table.add_row(
        "MAX",
        f"{maxs['gpu_util']:.1f}%",
        f"{maxs['n_proc']}",
        f"{maxs['mem_util']:.1f}%",
        f"{maxs['mem_total']}",
        style="bold",
        end_section=True,
    )
    # write out each observation
    for row in rows:
        table.add_row(*row)
    return table


def _lines_deque_to_str(d, n_lines):
    return "\n".join(
        islice(d, max(0, len(d) - n_lines), len(d))
    )


def monitor(command, details=None, interval=0.3, wait_for_exit=True):
    """Run `command` in a subprocess and show a live dashboard of its resource usage and output."""
    logfile = Path(f"monitor-{command[0]}-{datetime.datetime.now().strftime('%Y-%m-%d-%H%M%S')}.log")
    console = Console()
    layout = Layout()
    layout.split_column(
        Layout(name="upper"),
        Layout(name="logs"),
    )
    layout["upper"].split_row(
        Layout(name="process"),
        Layout(name="details"),
    )
    # only query nvsmi if nvidia-smi is actually available
    gpu_available = bool(nvsmi.is_nvidia_smi_on_path())
    n_gpus = len(list(nvsmi.get_gpus())) if gpu_available else 0
    layout["upper"]["process"].split_column(
        Layout(name="cpu"),
        Layout(name="gpu", size=n_gpus + 8),
    )
    cpu_layout = layout["upper"]["process"]["cpu"]
    if gpu_available:
        gpu_panel = Panel("", title=f"{n_gpus} GPU(s)")
    else:
        gpu_panel = Panel("No GPU identified (nvidia-smi not on path).", style="yellow", title="GPU")
    layout["upper"]["process"]["gpu"].update(gpu_panel)
    layout["details"].split_column(
        Layout(name="command"),
        Layout(name="status"),
    )
    command_md = Markdown(
        f"`$ {' '.join(command)}`" + "\n\n" + json.dumps(details or {})
    )
    command_panel = Panel(
        command_md,
        title="Command",
    )
    layout["upper"]["details"]["command"].ratio = 5
    layout["upper"]["details"]["command"].update(command_panel)
    status_panel = Panel("", title="Status")
    layout["upper"]["details"]["status"].update(status_panel)
    logs_panel = Panel("No output yet.", title="Output", subtitle="[red]Ctrl+C to exit")
    layout["logs"].update(logs_panel)
    process_panel = Panel("", title="Process history")
    cpu_layout.update(process_panel)

    with Live(layout, console=console, screen=True, auto_refresh=True) as live:
        # process observation history deque
        proc_obs = deque(maxlen=100)
        # store up to 500 lines from the logs for display; all output is written to the logfile
        logs = deque(maxlen=500)
        # start the process to monitor
        proc = ExecutingSubprocess(command, logfile=logfile)
        proc.start()
        running_progress = Progress()
        running_task = running_progress.add_task("[green]Running...", start=False)
        status_panel.renderable = running_progress
        # log first observation and set as the max that we saw
        proc_obs.append(proc.process_data)
        maxs = copy(proc_obs[0])
        gpu_maxs = {}
        while True:
            try:
                process_panel.renderable = update_process_table(proc_obs, maxs, layout.map[cpu_layout].region.height)
                if gpu_available:
                    gpu_panel.renderable = update_gpu_tables(gpu_maxs)
                logs_height = layout.map[layout["logs"]].region.height - 4  # buffer for border
                logs_panel.renderable = _lines_deque_to_str(logs, logs_height)
                time.sleep(interval)
                # observe the process and record results
                if proc.is_running:
                    current_obs = proc.process_data
                    proc_obs.appendleft(current_obs)
                    # track maximums seen so far
                    for field in ["cpu_pct", "thread_count", "memory", "time_since_start"]:
                        if getattr(current_obs, field) > getattr(maxs, field):
                            setattr(maxs, field, getattr(current_obs, field))
                    # observe the output and record results
                    output = proc.get_output()
                    if output:
                        logs.extend(output)
                else:
                    # get the last of the logs and render them
                    logs.extend(proc.get_output(all_data=True))
                    logs_height = layout.map[layout["logs"]].region.height - 4  # buffer for border
                    logs_panel.renderable = _lines_deque_to_str(logs, logs_height)
                    if proc.exit_code != 0:
                        status_panel.style = "red"
                        status_panel.renderable = f"Process exited with code {proc.exit_code}."
                    else:
                        status_panel.style = "green"
                        status_panel.renderable = "Process ran successfully."
                    break
            except KeyboardInterrupt:
                if proc.is_running:
                    status_panel.renderable = "[yellow]Killing process..."
                    proc.terminate()
        # detect OOM lines in logs
        oom = "out of memory" in "\n".join(logs)
        # detect CUDA OOM lines in logs
        cuda_oom = "CUDA error: out of memory" in "\n".join(logs)
        # write results
        summary = ProcessSummary(
            logfile=logfile,
            command=" ".join(command),
            max_cpu_pct=maxs.cpu_pct,
            max_memory=maxs.memory,
            max_mem_pct=maxs.mem_pct,
            max_thread_count=maxs.thread_count,
            duration=maxs.time_since_start,
            start_time=maxs.start_time,
            exitcode=proc.exit_code,
            oom_detected=oom,
            oom_cuda_detected=cuda_oom,
        )
        summary.json(filepath=logfile.with_suffix(".json"))
        status_panel.renderable += f"\n\nLogs saved to: {logfile}\nJSON saved to: {logfile.with_suffix('.json')}\n"
        # display results until manually exited
        if wait_for_exit:
            try:
                while True:
                    time.sleep(1.0)
            except KeyboardInterrupt:
                pass


class ExecutingSubprocess:
    """Wrap a command in a pexpect subprocess and expose its psutil handle, output, and exit code."""

    def __init__(self, cmd, logfile=None):
        self.cmd = cmd
        self.process = None
        self._exit_code = None
        self._log_handle = open(str(logfile), "w") if logfile else None
        self._pexpect_proc = None

    def start(self):
        # keep pexpect process internal for reading status
        sub_env = os.environ.copy()
        sub_env["PYTHONUNBUFFERED"] = "1"  # stream child output without buffering
        self._pexpect_proc = popen_spawn.PopenSpawn(self.cmd, encoding="utf-8", env=sub_env, logfile=self._log_handle)
        # expose psutil process for external use
        self.process = psutil.Process(self._pexpect_proc.pid)

    def terminate(self):
        self.process.terminate()
        try:
            self._exit_code = self.process.wait(5)
        except psutil.TimeoutExpired:
            pass
        if self.is_running:
            self.process.kill()
            self._exit_code = self.process.wait()

    def get_output(self, all_data=False):
        output = ""
        # try to read from the process; stop quietly if it has finished
        try:
            # get up to 10000 characters at a time
            output = self._pexpect_proc.read_nonblocking(
                size=10000 if not all_data else None,
                timeout=1 if not all_data else -1,
            )
        except (pexpect.exceptions.EOF, pexpect.exceptions.TIMEOUT):
            pass
        # return individual lines; empty list if nothing was read
        return output.split("\n") if output else []

    @property
    def is_running(self):
        try:
            if self.process.status() == psutil.STATUS_ZOMBIE:
                self._exit_code = self.process.wait()
                return False
        except psutil.NoSuchProcess:
            return False
        return self.process.is_running()

    @property
    def process_data(self):
        return ProcessObservation.from_psutil(self.process)

    @property
    def exit_code(self):
        if self.is_running:
            return None
        else:
            return self._exit_code

    def __del__(self):
        if getattr(self, "_log_handle", None):
            self._log_handle.close()


if __name__ == "__main__":
    # pass everything after "python monitor.py" as the command to monitor
    monitor(sys.argv[1:])
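After a run completes, the JSON summary sits next to the log file. A minimal sketch of reading it back in (the file name below is hypothetical; monitor() builds the real one from the first token of the command plus a timestamp):

import json
from pathlib import Path

summary = json.loads(Path("monitor-python-2021-10-04-172705.json").read_text())
# fields serialized by ProcessSummary.json(): command, max_cpu_pct, max_memory,
# max_mem_pct, start_time (ISO 8601), duration (seconds), max_thread_count,
# exitcode, oom_detected, oom_cuda_detected, logfile
print(summary["exitcode"], summary["max_memory"], summary["oom_detected"])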