Created
January 17, 2011 17:59
-
-
Save sergray/783164 to your computer and use it in GitHub Desktop.
Extend an arbitrary interactive Unix command with readline features
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import subprocess | |
import readline | |
import select | |
import os | |
class Popen(subprocess.Popen):
    """``subprocess.Popen`` variant for step-by-step talks with a live child.

    The overridden ``_communicate`` exchanges data with the child's pipes
    but, unlike a one-shot exchange, it

    * leaves ``stdin`` open after the input has been written,
    * stops as soon as the pipes stay idle for ``idle_timeout`` seconds, and
    * never waits for the child to exit,

    so it can be called repeatedly on the same running process.

    NOTE(review): this overrides a private hook of ``subprocess.Popen``;
    it presumably relies on the inherited ``communicate()`` delegating to
    ``_communicate(input)`` as the Python 2 implementation does -- confirm
    against the interpreter version in use.
    """

    # Seconds select() waits for pipe activity before the current exchange
    # is considered finished.
    idle_timeout = 0.1

    def _communicate(self, input):
        """Write *input* to the child and collect whatever it answers.

        input -- byte string to send to the child's stdin; when it is
                 empty or None, stdin is closed instead.

        Returns a ``(stdout, stderr)`` tuple.  Each element is the data
        read from the corresponding pipe during this call (possibly an
        empty string), or None when that stream is not a pipe.
        """
        # Function-scope import: the file's import block does not
        # provide errno, which the EINTR check below needs.
        import errno

        read_set = []
        write_set = []
        stdout = None  # Return
        stderr = None  # Return

        if self.stdin:
            # Flush stdio buffer.  This might block, if the user has
            # been writing to .stdin in an uncontrolled fashion.
            self.stdin.flush()
            if input:
                write_set.append(self.stdin)
            else:
                self.stdin.close()
        if self.stdout:
            read_set.append(self.stdout)
            stdout = []
        if self.stderr:
            read_set.append(self.stderr)
            stderr = []

        input_offset = 0
        while read_set or write_set:
            try:
                rlist, wlist, xlist = select.select(
                    read_set, write_set, [], self.idle_timeout)
            except select.error as e:
                # A signal interrupted the call: simply retry.
                if e.args[0] == errno.EINTR:
                    continue
                raise

            if not (rlist or wlist or xlist):
                # Nothing became ready within the timeout: treat the
                # child as done talking for now and hand back what we
                # have, keeping the pipes open for the next call.
                break

            if self.stdin in wlist:
                # When select has indicated that the file is writable,
                # we can write up to PIPE_BUF bytes without risk of
                # blocking.  POSIX defines PIPE_BUF >= 512.
                chunk = input[input_offset:input_offset + 512]
                bytes_written = os.write(self.stdin.fileno(), chunk)
                input_offset += bytes_written
                if input_offset >= len(input):
                    # Everything sent; stdin deliberately stays open.
                    write_set.remove(self.stdin)

            if self.stdout in rlist:
                data = os.read(self.stdout.fileno(), 1024)
                if data:
                    stdout.append(data)
                else:
                    # Empty read means EOF on this pipe.
                    read_set.remove(self.stdout)

            if self.stderr in rlist:
                data = os.read(self.stderr.fileno(), 1024)
                if data:
                    stderr.append(data)
                else:
                    read_set.remove(self.stderr)

        # All data exchanged.  Translate lists into strings.  os.read()
        # yields byte strings, hence the byte-string separator (which is
        # the plain '' on Python 2).
        if stdout is not None:
            stdout = b''.join(stdout)
        if stderr is not None:
            stderr = b''.join(stderr)

        # Translate newlines, if requested.  We cannot let the file
        # object do the translation: it is based on stdio, which is
        # impossible to combine with select (unless forcing no
        # buffering).  ``file`` is the Python 2 builtin file type.
        if self.universal_newlines and hasattr(file, 'newlines'):
            if stdout:
                stdout = self._translate_newlines(stdout)
            if stderr:
                stderr = self._translate_newlines(stderr)

        return (stdout, stderr)
# Prompt shown by raw_input() for every command line typed by the user.
prompt = '> '


def history_path(command):
    """Return the readline history file name used for *command*.

    The name is a dot-file in the current directory built from the
    command's base name, so ``sqlite3``, ``./sqlite3`` and
    ``/usr/bin/sqlite3`` all map to ``.sqlite3`` instead of producing a
    path that points into another directory.
    """
    return ".%s" % os.path.basename(command)


def main():
    """Run the command given on the command line behind a readline prompt.

    Starts ``sys.argv[1:]`` as a child with piped stdin/stdout (stderr
    merged into stdout), then loops: print the child's output, read one
    line from the user with readline editing/history, send it to the
    child.  The loop ends when the child has exited or the user hits
    Ctrl-C / Ctrl-D (the child is then terminated).  The readline history
    is saved on the way out and the script exits with the child's
    recorded return code.
    """
    if len(sys.argv) < 2:
        # No command to wrap.
        print("Please check your arguments")
        sys.exit(1)

    try:
        proc = Popen(sys.argv[1:],
                     stdin=subprocess.PIPE,
                     stdout=subprocess.PIPE,
                     stderr=subprocess.STDOUT)
    except (ValueError, OSError):
        # ValueError: invalid Popen arguments; OSError: the program
        # could not be started (e.g. it does not exist).
        print("Please check your arguments")
        sys.exit(1)

    hist_fname = history_path(sys.argv[1])
    try:
        readline.read_history_file(hist_fname)
    except IOError:
        # Best effort: no (readable) history file yet.
        pass

    # Send an empty line first to collect whatever the child prints on
    # start-up.
    out, err = proc.communicate("\n")
    while True:
        if out:
            # NOTE: lstrip() removes any leading run of the characters in
            # ``prompt`` ('>' and ' '), not just one exact prefix.
            print(out.lstrip(prompt))
        if err:
            print(err)

        proc.poll()
        if proc.returncode is not None:
            break

        try:
            cmd = raw_input(prompt)
        except (KeyboardInterrupt, EOFError):
            proc.terminate()
            break
        else:
            out, err = proc.communicate(cmd + "\n")

    readline.write_history_file(hist_fname)
    sys.exit(proc.returncode)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment