Created
October 15, 2013 02:44
-
-
Save dbr/6985734 to your computer and use it in GitHub Desktop.
Test of running Nuke in a psuedoterminal to capture progress info
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Test of running Nuke in a psuedoterminal, in order to capture the | |
7% done, Time to go: 17 minutes, 53 seconds | |
..output | |
""" | |
raise Exception("This probably doesn't work very reliably, barely tested") | |
import os | |
import sys | |
import pty | |
import subprocess | |
def run_in_pty(cmd, handle_stdout="supress"): | |
if handle_stdout not in [ | |
"supress", # Hide stderr | |
"untouched", # Let stderr go to output as usual | |
"debug", # Output stderr as usual, but show repr() of it, with "stderr:" prefix | |
]: | |
raise ValueError("Invalid option for handle_stdout arg") | |
# Create psuedoterminals for stdout and stderr | |
stdout_master, stdout_slave = pty.openpty() | |
stderr_master, stderr_slave = pty.openpty() | |
# Create process | |
p = subprocess.Popen(cmd, | |
stdout=stdout_slave, | |
stderr=stderr_slave, | |
) | |
# Create Python file objects for terminal | |
bufsize=0 | |
stdout = os.fdopen(stdout_master, 'rb', bufsize) | |
stderr = os.fdopen(stderr_master, 'rb', bufsize) | |
# Begin reading output | |
import select, errno | |
read_set = [stdout_master, stderr_master] | |
while read_set: | |
try: | |
ret = select.select(read_set, [], []) | |
except select.error, e: | |
# EINTER means try again | |
if e.args[0] == errno.EINTR: | |
continue | |
else: | |
raise | |
# Got data from stdout or stderr | |
for fd in ret[0]: | |
if stdout is not None and not stdout.closed and fd == stdout.fileno(): | |
read = stdout.read(1)#line() | |
if read == "": # EOF (a blank line would be "\n") | |
# Stream complete | |
read_set.remove(stdout_master) | |
stdout.close() | |
else: | |
# New output to stdOUT. | |
#sys.stdout.write("stdout: %r\n" % read) | |
yield read | |
elif not stderr.closed and fd == stderr.fileno(): | |
read = stderr.readline() | |
if read == "": | |
# Stream complete | |
read_set.remove(stderr_master) | |
stderr.close() | |
else: | |
# New output to stdERR | |
if handle_stdout == "supress": | |
pass | |
elif handle_stdout == "untouched": | |
sys.stderr.write(read) | |
elif handle_stdout == "debug": | |
sys.stderr.write("stderr: %r\n" % read) | |
# End of output, wait for process to terminate | |
p.wait() | |
if p.returncode != 0: | |
raise subprocess.CalledProcessError("Process return exit code %s" % p.returncode, | |
p.returncode, | |
None) | |
def split_by_cr(iterable): | |
"""Loops over the iterable, yielding lines denoted by the | |
carridge-return "\r" instead of newline "\n" | |
""" | |
linebuf = [] | |
for x in iterable: | |
if x == "\r": | |
# Carridge return | |
yield "".join(linebuf) | |
linebuf = [] | |
# Check if next character is a newline, if so, ignore it | |
n = next(iterable) | |
if n == "\n": | |
pass | |
else: | |
linebuf.append(n) | |
else: | |
linebuf.append(x) | |
# Crude progress parser | |
import re | |
prog_matcher = re.compile(r".*(\d+)% done, ") # ' 1% done, Time to go: 45 seconds ' | |
for line in split_by_cr(run_in_pty(sys.argv[1:])): | |
m = prog_matcher.match(line) | |
if m: | |
print "%d%%" % (int(m.group(1))) | |
elif "Writing" in line: | |
print line |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment