Created
April 8, 2025 10:18
-
-
Save sadimanna/d609e7b8f452c33944bf6896876a8a1e to your computer and use it in GitHub Desktop.
This script checks how much memory is in use on each GPU of every node.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import subprocess | |
# import paramiko | |
import getpass | |
from collections import defaultdict | |
import time, os | |
from typing import List, Dict, Union | |
import sys | |
# Seconds allowed for each plink invocation (passed as the subprocess timeout).
SSH_TIMEOUT = 10

# Quoted default plink location. NOTE(review): nothing in the visible part of
# this file reads this constant; find_plink() is used instead -- confirm before
# removing.
PLINK_PATH = r'"C:\Program Files\PuTTY\plink.EXE"'

# Hosts to poll; replace with your own node hostnames or IP addresses.
NODES = [
    "node1",
    "node2",
    "node3",
]
def find_plink() -> str:
    """Locate the plink executable.

    Search order: the directories on ``PATH`` (via ``shutil.which``), then a
    list of common PuTTY installation locations on Windows.

    Returns:
        Path of the first plink executable found.

    Raises:
        FileNotFoundError: If plink is neither on ``PATH`` nor at any of the
            known installation locations.
    """
    # Kept as a function-local import, as in the original block.
    import shutil

    on_path = shutil.which("plink")
    if on_path:
        return on_path

    candidates = [
        r"C:\Program Files\PuTTY\plink.exe",
        r"C:\Program Files (x86)\PuTTY\plink.exe",
        r"C:\tools\putty\plink.exe",
    ]
    # Only derive candidates from environment variables that are actually set:
    # joining onto an empty string would produce the relative path
    # "PuTTY/plink.exe", which is resolved against the current directory.
    for var_name in ("PROGRAMFILES", "PROGRAMFILES(X86)"):
        base_dir = os.environ.get(var_name)
        if base_dir:
            candidates.append(os.path.join(base_dir, "PuTTY", "plink.exe"))

    for candidate in candidates:
        # isfile rather than exists, so a directory with this name is not
        # returned as if it were the executable.
        if os.path.isfile(candidate):
            return candidate

    raise FileNotFoundError("Could not locate plink.exe in common locations or PATH")
def get_credentials():
    """Interactively ask for the SSH login.

    The username is read with ``input`` and the password with
    ``getpass.getpass``.

    Returns:
        A ``(username, password)`` tuple of the two strings entered.
    """
    # Tuple elements are evaluated left to right, so the username prompt
    # still appears before the password prompt.
    return (
        input("Enter SSH username: "),
        getpass.getpass("Enter SSH password: "),
    )
def execute_plink(
    command: List[str],
    timeout: Union[int, float, None] = None,
) -> Dict[str, Union[int, str]]:
    """Run a command (normally a plink invocation) and capture its output.

    The child process gets a copy of the current environment with the
    Windows PowerShell directory prepended to ``PATH``.

    Args:
        command: Argument list handed to ``subprocess.run``.
        timeout: Seconds to wait before giving up. ``None`` (the default)
            uses the module-level ``SSH_TIMEOUT``.

    Returns:
        A dict whose ``"status"`` is one of:

        * ``"success"`` -- with ``"stdout"`` and ``"stderr"`` (stripped).
        * ``"error"`` -- the command exited non-zero (``"code"``,
          ``"stdout"``, ``"stderr"``, ``"error"``) or could not be started
          at all (``"error"`` only).
        * ``"timeout"`` -- with ``"error"`` describing the timeout.

        Every non-success result carries an ``"error"`` message, so callers
        can detect failure with ``"error" in result``.
    """
    if timeout is None:
        timeout = SSH_TIMEOUT

    env = os.environ.copy()
    # Mimic PowerShell's PATH environment. os.pathsep is ";" on Windows, so
    # the result there matches the previous hard-coded separator.
    ps_path = r"C:\Windows\System32\WindowsPowerShell\v1.0"
    env["PATH"] = os.pathsep.join(
        entry for entry in (ps_path, env.get("PATH", "")) if entry
    )

    try:
        result = subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            timeout=timeout,
            env=env,
            check=True,
        )
    except subprocess.CalledProcessError as exc:
        stderr_text = (exc.stderr or "").strip()
        return {
            "status": "error",
            "code": exc.returncode,
            "stdout": (exc.stdout or "").strip(),
            "stderr": stderr_text,
            "error": stderr_text or f"Command exited with code {exc.returncode}",
        }
    except subprocess.TimeoutExpired:
        return {
            "status": "timeout",
            "error": f"Command timed out after {timeout}s",
        }
    except OSError as exc:
        # Raised when the executable cannot be launched (missing, not
        # executable, ...); report it like any other failure.
        return {
            "status": "error",
            "error": f"Could not start command: {exc}",
        }

    return {
        "status": "success",
        "stdout": result.stdout.strip(),
        "stderr": result.stderr.strip(),
    }
# def check_gpu(plink_path, node, username, password): | |
# """Check GPU status using sshpass""" | |
# env = os.environ.copy() | |
# ps_path = r"C:\Windows\System32\WindowsPowerShell\v1.0" | |
# env["PATH"] = f"{ps_path};{env.get('PATH', '')}" | |
# creationflags = subprocess.CREATE_NO_WINDOW | |
# try: | |
# # print(f"plink -ssh {username}@{node} -pw {password}") | |
# command = [ | |
# plink_path, | |
# "-ssh",f"{username}@{node}", | |
# "-pw", f"{password}", | |
# "-batch", | |
# "nvidia-smi --query-gpu=index,utilization.gpu,memory.used,memory.total, --format=csv,noheader,nounits" | |
# ] | |
# # print("Here2") | |
# start_time = time() | |
# result = subprocess.run( | |
# command, | |
# stdout=subprocess.PIPE, | |
# stderr=subprocess.PIPE, | |
# text=True, | |
# timeout=SSH_TIMEOUT, | |
# env=env, | |
# check=True | |
# ) | |
# elapsed = time() - start_time | |
# print(f"\n✅ {node} completed in {elapsed:.1f}s") | |
# print(f"Exit code: {result.returncode}") | |
# print(f"STDOUT:\n{result.stdout.strip() or '(no output)'}") | |
# print(f"STDERR:\n{result.stderr.strip() or '(no errors)'}") | |
# return result.stdout.strip().split('\n') | |
# except subprocess.TimeoutExpired as e: | |
# print(f"\n⏰ {node} timed out after {SSH_TIMEOUT}s") | |
# print(f"Partial output: {e.stdout.decode()[:200] if e.stdout else ''}") | |
# return None | |
# except Exception as e: | |
# print(f"\n❌ {node} failed with unexpected error: {str(e)}") | |
# return None | |
# finally: | |
# print("Something happened") | |
def parse_gpu_memory(lines: List[str]) -> List[Dict[str, Union[int, str, bool]]]:
    """Parse five-column CSV rows of GPU statistics into dictionaries.

    Each row is expected to hold, in order: GPU index, utilization, memory
    used, memory total, utilization (repeated). Fields are separated by
    commas; whitespace around a field is ignored.

    Rows are skipped when they are empty, contain one of the markers
    ``"Error"``, ``"failed"`` or ``"timeout"``, do not have exactly five
    fields, or hold a non-integer value in a numeric column.

    Args:
        lines: Output lines to parse.

    Returns:
        One dict per parsed row with the keys ``"id"`` (str), ``"used_mb"``,
        ``"total_mb"``, ``"utilization"`` (ints) and ``"free"`` (bool, true
        when both utilization and used memory are 0).
    """
    gpu_info = []
    for line in lines:
        if not line or any(marker in line for marker in ("Error", "failed", "timeout")):
            continue

        # Split on the bare comma and strip, so rows are accepted whether or
        # not each comma is followed by exactly one space.
        fields = [field.strip() for field in line.split(",")]
        if len(fields) != 5:
            continue

        # Utilization appears in columns 2 and 5; the last column is the one
        # that has always been reported, so column 2 is ignored.
        gpu_id, _, mem_used, mem_total, util = fields
        try:
            utilization = int(util)
            used_mb = int(mem_used)
            total_mb = int(mem_total)
        except ValueError:
            # Non-numeric value in a numeric column: skip the row.
            continue

        gpu_info.append({
            "id": gpu_id,
            "used_mb": used_mb,
            "total_mb": total_mb,
            "utilization": utilization,
            "free": utilization == 0 and used_mb == 0,
        })
    return gpu_info
def format_memory(used, total):
    """Render memory usage as ``"<used>/<total> MiB (<pct>%)"``.

    The percentage is ``used`` relative to ``total`` with one decimal place.
    A ``total`` of 0 yields ``"N/A"`` instead of dividing by zero.
    """
    return (
        "N/A"
        if total == 0
        else f"{used}/{total} MiB ({(used / total) * 100:.1f}%)"
    )
def parse_gpu_status(lines: List[str]) -> List[str]:
    """Return the ids of GPUs that four-column CSV rows report as idle.

    Each row is expected to hold, in order: GPU index, utilization, memory
    used, process count. A GPU counts as free when all three numeric columns
    are 0. Fields are separated by commas; surrounding whitespace is ignored.

    Rows are skipped when they are empty, contain one of the markers
    ``"Error"``, ``"failed"`` or ``"timeout"``, do not have exactly four
    fields, or hold a non-integer value in a numeric column.

    Args:
        lines: Output lines to parse.

    Returns:
        The GPU ids (as strings) of all free GPUs, in input order.
    """
    free_gpus = []
    for line in lines:
        if not line or any(marker in line for marker in ("Error", "failed", "timeout")):
            continue

        # Split on the bare comma and strip, so rows are accepted whether or
        # not each comma is followed by exactly one space.
        fields = [field.strip() for field in line.split(",")]
        if len(fields) != 4:
            continue

        gpu_id, util, mem_used, processes = fields
        try:
            counters = (int(processes), int(util), int(mem_used))
        except ValueError:
            # Non-numeric value (e.g. "[N/A]"): skip the row instead of
            # aborting the whole parse.
            continue

        if all(value == 0 for value in counters):
            free_gpus.append(gpu_id)
    return free_gpus
def main():
    """Query every node in ``NODES`` over plink and print a GPU memory report.

    Steps:
      1. Locate plink with ``find_plink``; print troubleshooting hints and
         exit with status 1 if it cannot be found.
      2. Prompt for SSH credentials with ``get_credentials``.
      3. For each node, run ``nvidia-smi`` remotely through
         ``execute_plink`` and parse its output with ``parse_gpu_memory``.
      4. Print per-node, per-GPU memory usage, utilization and a
         FREE / IN USE status.
    """
    print("PowerShell Environment Emulator")
    print("================================\n")

    try:
        plink_path = find_plink()
    except FileNotFoundError as e:
        print(str(e))
        print("\nTroubleshooting steps:")
        print("1. Install PuTTY from https://www.chiark.greenend.org.uk/~sgtatham/putty/latest.html")
        print("2. Add PuTTY installation directory to system PATH")
        print("3. Restart your terminal and Python IDE after installation")
        sys.exit(1)
    else:
        print(f"Found plink.exe at: {plink_path}")

    username, password = get_credentials()

    # utilization.gpu is requested twice on purpose: parse_gpu_memory expects
    # five columns.
    query = (
        "nvidia-smi --query-gpu=index,utilization.gpu,memory.used,"
        "memory.total,utilization.gpu --format=csv,noheader,nounits"
    )

    report = {}
    for node in NODES:
        print(f"\nTesting {node}\n")
        # NOTE(review): "-pw" puts the password on the command line, where
        # other local users may be able to see it in the process list.
        # Consider key-based authentication -- confirm what the deployed
        # plink version supports.
        # NOTE(review): with "-batch" plink does not prompt, so a node whose
        # host key is not yet cached will presumably fail -- verify.
        cmd = [
            plink_path,
            "-ssh",
            f"{username}@{node}",
            "-pw", password,
            "-batch",
            query,
        ]
        # Echo the command with the password (index 4) masked.
        masked = cmd[:4] + ["***"] + cmd[5:]
        print(f"Command: {' '.join(masked)}")

        result = execute_plink(cmd)

        # Report every kind of failure, not only results that happen to
        # carry an "error" key.
        if result.get("status") != "success":
            detail = (
                result.get("error")
                or result.get("stderr")
                or f"exit code {result.get('code', 'N/A')}"
            )
            print(f"Execution Error: {detail}")

        gpus = parse_gpu_memory(result.get("stdout", "").strip().split("\n"))
        report[node] = gpus or None

    # Print detailed report
    print("\nGPU Memory Utilization Report:")
    print("==============================")
    for node, gpus in report.items():
        print(f"\n{node}:")
        if not gpus:
            print(" No GPU information available")
            continue
        for gpu in gpus:
            status = "FREE" if gpu["free"] else "IN USE"
            mem_str = format_memory(gpu["used_mb"], gpu["total_mb"])
            print(f" GPU {gpu['id']}: {mem_str}")
            print(f" Utilization: {gpu['utilization']}% | Status: {status}")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment