Skip to content

Instantly share code, notes, and snippets.

@pjbull
Created October 4, 2021 17:27
Show Gist options
  • Save pjbull/b9c7b9bb9e776c9949f0da7deb33a93c to your computer and use it in GitHub Desktop.
Save pjbull/b9c7b9bb9e776c9949f0da7deb33a93c to your computer and use it in GitHub Desktop.
Terminal process monitoring in Python
from collections import Counter, deque
from copy import copy
from dataclasses import dataclass, asdict
import datetime
from itertools import islice
import json
import os
from pathlib import Path
import sys
import time
from typing import Optional
import nvsmi
import pexpect
from pexpect import popen_spawn
import psutil
from rich import box
from rich.console import Console
from rich.layout import Layout, Panel
from rich.live import Live
from rich.markdown import Markdown
from rich.pretty import pprint
from rich.progress import Progress
from rich.table import Table
@dataclass
class ProcessObservation:
    """Snapshot of a single process's resource usage at one point in time."""

    pid: int
    command: str  # process name as reported by psutil's ``name()``
    cpu_pct: float
    memory: int  # resident set size (``memory_info().rss``)
    mem_pct: float
    start_time: datetime.datetime
    time_since_start: datetime.timedelta
    thread_count: int
    state: str

    @property
    def memory_str(self) -> str:
        """Return ``memory`` abbreviated with a decimal G/M/K suffix.

        The value is truncated (not rounded) to a whole number of units.
        Thresholds are inclusive, so exactly 1000 renders as "1K" rather
        than "1000".
        """
        for threshold, suffix in ((1e9, "G"), (1e6, "M"), (1e3, "K")):
            if self.memory >= threshold:
                return f"{int(self.memory / threshold)}{suffix}"
        return str(self.memory)

    @classmethod
    def from_psutil(cls, proc) -> "ProcessObservation":
        """Build an observation from a ``psutil.Process``-like object.

        All reads happen inside ``proc.oneshot()`` so they are collected
        together in a single pass.
        """
        with proc.oneshot():
            start_time = datetime.datetime.fromtimestamp(proc.create_time())
            return cls(
                pid=proc.pid,
                command=proc.name(),
                cpu_pct=proc.cpu_percent(),
                memory=proc.memory_info().rss,
                mem_pct=proc.memory_percent(),
                start_time=start_time,
                time_since_start=datetime.datetime.now() - start_time,
                thread_count=proc.num_threads(),
                state=proc.status(),
            )

    @property
    def table_headers(self) -> tuple:
        """Column titles, in the same order as ``table_row``."""
        return (
            "PID", "Command", "CPU %", "Memory", "Mem %", "Time", "Thread #", "State"
        )

    @property
    def table_row(self) -> tuple:
        """This observation as a tuple of display strings, one per header."""
        return (
            str(self.pid),
            self.command,
            f"{self.cpu_pct:.1f}",
            self.memory_str,
            f"{self.mem_pct:.1f}",
            str(self.time_since_start),
            str(self.thread_count),
            self.state,
        )
@dataclass
class ProcessSummary:
    """Aggregate results of one monitored run, serialisable to JSON."""

    logfile: Optional[Path]
    command: str
    max_cpu_pct: float
    max_memory: int
    max_mem_pct: float
    start_time: datetime.datetime
    duration: datetime.timedelta
    max_thread_count: int
    exitcode: int
    oom_detected: bool
    oom_cuda_detected: bool

    def json(self, filepath=None):
        """Serialise the summary to a JSON string, optionally writing it out.

        Args:
            filepath: Optional destination path. When given, the JSON is also
                written there, and ``logfile`` is stored relative to that
                file's directory where possible.

        Returns:
            The JSON document as a string.
        """
        target = Path(filepath) if filepath else None
        data = asdict(self)

        # encode as json types for serialization
        data["start_time"] = data["start_time"].isoformat()  # iso-8601 string
        data["duration"] = data["duration"].total_seconds()  # total seconds as float

        if self.logfile:
            logfile_abs = Path(self.logfile).resolve()
            # absolute path by default
            data["logfile"] = str(logfile_abs)
            if target is not None:
                # logfile relative to json file if we are writing it out;
                # relative_to() raises ValueError when the log does not live
                # under the JSON file's directory, in which case the absolute
                # path set above is kept.
                try:
                    data["logfile"] = str(logfile_abs.relative_to(target.parent.resolve()))
                except ValueError:
                    pass

        json_string = json.dumps(data)
        if target is not None:
            target.write_text(json_string)
        return json_string
def update_process_table(observations, maxs, height: int) -> Table:
    """Render the process history as a table with the maximums pinned on top.

    Args:
        observations: Iterable of observations; each contributes one row via
            its ``table_row``.
        maxs: Observation-like object holding the running maximums; it also
            supplies the column headers.
        height: Accepted for the caller's convenience; not used here.

    Returns:
        A freshly built ``Table``.
    """
    history = Table(*maxs.table_headers, box=box.SIMPLE)

    # put maximum on top; PID and state have no meaningful maximum
    maximum_cells = (
        " --- ",
        "MAXIMUMS",
        f"{maxs.cpu_pct:.1f}",
        maxs.memory_str,
        f"{maxs.mem_pct:.1f}",
        str(maxs.time_since_start),
        str(maxs.thread_count),
        " --- ",
    )
    history.add_row(*maximum_cells, style="bold", end_section=True)

    # write out each observation
    for cells in (obs.table_row for obs in observations):
        history.add_row(*cells)

    return history
def update_gpu_tables(maxs) -> Table:
    """Query the GPUs via ``nvsmi`` and render one row per GPU plus a MAX row.

    Args:
        maxs: Mutable dict of running maximums keyed by field name. It is
            updated in place so the maximums persist across calls.

    Returns:
        A freshly built ``Table``.
    """
    headers = [
        "ID",
        "util",
        "processes",
        "Mem %",
        "Mem total",
    ]
    tracked_fields = ("gpu_util", "mem_util", "n_proc", "mem_total")

    # Seed every tracked maximum so the MAX row can still be formatted when
    # nvsmi reports no GPUs on the first call (previously a KeyError).
    for field in tracked_fields:
        maxs.setdefault(field, 0)

    observation = list(nvsmi.get_gpus())
    n_proc_per_gpu = Counter(x.gpu_id for x in nvsmi.get_gpu_processes())

    rows = []
    for gpu in observation:
        # attach the process count so it can be read like any other field
        gpu.n_proc = n_proc_per_gpu[gpu.id]
        for field in tracked_fields:
            maxs[field] = max(maxs[field], getattr(gpu, field))
        rows.append((
            str(gpu.id),
            f"{gpu.gpu_util:.1f}%",
            str(gpu.n_proc),
            f"{gpu.mem_util:.1f}%",
            f"{gpu.mem_total}",
        ))

    table = Table(*headers, box=box.SIMPLE)

    # put maximum on top
    table.add_row(
        "MAX",
        f"{maxs['gpu_util']:.1f}%",
        f"{maxs['n_proc']}",
        f"{maxs['mem_util']:.1f}%",
        f"{maxs['mem_total']}",
        style="bold",
        end_section=True,
    )

    # write out each observation
    for row in rows:
        table.add_row(*row)

    return table
def _lines_deque_to_str(d, n_lines):
return "\n".join(
islice(
d,
max(0, len(d) - n_lines),
len(d)
)
)
def monitor(command, details={'is_cool': True, 'batch_size': 8}, interval=0.3, wait_for_exit=True):
    """Run ``command`` as a subprocess and show a live dashboard while it runs.

    The dashboard shows process history with running maximums, GPU usage
    (when ``nvidia-smi`` is on the path), the command with ``details``, a
    status panel and the tail of the process output. When the process ends,
    a JSON summary is written next to the log file.

    Args:
        command: Sequence of strings: the executable followed by its
            arguments. Must contain at least one element.
        details: JSON-serialisable metadata shown beside the command. It is
            only read here, never mutated, so the shared default is safe.
        interval: Seconds to sleep between refreshes.
        wait_for_exit: When true, keep the final screen up until Ctrl+C.
    """
    # Use only the executable's base name so that a command such as
    # "./run.sh" or "/usr/bin/python" cannot put path separators into the
    # log file name (which would point into a directory that does not exist).
    timestamp = datetime.datetime.now().strftime('%Y-%m-%d-%H%M%S')
    logfile = Path(f"monitor-{Path(command[0]).name}-{timestamp}.log")

    # Check for nvidia-smi BEFORE asking nvsmi for GPUs, so that machines
    # without it reach the "No GPU identified" panel below.
    gpu_available = bool(nvsmi.is_nvidia_smi_on_path())
    n_gpus = len(list(nvsmi.get_gpus())) if gpu_available else 0

    console = Console()
    layout = Layout()
    layout.split_column(
        Layout(name="upper"),
        Layout(name="logs")
    )
    layout["upper"].split_row(
        Layout(name="process"),
        Layout(name="details"),
    )
    layout["upper"]["process"].split_column(
        Layout(name="cpu"),
        Layout(name="gpu", size=n_gpus + 8),
    )
    cpu_layout = layout["upper"]["process"]["cpu"]

    if gpu_available:
        gpu_panel = Panel("", title=f"{n_gpus} GPU(s)")
    else:
        gpu_panel = Panel("No GPU identified (nvidia-smi not on path).", style="yellow", title="GPU")
    layout["upper"]["process"]["gpu"].update(gpu_panel)

    layout["details"].split_column(
        Layout(name="command"),
        Layout(name="status"),
    )
    command_md = Markdown((
        f"`$ {' '.join(command)}`" + "\n\n" + json.dumps(details)
    ))
    command_panel = Panel(
        command_md,
        title="Command"
    )
    layout["upper"]["details"]["command"].ratio = 5
    layout["upper"]["details"]["command"].update(command_panel)

    status_panel = Panel("", title="Status")
    layout["upper"]["details"]["status"].update(status_panel)

    logs_panel = Panel("No output yet.", title="Output", subtitle="[red]Crtl+C to exit")
    layout["logs"].update(logs_panel)

    process_panel = Panel("", title="Process history")
    cpu_layout.update(process_panel)

    # Every field whose running maximum ends up in the summary; mem_pct is
    # included so max_mem_pct is not stuck at the first sample.
    tracked_fields = ("cpu_pct", "mem_pct", "thread_count", "memory", "time_since_start")

    with Live(layout, console=console, screen=True, auto_refresh=True):
        # process observation history deque
        proc_obs = deque(maxlen=100)

        # store up to 500 lines from the logs for display; others written to logfile
        logs = deque(maxlen=500)

        # start the process to monitor
        proc = ExecutingSubprocess(command, logfile=logfile)
        proc.start()

        running_progress = Progress()
        running_progress.add_task("[green]Running...", start=False)
        status_panel.renderable = running_progress

        # log first observation and set as the max that we saw
        proc_obs.append(proc.process_data)
        maxs = copy(proc_obs[0])
        gpu_maxs = {}

        while True:
            try:
                process_panel.renderable = update_process_table(
                    proc_obs, maxs, layout.map[cpu_layout].region.height
                )
                if gpu_available:
                    gpu_panel.renderable = update_gpu_tables(gpu_maxs)

                logs_height = layout.map[layout["logs"]].region.height - 4  # buffer for border
                logs_panel.renderable = _lines_deque_to_str(logs, logs_height)

                time.sleep(interval)

                # observe the process and record results
                if proc.is_running:
                    current_obs = proc.process_data
                    proc_obs.appendleft(current_obs)

                    # track the running maximum of every tracked field
                    for field in tracked_fields:
                        if getattr(current_obs, field) > getattr(maxs, field):
                            setattr(maxs, field, getattr(current_obs, field))

                    # observe the output and record results
                    output = proc.get_output()
                    if output and len(output) > 0:
                        logs.extend(output)
                else:
                    # get the last of the logs and render them
                    logs.extend(proc.get_output(all_data=True))
                    logs_height = layout.map[layout["logs"]].region.height - 4  # buffer for border
                    logs_panel.renderable = _lines_deque_to_str(logs, logs_height)

                    if proc.exit_code != 0:
                        status_panel.style = "red"
                        status_panel.renderable = f"Process exited with code {proc.exit_code}."
                    else:
                        status_panel.style = "green"
                        status_panel.renderable = "Process ran successfully."
                    break
            except KeyboardInterrupt:
                # First Ctrl+C stops the child; the loop then takes the
                # "not running" branch above and finishes normally.
                if proc.is_running:
                    status_panel.renderable = "[yellow]Killing process..."
                    proc.terminate()

        # Only the retained (most recent) log lines are searched.
        retained_logs = "\n".join(logs)

        # detect OOM lines in logs
        oom = "out of memory" in retained_logs

        # detect CUDA oom lines in logs
        cuda_oom = "CUDA error: out of memory" in retained_logs

        # write results
        summary = ProcessSummary(
            logfile=logfile,
            command=" ".join(command),
            max_cpu_pct=maxs.cpu_pct,
            max_memory=maxs.memory,
            max_mem_pct=maxs.mem_pct,
            max_thread_count=maxs.thread_count,
            duration=maxs.time_since_start,
            start_time=maxs.start_time,
            exitcode=proc.exit_code,
            oom_detected=oom,
            oom_cuda_detected=cuda_oom,
        )
        summary.json(filepath=logfile.with_suffix(".json"))
        status_panel.renderable += f"\n\nLogs saved to: {logfile}\nJSON saved to: {logfile.with_suffix('.json')}\n"

        # display results until manually exited
        if wait_for_exit:
            try:
                while True:
                    time.sleep(1.0)
            except KeyboardInterrupt:
                pass
class ExecutingSubprocess:
    """A child process started through pexpect and inspected through psutil.

    Output is read in a non-blocking fashion through pexpect, and is also
    written to ``logfile`` when one is given.
    """

    def __init__(self, cmd, logfile=None):
        """Store the command and open the log file; the process is not started.

        Args:
            cmd: Command passed unchanged to ``PopenSpawn`` by ``start()``.
            logfile: Optional path of a file to open for writing. When
                omitted, no file is opened.
        """
        self.cmd = cmd
        self.process = None
        # Always defined so exit_code never raises AttributeError.
        self._exit_code = None
        # Only open a file when one was requested; open(str(None)) would
        # create a file literally named "None".
        self._log_handle = open(str(logfile), "w") if logfile is not None else None
        self._pexpect_proc = None

    def start(self):
        """Spawn the command and attach a ``psutil.Process`` to it."""
        # keep pexpect process internal for reading status
        sub_env = os.environ.copy()
        # The variable Python reads is PYTHONUNBUFFERED (no underscore after
        # PYTHON); with it set, a Python child does not buffer its output.
        sub_env["PYTHONUNBUFFERED"] = "1"
        self._pexpect_proc = popen_spawn.PopenSpawn(
            self.cmd, encoding='utf-8', env=sub_env, logfile=self._log_handle
        )

        # expose psutil process for external use
        self.process = psutil.Process(self._pexpect_proc.pid)

    def terminate(self):
        """Terminate the process, escalating to kill after five seconds."""
        self.process.terminate()
        try:
            self._exit_code = self.process.wait(5)
        except psutil.TimeoutExpired:
            # wait() raises rather than returning on timeout, so the
            # escalation has to live in the handler.
            self.process.kill()
            self._exit_code = self.process.wait()

    def get_output(self, all_data=False):
        """Return output that is ready to read, split on newlines.

        Args:
            all_data: When true, keep reading until nothing more is
                available instead of stopping after one chunk.

        Returns:
            A list of strings; empty when nothing was read.
        """
        chunks = []
        try:
            while True:
                # get up to 10000 characters at a time
                # NOTE(review): the previous code passed size=None to request
                # "everything"; that does not appear to be a documented use
                # of read_nonblocking, so bounded chunks are read in a loop.
                chunk = self._pexpect_proc.read_nonblocking(
                    size=10000,
                    timeout=-1 if all_data else 1,
                )
                if chunk:
                    chunks.append(chunk)
                if not all_data or not chunk:
                    break
        except (pexpect.exceptions.EOF, pexpect.exceptions.TIMEOUT):
            # End of stream or nothing ready in time: return what we have.
            # Any other exception (notably KeyboardInterrupt) propagates.
            pass

        text = "".join(chunks)
        # "".split("\n") is [""], which would add a blank line to the
        # caller's log on every idle poll.
        return text.split("\n") if text else []

    @property
    def is_running(self):
        """Whether the process is still alive; a zombie is reaped and counts as stopped."""
        try:
            if self.process.status() == psutil.STATUS_ZOMBIE:
                # wait() on the zombie collects its exit status
                self._exit_code = self.process.wait()
                return False
        except psutil.NoSuchProcess:
            return False
        return self.process.is_running()

    @property
    def process_data(self):
        """A fresh ``ProcessObservation`` of the process."""
        return ProcessObservation.from_psutil(self.process)

    @property
    def exit_code(self):
        """The recorded exit code, or ``None`` while running or if none was recorded."""
        if self.is_running:
            return None
        return self._exit_code

    def __del__(self):
        # getattr guards against instances whose __init__ never completed
        handle = getattr(self, "_log_handle", None)
        if handle is not None:
            handle.close()
if __name__ == "__main__":
    # everything except "python monitor.py"
    cli_command = sys.argv[1:]
    if not cli_command:
        # monitor() needs at least the executable name; fail with a usage
        # message instead of an IndexError from inside monitor().
        sys.exit(f"usage: {sys.argv[0]} COMMAND [ARG ...]")
    monitor(cli_command)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment