Skip to content

Instantly share code, notes, and snippets.

@gcr
Created October 19, 2015 01:54
Show Gist options
  • Save gcr/e37cb95780294197b191 to your computer and use it in GitHub Desktop.
Save gcr/e37cb95780294197b191 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
from IPython.parallel.client import client
import argparse
import sys
import numpy as np
import datetime
import time
# Coloured bullet glyphs (U+25CF) wrapped in ANSI bold-colour escapes:
# 31 = red, 32 = green, 33 = yellow.  Each one is stored UTF-8 encoded.
REDBULLET, GRNBULLET, YLOBULLET = (
    u"\x1b[1;{}m\u25cf\x1b[0m".format(colour_code).encode('utf-8')
    for colour_code in (31, 32, 33)
)
def main():
    """Command-line entry point: print a status summary of the cluster.

    Flags:
      -s / --stdout      also tail the output of up to 10 recently
                         completed tasks before the summary.
      -c / --continuous  re-run the report every 60 seconds, indefinitely.

    Exits with status 1 if no cluster client can be created.
    """
    # Parse the command line *before* connecting, so that ``--help`` and
    # usage errors work even when no controller is reachable.
    parser = argparse.ArgumentParser(description='Retrieve and print cluster status')
    parser.add_argument('-s', '--stdout', help="Whether to tail tasks",
                        default=False, action='store_true')
    parser.add_argument('-c', '--continuous', help="Rerun continuously",
                        default=False, action='store_true')
    args = parser.parse_args()

    try:
        rc = client.Client()
    except IOError:
        # The default client could not be created; fall back to the
        # hard-coded shared profile directory.
        try:
            rc = client.Client(profile_dir="/mnt/net/parallel")
        except Exception:
            # Deliberately not a bare ``except:`` so that Ctrl-C and
            # SystemExit still propagate instead of being reported as a
            # missing profile folder.
            print("Unknown client folder")
            sys.exit(1)

    while True:
        # Materialise the groups once: they are used both for tailing
        # output and for the per-group progress bars.
        groups = list(all_task_groups(rc))
        if args.stdout:
            tail_output([task for group in groups for task in group], n=10)
        print("Cluster Summary: {} engines".format(len(rc)))
        for group in groups:
            print_group_progressbar(group)
        if not args.continuous:
            break
        time.sleep(60)
def all_tasks(rc):
    """Yield the metadata record of every task listed in the queue status.

    ``rc.queue_status(verbose=True)`` is walked entry by entry.  The
    ``'unassigned'`` entry is treated as a flat collection of task ids;
    every other entry is treated as a mapping whose values are collections
    of task ids.  Each id is resolved through ``get_metadata``.
    """
    queue_status = rc.queue_status(verbose=True)
    for key in queue_status:
        status = queue_status[key]
        if key == 'unassigned':
            # Already a flat collection of ids: wrap it so both cases can
            # share the same nested loop below.
            id_lists = [status]
        else:
            id_lists = status.values()
        for id_list in id_lists:
            for uuid in id_list:
                yield get_metadata(rc, uuid)
def get_metadata(rc, uuid):
    """Return the first record ``rc.db_query`` yields for message id *uuid*.

    Indexing the result with ``[0]`` means an empty query result raises
    (``IndexError`` if the result is a list).
    """
    matches = rc.db_query({"msg_id": uuid})
    return matches[0]
def all_task_groups(rc):
    """Yield lists of task records that were submitted together.

    All records from ``all_tasks(rc)`` are ordered by their ``'submitted'``
    timestamp.  A new group starts whenever a task was submitted more than
    one second after the task immediately before it; otherwise the task
    joins the current group.

    Only non-empty groups are yielded, so a cluster with no tasks produces
    no groups at all (rather than a single empty group, which would render
    as a bogus "finished" entry).
    """
    task_metadatas = sorted(all_tasks(rc), key=lambda record: record['submitted'])
    current_group = []
    previous = None
    for task in task_metadatas:
        # The gap is measured against the *previous* task, not the first
        # task of the group, so a steady trickle stays in one group.
        if (previous is not None
                and (task['submitted'] - previous['submitted']).total_seconds() > 1.0):
            yield current_group
            current_group = []
        current_group.append(task)
        previous = task
    if current_group:
        yield current_group
def print_group_progressbar(group):
    """Derive the summary statistics of one task group and display them.

    Counts completed, running and failed tasks, estimates the average task
    duration, finds the latest completion time, and hands everything to
    ``show_progressbar``.
    """
    placeholder = datetime.datetime(1900, 1, 1)

    n_total = len(group)
    n_completed = len([task for task in group if task['completed']])
    # "Started" here means: assigned to an engine but not yet completed.
    n_started = len([task for task in group
                     if task['engine_uuid'] and not task['completed']])
    n_failed = len([task for task in group if task['pyerr']])
    # In these tasks, the engine died before the task could be finished.
    n_failed += len([task for task in group
                     if task['completed'] and not task['started']])

    # Average the ten most recently started tasks that have both a start
    # and a completion time; fall back to 60 when there are none.
    by_start = sorted(group,
                      key=lambda task: task['started'] or datetime.datetime.now())
    durations = [(task['completed'] - task['started']).total_seconds()
                 for task in by_start
                 if task['completed'] and task['started']]
    recent_durations = durations[-10:]
    if recent_durations:
        avg_time = np.mean(recent_durations)
    else:
        avg_time = 60

    # The placeholder guarantees a value even when nothing has completed.
    completion_times = [placeholder]
    completion_times.extend(task['completed'] or placeholder
                            for task in group if task['completed'])
    completion_times.sort()

    show_progressbar(n_total, n_completed, n_started, n_failed,
                     avg_time, completion_times[-1])
def show_progressbar(total,complete,running, failed, avg_time, latest_complete, bar_width=50):
    """Print a one-group status report, with an ASCII bar while in progress.

    * ``complete == total``: a single "finished" line stamped with
      ``latest_complete`` (plus the failure count, if any).
    * nothing complete and nothing running: a single "not started" line.
    * otherwise: a percentage header, a ``bar_width``-wide bar
      (``=`` done, ``,`` running, ``.`` pending), an ETA / running line and,
      if ``failed`` is non-zero, a failure line.
    """
    if complete == total:
        failed_note = "(Failed: {})".format(failed) if failed else ""
        finished_on = latest_complete.strftime("%Y-%m-%d %H:%M:%S")
        print("{} Group finished on {} {}".format(GRNBULLET, finished_on, failed_note))
        return
    if complete == 0 and running == 0:
        print("{} Group not started ({} to go)".format(REDBULLET, total))
        return

    title = "In Progress:"
    counts = "%.2f%% %d/%d" % (float(complete)*100/total, complete, total)
    gap = " " * max(5, 2 + bar_width - len(title) - len(counts))
    print(YLOBULLET + " " + title + gap + counts)

    # Draw an ASCII progress bar: each cell maps to a position in the task
    # list and is classified as done, running or pending.
    cells = []
    for slot in range(bar_width):
        position = slot / float(bar_width) * total
        if position < complete:
            cells.append("=")
        elif position < complete + running:
            cells.append(",")
        else:
            cells.append(".")
    sys.stdout.write(" [" + "".join(cells) + "]\n")

    # An ETA is only shown once something has finished and something is
    # running; it assumes the remaining work is spread over running tasks.
    eta = ""
    if running > 0 and complete > 0:
        seconds = ((total - complete) * avg_time) / running
        eta = "ETA %s" % format_time(seconds)
    running_label = "{} running".format(running)
    filler = " " * (bar_width + 2 - len(eta) - len(running_label))
    print(" " + eta + filler + running_label)

    if failed:
        print(" \x1b[1;31mFailed: {}\x1b[0m".format(failed))
def format_time(s):
    """Render a duration of *s* seconds as e.g. ``"1d 2h 3m 4s"``.

    Larger units are only included once the duration reaches them, and
    every component is truncated to an integer by ``%d`` formatting.
    """
    parts = ["%ds" % (s % 60)]
    if s >= 60:
        minutes = s/60
        parts.insert(0, "%dm" % (minutes % 60))
        if minutes >= 60:
            hours = minutes/60
            parts.insert(0, "%dh" % (hours % 24))
            if hours >= 24:
                # Days are the largest unit, so they are not wrapped.
                parts.insert(0, "%dd" % (hours/24))
    return " ".join(parts)
def tail_output(group, n=10):
    """Print the captured output of the most recently completed tasks.

    The completed tasks in *group* are ordered by completion time and the
    last *n* are considered (``n <= 0`` selects none).  A task is shown only
    if it has stdout, stderr, a ``pyerr`` value or a traceback in its
    ``result_content``.  The header bullet is green by default, yellow when
    the task wrote to stderr, and red when it errored.

    A missing or ``None`` ``result_content`` is tolerated, and a task whose
    ``pyerr`` is set but which has no traceback prints the ``pyerr`` value
    itself instead of raising.
    """
    finished = sorted((task for task in group if task['completed']),
                      key=lambda task: task['completed'])
    # ``finished[-0:]`` would select *every* task, so handle n <= 0 explicitly.
    recent = finished[-n:] if n > 0 else []
    for task in recent:
        # ``.get(key, {})`` alone is not enough: the key can be present
        # with a None value, so normalise to a dict before the lookup.
        result_content = task.get('result_content') or {}
        traceback_lines = result_content.get('traceback')
        errored = task['pyerr'] or traceback_lines
        if not (task['stdout'] or task['stderr'] or errored):
            continue

        bullet = GRNBULLET
        if task['stderr']:
            bullet = YLOBULLET
        if errored:
            bullet = REDBULLET
        print(bullet+" \x1b[1mTask {} {}:\x1b[0m".format(
            task['msg_id'],
            task['completed'].strftime("%Y-%m-%d %H:%M:%S"),
        ))
        if task['stdout']:
            print(task['stdout'].strip())
        if task['stderr']:
            print(task['stderr'].strip())
        if traceback_lines:
            print("\n".join(traceback_lines))
        elif task['pyerr']:
            # No traceback available: show whatever error value was recorded
            # rather than failing on a missing 'traceback' key.
            print(task['pyerr'])
        print("")
# TODO: do fancy things with this!
#   - predict time to completion?
#   - tail outputs and stderrs
#   - also: I *need* a better abort script!
#     supervisord will not shut things down like it should
# Run the status report only when executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment