Created
October 19, 2015 01:54
-
-
Save gcr/e37cb95780294197b191 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from IPython.parallel.client import client | |
import argparse | |
import sys | |
import numpy as np | |
import datetime | |
import time | |
# Coloured status bullets: the escape sequence "\x1b[1;<code>m", the
# U+25CF glyph, then the "\x1b[0m" reset, encoded to UTF-8 once at import
# time.  Only the numeric code differs between the three markers.
_BULLET_TEMPLATE = u"\x1b[1;%dm\u25cf\x1b[0m"
REDBULLET = (_BULLET_TEMPLATE % 31).encode('utf-8')
GRNBULLET = (_BULLET_TEMPLATE % 32).encode('utf-8')
YLOBULLET = (_BULLET_TEMPLATE % 33).encode('utf-8')
def main():
    """Print a status report for the cluster, once or continuously.

    Command-line flags:
      -s / --stdout      before the summary, print the output of the ten
                         most recently completed tasks
      -c / --continuous  repeat the report every 60 seconds instead of
                         exiting after a single pass

    Exits with status 1 if no client connection can be established.
    """
    # Parse the command line before touching the cluster, so that ``--help``
    # and usage errors work even when no controller is reachable.
    parser = argparse.ArgumentParser(description='Retrieve and print cluster status')
    parser.add_argument('-s', '--stdout', help="Whether to tail tasks",
                        default=False, action='store_true')
    parser.add_argument('-c', '--continuous', help="Rerun continuously",
                        default=False, action='store_true')
    args = parser.parse_args()

    try:
        rc = client.Client()
    except IOError:
        # The default connection attempt raised IOError; retry with the
        # explicit shared profile directory.
        try:
            rc = client.Client(profile_dir="/mnt/net/parallel")
        except Exception:
            # ``Exception`` rather than a bare ``except`` so that Ctrl-C and
            # SystemExit are not swallowed and misreported as a bad folder.
            print("Unknown client folder")
            sys.exit(1)

    while True:
        groups = list(all_task_groups(rc))
        if args.stdout:
            tail_output([task for group in groups for task in group], n=10)
        print("Cluster Summary: {} engines".format(len(rc)))
        for group in groups:
            print_group_progressbar(group)
        if not args.continuous:
            break
        time.sleep(60)
def all_tasks(rc):
    """Yield the task record of every message id listed in the queue status.

    ``rc.queue_status(verbose=True)`` is walked entry by entry.  The value
    stored under ``'unassigned'`` is iterated directly as message ids; every
    other value is treated as a mapping whose values are lists of message
    ids.  Each id is resolved to its record with ``get_metadata``.
    """
    for key, status in rc.queue_status(verbose=True).items():
        # Normalise both shapes to "an iterable of id lists".
        id_lists = [status] if key == 'unassigned' else status.values()
        for id_list in id_lists:
            for uuid in id_list:
                yield get_metadata(rc, uuid)
def get_metadata(rc, uuid):
    """Return the first record ``rc.db_query`` finds for message id ``uuid``.

    Raises IndexError if the query returns no records.
    """
    matches = rc.db_query({"msg_id": uuid})
    return matches[0]
def all_task_groups(rc):
    """Yield the cluster's tasks grouped into submission bursts.

    All task records are sorted by their ``'submitted'`` timestamp.  A new
    group starts whenever a task was submitted more than one second after
    the task immediately before it.  Every yielded group is a non-empty
    list of task records; nothing is yielded when there are no tasks.
    """
    task_metadatas = sorted(all_tasks(rc), key=lambda record: record['submitted'])
    if not task_metadatas:
        # Never hand callers an empty group: show_progressbar treats
        # "0 of 0 complete" as a finished group and would print a bogus line.
        return

    current_group = []
    previous = task_metadatas[0]
    for task in task_metadatas:
        gap = (task['submitted'] - previous['submitted']).total_seconds()
        if gap > 1.0:
            # More than a second since the previous submission: close the
            # current burst and start a new one with this task.
            yield current_group
            current_group = [task]
        else:
            current_group.append(task)
        previous = task
    yield current_group
def print_group_progressbar(group):
    """Compute summary statistics for one task group and display them.

    ``group`` is a list of task records (mappings with the keys
    ``'completed'``, ``'started'``, ``'engine_uuid'`` and ``'pyerr'``).
    The counts, the average task duration and the latest completion time
    are handed to ``show_progressbar``.
    """
    n_total = len(group)
    finished = [task for task in group if task['completed']]
    n_completed = len(finished)
    # Running: assigned to an engine but not completed yet.
    n_started = len([task for task in group
                     if task['engine_uuid'] and not task['completed']])
    n_failed = len([task for task in group if task['pyerr']])
    # In these tasks, the engine died before the task could be finished.
    n_failed += len([task for task in finished if not task['started']])

    # Average the ten most recent tasks: order everything by start time
    # (tasks without one sort as "now"), keep those with both timestamps,
    # and take the last ten durations.
    by_start = sorted(group,
                      key=lambda task: task['started'] or datetime.datetime.now())
    durations = [(task['completed'] - task['started']).total_seconds()
                 for task in by_start
                 if task['completed'] and task['started']][-10:]
    # Fall back to 60 seconds when nothing has been timed yet.
    avg_time = np.mean(durations) if durations else 60

    # The 1900-01-01 placeholder guarantees a value even if nothing completed.
    placeholder = datetime.datetime(1900, 1, 1)
    latest_complete = max([placeholder] + [task['completed'] for task in finished])

    show_progressbar(n_total, n_completed, n_started, n_failed,
                     avg_time, latest_complete)
def show_progressbar(total, complete, running, failed, avg_time, latest_complete, bar_width=50):
    """Print an ASCII progress report for one task group.

    total           -- number of tasks in the group
    complete        -- number of completed tasks
    running         -- number of tasks currently running
    failed          -- number of failed tasks (mentioned only if non-zero)
    avg_time        -- average task duration in seconds, used for the ETA
    latest_complete -- datetime of the most recent completion
    bar_width       -- number of character cells in the bar

    A finished group or a group that has not started gets a single line.
    Otherwise a header, the bar ("=" complete, "," running, "." pending)
    and an ETA / running-count line are printed.
    """
    if complete == total:
        failed_note = "(Failed: {})".format(failed) if failed else ""
        print("{} Group finished on {} {}".format(
            GRNBULLET,
            latest_complete.strftime("%Y-%m-%d %H:%M:%S"),
            failed_note))
        return
    if complete == 0 and running == 0:
        print("{} Group not started ({} to go)".format(REDBULLET, total))
        return

    title = "In Progress:"
    counts = "%.2f%% %d/%d" % (float(complete)*100/total, complete, total)
    padding = " " * max(5, 2 + bar_width - len(title) - len(counts))
    print(YLOBULLET + " " + title + padding + counts)

    # Draw an ASCII progress bar: map each cell onto a task index and pick
    # the glyph for the state that index falls into.
    cells = []
    for column in range(bar_width):
        position = column / float(bar_width) * total
        if position < complete:
            cells.append("=")
        elif position < (complete + running):
            # position >= complete is already implied by the branch above
            cells.append(",")
        else:
            cells.append(".")
    sys.stdout.write(" [" + "".join(cells) + "]\n")

    eta = ""
    if running > 0 and complete > 0:
        # Remaining tasks times the average duration, spread over the
        # tasks that are running in parallel.
        seconds = ((total-complete)*avg_time) / running
        eta = "ETA %s" % format_time(seconds)
    running_label = "{} running".format(running)
    gap = " " * (bar_width + 2 - len(eta) - len(running_label))
    print(" " + eta + gap + running_label)
    if failed:
        print(" \x1b[1;31mFailed: {}\x1b[0m".format(failed))
def format_time(s):
    """Format a duration of ``s`` seconds as e.g. ``"1d 2h 3m 4s"``.

    Only the units that are needed appear: minutes from 60 seconds up,
    hours from 60 minutes up, days from 24 hours up.  Fractional values
    are truncated by the ``%d`` formatting.
    """
    # Built smallest unit first; larger units are inserted at the front.
    parts = ["%ds" % (s % 60)]
    if s >= 60:
        minutes = s/60
        parts.insert(0, "%dm" % (minutes % 60))
        if minutes >= 60:
            hours = minutes/60
            parts.insert(0, "%dh" % (hours % 24))
            if hours >= 24:
                days = hours/24
                parts.insert(0, "%dd" % days)
    return " ".join(parts)
def tail_output(group, n=10):
    """Print the captured output of the ``n`` most recently completed tasks.

    ``group`` is an iterable of task records.  Only records with a
    ``'completed'`` timestamp are considered, ordered by that timestamp.
    A task is shown only if it has stdout, stderr, a ``pyerr`` or a
    traceback.  Its header bullet is red for an error, yellow if there is
    stderr, green otherwise.  Nothing is printed when ``n`` is not positive.
    """
    if n <= 0:
        # ``[-0:]`` would select every task instead of none.
        return
    finished = sorted((task for task in group if task['completed']),
                      key=lambda task: task['completed'])
    for task in finished[-n:]:
        # Look the traceback up defensively: this tolerates a missing or
        # None 'result_content' and a result without a 'traceback' entry.
        traceback = (task.get('result_content') or {}).get('traceback')
        has_error = bool(task['pyerr'] or traceback)
        if not (task['stdout'] or task['stderr'] or has_error):
            continue

        bullet = GRNBULLET
        if task['stderr']:
            bullet = YLOBULLET
        if has_error:
            bullet = REDBULLET
        print(bullet + " \x1b[1mTask {} {}:\x1b[0m".format(
            task['msg_id'],
            task['completed'].strftime("%Y-%m-%d %H:%M:%S"),
        ))
        if task['stdout']:
            print(task['stdout'].strip())
        if task['stderr']:
            print(task['stderr'].strip())
        if traceback:
            print("\n".join(traceback))
        elif task['pyerr']:
            # An error was recorded without a traceback; show the error
            # itself instead of failing on the missing traceback.
            print(task['pyerr'])
        print("")
# TODO: do fancy things with this!
#   - predict time to completion?
#   - tail stdout and stderr outputs
# Also: I *need* a better abort script!
#   - supervisord will not shut things down like it should
# Run the report only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment