Skip to content

Instantly share code, notes, and snippets.

@dbr
Created October 15, 2013 02:44
Show Gist options
  • Save dbr/6985734 to your computer and use it in GitHub Desktop.
Save dbr/6985734 to your computer and use it in GitHub Desktop.
Test of running Nuke in a psuedoterminal to capture progress info
"""Test of running Nuke in a psuedoterminal, in order to capture the
7% done, Time to go: 17 minutes, 53 seconds
..output
"""
raise Exception("This probably doesn't work very reliably, barely tested")
import os
import sys
import pty
import subprocess
def run_in_pty(cmd, handle_stdout="supress"):
if handle_stdout not in [
"supress", # Hide stderr
"untouched", # Let stderr go to output as usual
"debug", # Output stderr as usual, but show repr() of it, with "stderr:" prefix
]:
raise ValueError("Invalid option for handle_stdout arg")
# Create psuedoterminals for stdout and stderr
stdout_master, stdout_slave = pty.openpty()
stderr_master, stderr_slave = pty.openpty()
# Create process
p = subprocess.Popen(cmd,
stdout=stdout_slave,
stderr=stderr_slave,
)
# Create Python file objects for terminal
bufsize=0
stdout = os.fdopen(stdout_master, 'rb', bufsize)
stderr = os.fdopen(stderr_master, 'rb', bufsize)
# Begin reading output
import select, errno
read_set = [stdout_master, stderr_master]
while read_set:
try:
ret = select.select(read_set, [], [])
except select.error, e:
# EINTER means try again
if e.args[0] == errno.EINTR:
continue
else:
raise
# Got data from stdout or stderr
for fd in ret[0]:
if stdout is not None and not stdout.closed and fd == stdout.fileno():
read = stdout.read(1)#line()
if read == "": # EOF (a blank line would be "\n")
# Stream complete
read_set.remove(stdout_master)
stdout.close()
else:
# New output to stdOUT.
#sys.stdout.write("stdout: %r\n" % read)
yield read
elif not stderr.closed and fd == stderr.fileno():
read = stderr.readline()
if read == "":
# Stream complete
read_set.remove(stderr_master)
stderr.close()
else:
# New output to stdERR
if handle_stdout == "supress":
pass
elif handle_stdout == "untouched":
sys.stderr.write(read)
elif handle_stdout == "debug":
sys.stderr.write("stderr: %r\n" % read)
# End of output, wait for process to terminate
p.wait()
if p.returncode != 0:
raise subprocess.CalledProcessError("Process return exit code %s" % p.returncode,
p.returncode,
None)
def split_by_cr(iterable):
"""Loops over the iterable, yielding lines denoted by the
carridge-return "\r" instead of newline "\n"
"""
linebuf = []
for x in iterable:
if x == "\r":
# Carridge return
yield "".join(linebuf)
linebuf = []
# Check if next character is a newline, if so, ignore it
n = next(iterable)
if n == "\n":
pass
else:
linebuf.append(n)
else:
linebuf.append(x)
# Crude progress parser
import re
prog_matcher = re.compile(r".*(\d+)% done, ") # ' 1% done, Time to go: 45 seconds '
for line in split_by_cr(run_in_pty(sys.argv[1:])):
m = prog_matcher.match(line)
if m:
print "%d%%" % (int(m.group(1)))
elif "Writing" in line:
print line
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment