Usage (each line read from stdin becomes one job, run as `<executable> <line>`):
printf '%s\n' {0..100} | ./ssh_wrapper.py echo --cpus 2 --gpus 1 --email -N 2
printf 'myscript.py --foo %s\n' {0..100} | ./ssh_wrapper.py python --cpus 2 --gpus 1 --email -N 2
Usage:
printf '%s\n' {0..100} | ./ssh_wrapper.py echo --cpus 2 --gpus 1 --email -N 2
printf 'myscript.py --foo %s\n' {0..100} | ./ssh_wrapper.py python --cpus 2 --gpus 1 --email -N 2
Host | Memory | CPU | GPU | Alive | |
---|---|---|---|---|---|
nebula-1 | 125.6 | 32 | 0 | 1 | |
nebula-2 | 94.1 | 32 | 0 | 1 | |
nebula-3 | 63.1 | 12 | 0 | 1 | |
nebula-4 | 62.8 | 12 | 0 | 1 | |
nebula-5 | 62.8 | 12 | 0 | 1 | |
nebula-6 | 62.8 | 12 | 0 | 1 | |
nebula-7 | 125.7 | 24 | 0 | 1 | |
nebula-8 | 125.7 | 24 | 0 | 0 | |
nebula-9 | 125.7 | 24 | 0 | 1 | |
quasar-20 | 141.9 | 24 | 0 | 1 | |
quasar-21 | 141.9 | 24 | 0 | 0 | |
quasar-22 | 141.9 | 24 | 0 | 1 | |
quasar-27 | 31.3 | 8 | 0 | 0 | |
quasar-28 | 31.5 | 8 | 0 | 0 | |
quasar-30 | 31.3 | 16 | 0 | 0 | |
quasar-31 | 31.3 | 16 | 0 | 1 | |
quasar-32 | 31.5 | 16 | 0 | 1 | |
quasar-33 | 23.4 | 16 | 0 | 1 | |
quasar-34 | 23.4 | 16 | 0 | 1 | |
quasar-37 | 62.7 | 12 | 0 | 0 | |
quasar-38 | 15.5 | 8 | 0 | 0 | |
quasar-39 | 62.7 | 12 | 0 | 1 | |
quasar-40 | 62.7 | 12 | 0 | 1 | |
quasar-41 | 62.7 | 12 | 0 | 0 | |
quasar-42 | 63 | 12 | 0 | 1 | |
quasar-43 | 63 | 12 | 0 | 1 | |
quasar-44 | 62.7 | 12 | 0 | 1 | |
quasar-46 | 126.1 | 16 | 0 | 1 | |
quasar-47 | 62.7 | 12 | 0 | 1 | |
quasar-48 | 62.7 | 12 | 0 | 0 | |
quasar-50 | 126.1 | 24 | 0 | 1 | |
quasar-51 | 126.1 | 24 | 0 | 1 | |
quasar-52 | 126.1 | 24 | 0 | 1 | |
nova-1 | 504 | 80 | 1 | 1 | |
nova-2 | 504 | 80 | 2 | 1 | |
nova-2 | 504 | 80 | 2 | 1 |
#!/bin/python3 | |
import argparse | |
import os | |
import os.path as osp | |
import sys | |
from multiprocessing import Pool, current_process | |
from os.path import expanduser | |
import random | |
from subprocess import check_output | |
import time | |
import warnings | |
import paramiko | |
def get_parser():
    """Build the command-line parser for the SSH job launcher.

    The positional ``exec`` names the executable that every job runs; the
    options describe per-job resource requirements, scheduling, e-mail
    notification and SSH credentials.

    Returns:
        argparse.ArgumentParser: a parser ready for ``parse_args()``.
    """
    p = argparse.ArgumentParser()
    p.add_argument('exec', metavar='executable', type=str)
    p.add_argument('--name', type=str, default=None, help="Default: None")

    # Resources each job needs; get_hosts filters the host file with these.
    p.add_argument('--cpus', type=int, default=1)
    p.add_argument('--gpus', type=int, default=0)
    p.add_argument('--mem', type=float, default=1, help="Default: 1 (GB)")

    # Scheduling and notification.
    p.add_argument("-N", "--jobs_per_host", type=int, default=1,
                   help="Default: 1")
    p.add_argument("--sleep", type=int, default=0,
                   help="Sleep certain seconds to avoid congestion. Default: 0")
    p.add_argument("--email", action='store_true',
                   help="Enable email notification.")

    # SSH connection settings.
    p.add_argument("--pw", metavar='password', type=str, default=None,
                   help="Password. If not set, read it from ~/.parallel_run_pw ")
    p.add_argument("--host_file", type=str, default="~/hosts.tsv",
                   help="Host description file.")
    return p
def get_hosts(args):
    """Return the names of alive hosts that satisfy the requested resources.

    The host file (``args.host_file``, with ``~`` expanded) starts with one
    header line, followed by one whitespace-separated row per host::

        <name> <memory> <cpus> <gpus> <alive>

    A host is selected when ``alive`` is non-zero and it offers at least
    ``args.mem`` memory, ``args.cpus`` CPUs and ``args.gpus`` GPUs.
    Blank lines and lines starting with ``#`` are skipped, and columns after
    the fifth are ignored, so free-form notes may follow the numbers.

    Args:
        args: namespace providing ``host_file``, ``cpus``, ``mem``, ``gpus``.

    Returns:
        list[str]: matching host names, in file order.

    Raises:
        ValueError: if a non-blank row has fewer than five columns or a
            non-numeric resource value.
    """
    hosts = []
    with open(expanduser(args.host_file)) as fin:
        fin.readline()  # skip the header row
        for line in fin:
            fields = line.split()
            # A stray blank line used to crash the 4-way unpack below.
            if not fields or fields[0].startswith("#"):
                continue
            if len(fields) < 5:
                raise ValueError("malformed host line: {!r}".format(line))
            mem, cpus, gpus, alive = map(float, fields[1:5])
            # "At least" for every resource; previously CPU and memory used a
            # strict ">" (rejecting exact matches) while GPUs were inclusive.
            if (alive and cpus >= args.cpus and mem >= args.mem
                    and gpus >= args.gpus):
                hosts.append(fields[0])
    return hosts
def _worker_slot():
    """Return a per-worker index used to pick the current worker's host.

    Pool workers carry a private ``_identity`` tuple whose last element
    counts 1, 2, 3, ... in creation order (a CPython implementation detail),
    so ``slot % len(hosts)`` gives each worker of a single Pool its own
    entry. The previous ``pid % len(hosts)`` only works if PIDs happen to be
    consecutive, which the OS does not guarantee; colliding workers double
    up on one host and leave another idle. Falls back to the PID when
    ``_identity`` is missing or empty (e.g. outside a Pool worker).
    """
    proc = current_process()
    identity = getattr(proc, "_identity", ())
    if identity:
        return identity[-1] - 1
    return proc.pid


def runCommand(cmd, pw):
    """Run one shell command on a remote host over SSH and stream its output.

    Intended as a ``multiprocessing.Pool`` task. It reads the module-level
    globals ``hosts`` (host names, repeated once per job slot) and ``args``
    (parsed CLI namespace; only ``args.sleep`` is used here), which the
    ``__main__`` block assigns before creating the pool.
    NOTE(review): those globals reach the workers only with the ``fork``
    start method; under ``spawn`` they would be undefined in the child.

    Args:
        cmd: shell command line, executed remotely after ``cd`` into a
            directory with the same path as the local working directory.
        pw: password passed to ``paramiko.SSHClient.connect``.

    Returns:
        False if the SSH connection could not be established; True once the
        command has run (a non-zero exit status is printed, not returned).
    """
    import shlex  # local import: only this function needs it

    host = hosts[_worker_slot() % len(hosts)]
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:  # always release the SSH client, including on the failure paths
        try:
            with warnings.catch_warnings():
                # NOTE: To ignore CryptographyDeprecationWarning;
                # remove it in later version of paramiko (2.4.2)
                warnings.simplefilter("ignore")
                client.connect(host, password=pw)
        except Exception as err:
            print("***** failed to connect host ", host, " *****", sep='')
            print("[{}]: {}".format(host, err))  # say why, don't just swallow
            return False

        if args.sleep > 0:
            # Random stagger, scaled by the number of slots, to spread load.
            time.sleep(random.random() * args.sleep * len(hosts))

        print("Running on {}: {}".format(host, cmd))
        # "&&" so the command is not run from the remote login directory when
        # the cd fails; quote the path so spaces/metacharacters survive.
        # `cmd` itself is intentionally left unquoted: it is a shell line.
        remote_cmd = "cd {} && {}".format(shlex.quote(os.getcwd()), cmd)
        stdin, stdout, stderr = client.exec_command(remote_cmd, get_pty=True)
        for line in stdout:
            print("[{}]: {}".format(host, line), end="")
        status = stdout.channel.recv_exit_status()
        if status != 0:
            print("[{}]: exited with status {}".format(host, status))
        return True
    finally:
        client.close()
if __name__ == "__main__":
    import shutil  # local import: only the script entry point needs it

    # Parse command-line arguments. `args` and `hosts` must remain
    # module-level names: runCommand reads them as globals in the workers.
    args = get_parser().parse_args()
    if args.pw is None:
        with open(expanduser("~/.parallel_run_pw"), 'r') as fin:
            args.pw = fin.readline().strip()

    # Get hosts; repeating the list gives every host `jobs_per_host` slots,
    # one pool worker per slot.
    hosts = get_hosts(args) * args.jobs_per_host
    if not hosts:
        # Pool(0) would otherwise fail with an opaque ValueError.
        sys.exit("No usable host: none in {} matches the requested resources "
                 "(or -N < 1).".format(args.host_file))

    # Resolve the executable to an absolute path locally.
    # NOTE(review): that same path is used on every remote host, which
    # assumes identical installations or a shared filesystem.
    exec_path = shutil.which(args.exec)
    if exec_path is None:
        sys.exit("Executable not found on PATH: {}".format(args.exec))
    args.exec = exec_path

    # Read command fields: one job per non-blank stdin line,
    # "<executable> <line>". Blank lines would spawn bare-executable jobs.
    cmds = [args.exec + " " + line.strip()
            for line in sys.stdin if line.strip()]

    # send_email is neither defined nor imported in this file as shown, so
    # look it up: --email then degrades to a warning instead of a NameError
    # that aborts the run before any job has started.
    send = globals().get("send_email")
    notify = args.email and send is not None
    if args.email and not notify:
        print("warning: --email requested but send_email is unavailable; "
              "continuing without notification.", file=sys.stderr)

    if notify:
        text = "\n".join([
            "---Arguments---", " ".join(sys.argv[1:]),
            "---Hosts---", str(hosts),
            "---Commands---", "\n".join(cmds),
            "---Job Name---", str(args.name),
            "",
        ])
        send(subject="Experiment Notification", text=text + "\nStart.")

    with Pool(len(hosts)) as p:
        p.starmap(runCommand, [(cmd, args.pw) for cmd in cmds])

    if notify:
        send(subject="Experiment Notification", text=text + "\nFinished.")