Last active
August 29, 2015 14:09
-
-
Save blink1073/9a0ea82efc84cb9216d0 to your computer and use it in GitHub Desktop.
Windows Pexpect (no PTY)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import codecs
import os
import re
import signal
import subprocess
import sys
import threading
import time

try:
    import Queue  # Python 2 module name
except ImportError:
    import queue as Queue  # Python 3 renamed the module to ``queue``
class spawn(object):
    """Pexpect-like driver for a child process connected through plain pipes.

    No pseudo-terminal is involved: the child's stdin and stdout are
    ordinary pipes and stderr is merged into stdout.  A daemon thread
    copies everything the child prints into a queue; the public methods
    move that data into a text buffer (``_buf``) and search it.

    After a successful :meth:`expect` or :meth:`expect_exact`:

    * ``before`` holds the buffered text that preceded the match;
    * ``after`` holds the matched text itself.
    """

    class TIMEOUT(Exception):
        """Raised when nothing matched before the timeout expired."""

    class EOF(Exception):
        """Raised when the child's output ended before anything matched."""

    # Seconds to wait when a method is called with ``timeout=-1``.
    timeout = 30
    # Longest single blocking wait on the queue.  Waiting in short slices
    # instead of blocking indefinitely keeps the calling thread responsive
    # to KeyboardInterrupt.
    _POLL_INTERVAL = 0.1
    # Class-level defaults so helpers work even before __init__ has run.
    _eof = False
    _decoder = None

    def __init__(self, cmd, **kwargs):
        """Start *cmd* and begin collecting its output in the background.

        :param cmd: Command handed to :class:`subprocess.Popen`.
        :param kwargs: Extra ``Popen`` keyword arguments.  ``bufsize`` and
            the three standard streams are always overridden; on Windows
            ``startupinfo`` and ``creationflags`` are overridden as well.
        """
        kwargs.update(dict(bufsize=0, stdin=subprocess.PIPE,
                           stderr=subprocess.STDOUT,
                           stdout=subprocess.PIPE))
        if hasattr(subprocess, 'STARTUPINFO'):
            # Windows-only settings.  STARTF_USESHOWWINDOW makes Windows
            # honour ``wShowWindow`` (left at its default here), and a new
            # process group is what allows CTRL_BREAK_EVENT to be sent to
            # the child later (see ``kill``).
            startupinfo = subprocess.STARTUPINFO()
            startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
            kwargs['startupinfo'] = startupinfo
            kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
        self.proc = subprocess.Popen(cmd, **kwargs)
        self._buf = ''
        self._eof = False
        self._decoder = None
        self.before = ''
        self.after = ''
        self._read_queue = Queue.Queue()
        self._read_thread = threading.Thread(target=self._read_incoming)
        self._read_thread.daemon = True
        self._read_thread.start()

    def _read_incoming(self):
        """Reader thread: copy the child's output into the queue until EOF."""
        fd = self.proc.stdout.fileno()
        while True:
            try:
                data = os.read(fd, 1024)
            except OSError:
                # A broken/closed pipe is treated like end of output.
                data = b''
            if not data:
                # ``None`` tells the consumer that no more data will come;
                # stop instead of queueing empty reads forever.
                self._read_queue.put(None)
                return
            self._read_queue.put(data)

    def _absorb(self, chunk):
        """Fold one queue item into the buffer (``None`` marks end of output)."""
        if chunk is None:
            self._eof = True
            return
        if not isinstance(chunk, str):
            # Python 3: the pipe yields bytes while the buffer is text.  An
            # incremental decoder copes with multi-byte characters that are
            # split across two reads.  (On Python 2 the chunk already is a
            # ``str`` and is stored unchanged.)
            if self._decoder is None:
                self._decoder = codecs.getincrementaldecoder('utf-8')('replace')
            chunk = self._decoder.decode(chunk)
        self._buf += chunk

    def _deadline(self, timeout):
        """Translate a timeout argument into an absolute time (or ``None``)."""
        if timeout == -1:
            timeout = self.timeout
        if timeout is None:
            return None
        return time.time() + timeout

    def _wait_for_data(self, deadline):
        """Block until one more queue item has been absorbed.

        :raises spawn.EOF: if the end of output was already reached.
        :raises spawn.TIMEOUT: if *deadline* passes first.
        """
        if self._eof:
            raise self.EOF('End of output reached before a match was found')
        while True:
            wait = self._POLL_INTERVAL
            if deadline is not None:
                remaining = deadline - time.time()
                if remaining <= 0:
                    raise self.TIMEOUT('Timed out waiting for a match')
                wait = min(wait, remaining)
            try:
                chunk = self._read_queue.get(timeout=wait)
            except Queue.Empty:
                continue
            self._absorb(chunk)
            return

    def _consume(self, start, end):
        """Record a match at ``_buf[start:end]`` and drop it from the buffer."""
        self.before = self._buf[:start]
        self.after = self._buf[start:end]
        self._buf = self._buf[end:]

    def read_nonblocking(self, n):
        """Move already-received output into the buffer without waiting.

        Queue items are consumed until at least *n* new characters have
        been buffered or the queue is empty, whichever comes first.
        Returns ``None``; the data ends up in the internal buffer.
        """
        start = len(self._buf)
        while True:
            try:
                chunk = self._read_queue.get_nowait()
            except Queue.Empty:
                return
            self._absorb(chunk)
            if len(self._buf) - start >= n:
                return

    def readline(self):
        """Return the next line of output without its trailing ``'\\n'``.

        Blocks until a complete line is available.  A ``'\\r'`` in front of
        the newline is left in place.  If the child's output ends first,
        whatever remains in the buffer (possibly ``''``) is returned.
        """
        while True:
            self.read_nonblocking(1024)
            head, newline, tail = self._buf.partition('\n')
            if newline:
                # Drop the newline too, otherwise the next call would find
                # it at position 0 and return '' forever.
                self._buf = tail
                return head
            if self._eof:
                self._buf = ''
                return head
            self._wait_for_data(None)

    def write(self, message):
        """Write a message to the process using utf-8 encoding.

        Byte strings are written unchanged; text is encoded first.
        """
        if not isinstance(message, bytes):
            message = message.encode('utf-8')
        self.proc.stdin.write(message)

    def sendline(self, line):
        """Write *line* followed by a newline to the process."""
        self.write(line + '\n')

    def expect(self, strings, timeout=None):
        """Wait until one of the regular expressions matches the output.

        :param strings: A pattern, or a list/tuple of patterns, for
            :func:`re.search`.  Patterns are tried in the given order.
        :param timeout: Seconds to wait.  ``None`` (the default) waits
            indefinitely; ``-1`` uses ``self.timeout``.
        :returns: The text before the match (also stored in ``before``).
        :raises spawn.TIMEOUT: if *timeout* expires.
        :raises spawn.EOF: if the output ends without a match.
        """
        if not isinstance(strings, (list, tuple)):
            strings = [strings]
        patterns = [re.compile(item) for item in strings]
        deadline = self._deadline(timeout)
        while True:
            self.read_nonblocking(1024)
            if self._buf:
                for pattern in patterns:
                    match = pattern.search(self._buf)
                    if match:
                        self._consume(match.start(), match.end())
                        return self.before
            # Sleep on the queue rather than spinning on an empty one.
            self._wait_for_data(deadline)

    def expect_exact(self, strings, timeout=-1):
        """Wait until one of the literal strings appears in the output.

        The occurrence that starts earliest in the buffer wins; ties go to
        the string listed first.

        :param strings: A string, or a list/tuple of strings, matched
            literally (no regular-expression syntax).
        :param timeout: Seconds to wait.  ``-1`` (the default) uses
            ``self.timeout``; ``None`` waits indefinitely.
        :returns: The index in *strings* of the string that was found.
        :raises spawn.TIMEOUT: if *timeout* expires.
        :raises spawn.EOF: if the output ends without a match.
        """
        if not isinstance(strings, (list, tuple)):
            strings = [strings]
        deadline = self._deadline(timeout)
        while True:
            self.read_nonblocking(1024)
            best = None
            for index, needle in enumerate(strings):
                pos = self._buf.find(needle)
                if pos != -1 and (best is None or pos < best[0]):
                    best = (pos, index, len(needle))
            if best is not None:
                pos, index, length = best
                self._consume(pos, pos + length)
                return index
            self._wait_for_data(deadline)

    def kill(self, sig):
        """Send signal *sig* to the child process.

        NOTE(review): on Windows ``SIGINT`` cannot be delivered to a child,
        so it is replaced by ``CTRL_BREAK_EVENT``, which the subprocess
        documentation lists as deliverable to children started with
        ``CREATE_NEW_PROCESS_GROUP``.  Whether the child reacts to it like
        an interrupt depends on the program -- confirm for your REPL.
        """
        if sig == signal.SIGINT and hasattr(signal, 'CTRL_BREAK_EVENT'):
            sig = signal.CTRL_BREAK_EVENT
        self.proc.send_signal(sig)
# True when the interpreter is Python 3 or newer.
PY3 = sys.version_info >= (3,)

if not PY3:
    def u(s):
        """Return the byte string *s* decoded from UTF-8 (Python 2)."""
        return s.decode('utf-8')
else:
    def u(s):
        """Return *s* unchanged (Python 3 strings are already text)."""
        return s

# Default prompts that REPLWrapper expects after changing the REPL's prompt.
PEXPECT_PROMPT = u('[PEXPECT_PROMPT>')
PEXPECT_CONTINUATION_PROMPT = u('[PEXPECT_PROMPT+')
class REPLWrapper(object):
    """Wrapper for a REPL.

    :param cmd_or_spawn: This can either be an instance of :class:`pexpect.spawn`
      in which a REPL has already been started, or a str command to start a new
      REPL process.
    :param str orig_prompt: The prompt to expect at first.  May be omitted
      when the prompt is produced by *prompt_cmd*.
    :param str prompt_change: A command to change the prompt to something more
      unique. If this is ``None``, the prompt will not be changed. This will
      be formatted with the new and continuation prompts as positional
      parameters, so you can use ``{}`` style formatting to insert them into
      the command.
    :param str new_prompt: The more unique prompt to expect after the change.
    :param str continuation_prompt: The prompt that signals the REPL is
      waiting for more input.
    :param str prompt_cmd: Command to generate the prompt.  When given, it is
      sent before every wait for a prompt and *prompt_change* is not sent.
    :param str extra_init_cmd: Commands to do extra initialisation, such as
      disabling pagers.
    """

    def __init__(self, cmd_or_spawn, orig_prompt=None, prompt_change=None,
                 new_prompt=PEXPECT_PROMPT,
                 continuation_prompt=PEXPECT_CONTINUATION_PROMPT,
                 prompt_cmd=None,
                 extra_init_cmd=None):
        # Accept both ``str`` and (on Python 2) ``unicode`` commands.
        if isinstance(cmd_or_spawn, (str, type(u''))):
            self.child = spawn(cmd_or_spawn)
        else:
            self.child = cmd_or_spawn
        self.prompt_cmd = prompt_cmd

        if prompt_change is not None:
            if not prompt_cmd:
                if orig_prompt is None:
                    raise ValueError(
                        "orig_prompt is required when prompt_change is given")
                self.set_prompt(orig_prompt,
                                prompt_change.format(new_prompt,
                                                     continuation_prompt))
            # Always record the prompt; previously it stayed unset when both
            # prompt_change and prompt_cmd were supplied, and the first
            # _expect_prompt() call failed with AttributeError.
            self.prompt = new_prompt
        elif orig_prompt is not None:
            self.prompt = orig_prompt
        else:
            # Nothing to change and no original prompt given: wait for
            # new_prompt (e.g. the text echoed by prompt_cmd).
            self.prompt = new_prompt
        self.continuation_prompt = continuation_prompt

        self._expect_prompt()

        if extra_init_cmd is not None:
            self.run_command(extra_init_cmd)

    def set_prompt(self, orig_prompt, prompt_change):
        """Wait for *orig_prompt*, then send the *prompt_change* command."""
        self.child.expect(orig_prompt)
        self.child.sendline(prompt_change)

    def _expect_prompt(self, timeout=-1):
        """Wait for the main or the continuation prompt.

        If ``prompt_cmd`` is set it is sent first.  Returns whatever the
        child's ``expect_exact`` returns; ``run_command`` treats ``1`` as
        "continuation prompt found".
        """
        if self.prompt_cmd:
            self.child.sendline(self.prompt_cmd)
        return self.child.expect_exact([self.prompt, self.continuation_prompt],
                                       timeout=timeout)

    def run_command(self, command, timeout=-1):
        """Send a command to the REPL, wait for and return output.

        :param str command: The command to send. Trailing newlines are not needed.
          This should be a complete block of input that will trigger execution;
          if a continuation prompt is found after sending input, :exc:`ValueError`
          will be raised.
        :param int timeout: How long to wait for the next prompt. -1 means the
          default from the :class:`pexpect.spawn` object (default 30 seconds).
          None means to wait indefinitely.
        :returns: The child's ``before`` attribute, i.e. the output that
          preceded the prompt.
        :raises ValueError: if *command* is empty or incomplete.
        """
        # Split up multiline commands and feed them in bit-by-bit
        cmdlines = command.splitlines()
        # splitlines ignores trailing newlines - add it back in manually
        if command.endswith('\n'):
            cmdlines.append('')
        if not cmdlines:
            raise ValueError("No command was given")

        self.child.sendline(cmdlines[0])
        for line in cmdlines[1:]:
            self._expect_prompt(timeout=1)
            self.child.sendline(line)

        # Command was fully submitted, now wait for the next prompt
        if self._expect_prompt(timeout=timeout) == 1:
            # We got the continuation prompt - command was incomplete
            self.child.kill(signal.SIGINT)
            self._expect_prompt(timeout=1)
            raise ValueError("Continuation prompt found - input was incomplete:\n"
                             + command)
        return self.child.before
if __name__ == '__main__':
    # Demo 1: drive cmd.exe directly and print the output of two commands.
    prompt = r'C:\\.*>'  # regex for a prompt such as ``C:\Users\me>``
    p = spawn('cmd')
    p.expect(prompt)
    p.write('echo %PATH%\n')
    print(p.expect(prompt))
    p.write('dir\n')
    print(p.expect(prompt))

    # Demo 2: wrap cmd.exe in a REPLWrapper that uses an echoed marker as
    # its prompt.  ``orig_prompt`` and ``prompt_change`` are positional
    # parameters of REPLWrapper and were missing from the original call
    # (TypeError).  With ``prompt_change=None`` the wrapper waits for
    # ``orig_prompt``, so the marker is passed there as well.
    r = REPLWrapper('cmd', '__repl_ready__', None,
                    new_prompt='__repl_ready__',
                    prompt_cmd='echo __repl_ready__')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment