Created
September 15, 2024 18:26
-
-
Save tos-kamiya/4665a763fc17b3d4fb6134a69a575159 to your computer and use it in GitHub Desktop.
A script to monitor CPU/GPU RAM usage of a command on NVIDIA systems, with process search by keyword and JSON support.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
import json | |
import os | |
import subprocess | |
import sys | |
import time | |
from typing import List, Tuple, Dict | |
def monitor_processes(
    command: List[str], keywords: List[str], interval: float = 0.5
) -> Tuple[int, Dict[int, Tuple[str, int, int]], float]:
    """
    Executes the specified command and records peak GPU/CPU memory usage for the executed
    process and for every process returned by find_target_processes() for each keyword.

    Sampling happens once per `interval` while the command is running. The wait between
    samples ends as soon as the command exits, so the reported elapsed time is not padded
    by a trailing sleep.

    Args:
        command (List[str]): The command and its arguments to be executed.
        keywords (List[str]): List of keywords to search for target processes.
        interval (float): Seconds to wait between samples (must be positive). Defaults to 0.5.

    Returns:
        Tuple[int, Dict[int, Tuple[str, int, int]], float]:
            A tuple containing:
            - The PID of the executed command as an integer.
            - A dictionary where:
                - Key: PID of the process (executed or target).
                - Value: Tuple containing (command line, peak GPU memory, peak CPU memory),
                  in the units reported by get_gpu_memory() / get_cpu_memory().
            - Elapsed time of the execution of the command, in seconds, as a float.
    """
    # A monotonic clock keeps the elapsed time correct even if the wall clock is adjusted.
    start_time = time.monotonic()
    process = subprocess.Popen(command)

    executed_process_cmdline = " ".join(command)

    # Key: PID, Value: (command line, peak GPU memory, peak CPU memory).
    # The executed process is registered up front so it is reported even if it
    # exits before the first sample is taken.
    monitored_processes: Dict[int, Tuple[str, int, int]] = {
        process.pid: (executed_process_cmdline, 0, 0)
    }

    try:
        while process.poll() is None:
            # Collect the keyword-matched processes that exist right now.
            pid_to_cmdline: Dict[int, str] = {}
            for keyword in keywords:
                pid_to_cmdline.update(find_target_processes(keyword))
            # The executed process always keeps the command line it was launched
            # with, even when a keyword happens to match it as well.
            pid_to_cmdline[process.pid] = executed_process_cmdline

            pids = sorted(pid_to_cmdline)
            gpu_memories = get_gpu_memory(pids)
            cpu_memories = get_cpu_memory(pids)

            # Fold the current sample into the per-process peaks.
            for pid in pids:
                _, peak_gpu, peak_cpu = monitored_processes.get(pid, ("", 0, 0))
                monitored_processes[pid] = (
                    pid_to_cmdline[pid],
                    max(peak_gpu, gpu_memories.get(pid, 0)),
                    max(peak_cpu, cpu_memories.get(pid, 0)),
                )

            # Wait for the next sampling point, but return immediately if the
            # command finishes first.
            try:
                process.wait(timeout=interval)
            except subprocess.TimeoutExpired:
                pass
    except KeyboardInterrupt:
        print("Monitoring interrupted.")
    finally:
        # Only an interrupted run leaves the child alive; stop it and reap it.
        if process.poll() is None:
            process.terminate()
        process.wait()

    elapsed_time = time.monotonic() - start_time
    return process.pid, monitored_processes, elapsed_time
def _parse_gpu_memory_csv(output: str, wanted_pids) -> Dict[int, int]:
    """Parse `pid, <n> MiB` rows and total the MiB per PID contained in `wanted_pids`."""
    usage: Dict[int, int] = {}
    for line in output.splitlines():
        pid_text, separator, memory_text = line.partition(",")
        memory_fields = memory_text.split()
        # Skip anything that is not "<pid>, <number> MiB" (blank lines,
        # placeholders such as "[N/A]", diagnostic messages, ...).
        if not separator or len(memory_fields) != 2 or memory_fields[1] != "MiB":
            continue
        try:
            pid = int(pid_text)
            memory_mib = int(memory_fields[0])
        except ValueError:
            continue
        if pid in wanted_pids:
            # A PID may appear on several rows (presumably one per GPU it uses);
            # report the total rather than whichever row happens to come last.
            usage[pid] = usage.get(pid, 0) + memory_mib
    return usage


def get_gpu_memory(pids: List[int]) -> Dict[int, int]:
    """
    Retrieves the VRAM usage for a list of PIDs by querying `nvidia-smi`.

    This is best-effort: if `nvidia-smi` is not installed, cannot be started, or exits
    with an error, an empty dictionary is returned. Rows that cannot be parsed are ignored.

    Args:
        pids (List[int]): List of process IDs.

    Returns:
        Dict[int, int]: A dictionary where the keys are PIDs and the values are VRAM usage in MiB.
            PIDs that nvidia-smi does not list are absent from the dictionary.
    """
    try:
        output = subprocess.check_output(
            ["nvidia-smi", "--query-compute-apps=pid,used_gpu_memory", "--format=csv,noheader"],
            stderr=subprocess.DEVNULL,  # keep diagnostics out of the CSV being parsed
            universal_newlines=True,
        )
    except (OSError, subprocess.CalledProcessError):
        # OSError covers a missing or non-executable nvidia-smi binary.
        return {}
    return _parse_gpu_memory_csv(output, set(pids))
def get_cpu_memory(pids: List[int]) -> Dict[int, int]:
    """
    Retrieves the RAM (CPU memory) usage for a list of PIDs.

    Only Linux is supported: the value is read from the `VmRSS:` line of
    `/proc/<pid>/status`. On other platforms an empty dictionary is returned.

    Args:
        pids (List[int]): List of process IDs.

    Returns:
        Dict[int, int]: A dictionary where the keys are PIDs and the values are RAM usage in MiB.
            A PID whose status file cannot be read (e.g. the process has exited) maps to 0.
            A PID whose status file has no `VmRSS:` line is absent from the dictionary.
    """
    cpu_memory_usage: Dict[int, int] = {}
    if not sys.platform.startswith("linux"):
        return cpu_memory_usage

    for pid in pids:
        try:
            with open(f"/proc/{pid}/status", "r") as status_file:
                for line in status_file:
                    if line.startswith("VmRSS:"):
                        # Line looks like "VmRSS:   123456 kB"; convert kB to MiB.
                        cpu_memory_usage[pid] = int(line.split()[1]) // 1024
                        break
        except (FileNotFoundError, ProcessLookupError, PermissionError):
            # The process vanished (possibly between open and read) or its
            # /proc entry is not readable; treat it as using no memory.
            cpu_memory_usage[pid] = 0
    return cpu_memory_usage
def find_target_processes(keyword: str) -> Dict[int, str]:
    """
    Retrieve the PIDs and command lines of all processes whose command line contains the
    specified keyword, skipping the current script itself.

    The keyword is matched against the command column of `ps ax -o pid,command` only, so it
    cannot accidentally match a PID or the header row. This is best-effort: if `ps` is
    missing or fails, an empty dictionary is returned.

    Args:
        keyword (str): The keyword to search for in the process command line.

    Returns:
        Dict[int, str]: A dictionary where the keys are PIDs (integers) and the values are the
            corresponding command lines (strings, with runs of whitespace collapsed to one space).
    """
    current_pid = os.getpid()  # Exclude this script: its own arguments contain the keyword
    try:
        output = subprocess.check_output(["ps", "ax", "-o", "pid,command"], universal_newlines=True)
    except (OSError, subprocess.CalledProcessError):
        # OSError covers a missing or non-executable ps binary.
        return {}

    target_pids: Dict[int, str] = {}
    for line in output.splitlines():
        fields = line.split(None, 1)  # [pid, rest of the line]
        if len(fields) != 2:
            continue
        pid_text, command_text = fields
        try:
            pid = int(pid_text)
        except ValueError:
            continue  # Header row ("PID COMMAND") or other non-process line
        if pid != current_pid and keyword in command_text:
            target_pids[pid] = " ".join(command_text.split())
    return target_pids
def output_text_results(
    executed_process_pid: int,
    pids: List[int],
    monitored_processes: Dict[int, Tuple[str, int, int]],
    elapsed_time: float,
    output_file: str,
) -> None:
    """
    Render the monitoring results as plain text and emit them.

    The report starts with a header (executed PID and elapsed time) followed by one
    section per entry of `pids`, in the given order.

    Args:
        executed_process_pid (int): The PID of the executed command.
        pids (List[int]): PIDs to report; each must be a key of `monitored_processes`.
        monitored_processes (Dict[int, Tuple[str, int, int]]): Maps a PID to
            (command line, peak GPU memory, peak CPU memory).
        elapsed_time (float): The total elapsed time for the monitored command.
        output_file (str): Path of the file to write; when empty/None the report goes to stdout.
    """
    report_lines = [
        "----------",
        f"Executed process ID: {executed_process_pid}",
        f"Elapsed time: {elapsed_time:.2f} seconds",
        "",
    ]
    for pid in pids:
        command_line, peak_gpu, peak_cpu = monitored_processes[pid]
        report_lines.extend(
            (
                f"PID: {pid}",
                f"Command: {command_line}",
                f"Peak GPU memory usage: {peak_gpu} MiB",
                f"Peak CPU memory usage: {peak_cpu} MiB",
                "",
            )
        )
    # A final empty element makes the joined text end with a newline.
    report_lines.append("")
    report = "\n".join(report_lines)

    if not output_file:
        print(report, end="")
        return
    with open(output_file, "w") as out:
        out.write(report)
def output_json_results(
    executed_process_pid: int,
    pids: List[int],
    monitored_processes: Dict[int, Tuple[str, int, int]],
    elapsed_time: float,
    output_file: str,
) -> None:
    """
    Render the monitoring results as a JSON document (4-space indent) and emit them.

    The document has the keys "executed_process_pid", "elapsed_time" and "processes";
    "processes" holds one object per entry of `pids`, in the given order.

    Args:
        executed_process_pid (int): The PID of the executed command.
        pids (List[int]): PIDs to report; each must be a key of `monitored_processes`.
        monitored_processes (Dict[int, Tuple[str, int, int]]): Maps a PID to
            (command line, peak GPU memory, peak CPU memory).
        elapsed_time (float): The total elapsed time for the monitored command.
        output_file (str): Path of the file to write; when empty/None the document goes to stdout.
    """
    # Pair each PID with its record first so a malformed record fails on unpacking.
    records = ((pid, monitored_processes[pid]) for pid in pids)
    process_entries = [
        {
            "pid": pid,
            "command": command_line,
            "peak_gpu_memory_mib": peak_gpu,
            "peak_cpu_memory_mib": peak_cpu,
        }
        for pid, (command_line, peak_gpu, peak_cpu) in records
    ]
    document = json.dumps(
        {
            "executed_process_pid": executed_process_pid,
            "elapsed_time": elapsed_time,
            "processes": process_entries,
        },
        indent=4,
    )

    if not output_file:
        print(document)
        return
    with open(output_file, "w") as out:
        out.write(document)
if __name__ == "__main__":
    # Command-line entry point: run the given command, monitor it (plus any
    # keyword-matched processes), then print or save the peak memory report.
    parser = argparse.ArgumentParser(description="Monitor the memory usage of a process and related processes.")
    parser.add_argument("-w", "--keyword", action="append", help="Keyword to monitor target processes", required=False)
    parser.add_argument("-o", "--output", help="Output file to store the results", required=False)
    parser.add_argument("--json", action="store_true", help="Output in JSON format", required=False)
    parser.add_argument("command", nargs=argparse.REMAINDER, help="The command to execute")
    args = parser.parse_args()

    # argparse.REMAINDER keeps a leading "--" separator in the collected list;
    # drop it so that `script -w kw -- cmd args` runs `cmd`, not "--".
    command = args.command[1:] if args.command[:1] == ["--"] else args.command
    if not command:
        # Usage errors belong on stderr so they never mix with report output.
        print("Error: A command to execute must be specified.", file=sys.stderr)
        sys.exit(1)

    keywords = args.keyword or []
    pid, monitored_processes, elapsed_time = monitor_processes(command, keywords)

    # Report the invoked process first, then the remaining processes in PID order.
    pids = [pid] + sorted(p for p in monitored_processes if p != pid)

    # Output results based on the requested format.
    write_results = output_json_results if args.json else output_text_results
    write_results(pid, pids, monitored_processes, elapsed_time, args.output)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment