Skip to content

Instantly share code, notes, and snippets.

@blink1073
Last active August 29, 2015 14:09
Show Gist options
  • Save blink1073/9a0ea82efc84cb9216d0 to your computer and use it in GitHub Desktop.
Save blink1073/9a0ea82efc84cb9216d0 to your computer and use it in GitHub Desktop.
Windows Pexpect (no PTY)
import codecs
import os
import re
import signal
import subprocess
import sys
import threading
import time

try:
    import Queue  # Python 2 module name
except ImportError:
    import queue as Queue  # Python 3 renamed the module to ``queue``
class EOF(Exception):
    """Raised when the child's output ends before the awaited text arrives."""


class TIMEOUT(Exception):
    """Raised when the awaited text does not arrive within the time limit."""


class spawn(object):
    """Pipe-based stand-in for ``pexpect.spawn`` that needs no PTY.

    The child is started with ``subprocess.Popen`` with stdin/stdout piped and
    stderr merged into stdout.  A daemon thread reads the child's output and
    hands decoded text to the calling thread through a queue; ``expect``,
    ``expect_exact`` and ``readline`` consume it from ``self._buf``.

    After a successful ``expect``/``expect_exact``, ``self.before`` holds the
    text preceding the match and ``self.after`` the matched text.
    """

    # Seconds ``expect``/``expect_exact`` wait when called with ``timeout=-1``.
    timeout = 30
    # Longest single blocking wait on the queue; short waits keep the calling
    # thread responsive instead of parking it in one long blocking call.
    _POLL_INTERVAL = 0.1
    # Class-level defaults so the reading helpers also work on an instance
    # whose ``__init__`` was bypassed.
    _buf = ''
    _eof = False

    def __init__(self, cmd, **kwargs):
        """Start *cmd* and begin collecting its output.

        :param cmd: Command passed straight to ``subprocess.Popen``.
        :param kwargs: Extra ``Popen`` keyword arguments.  ``bufsize``,
            ``stdin``, ``stdout`` and ``stderr`` are always overridden; on
            Windows ``startupinfo`` and ``creationflags`` are too.
        """
        kwargs.update(dict(bufsize=0, stdin=subprocess.PIPE,
                           stderr=subprocess.STDOUT,
                           stdout=subprocess.PIPE))
        if os.name == 'nt':
            # These attributes only exist in the Windows build of subprocess.
            startupinfo = subprocess.STARTUPINFO()
            startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
            kwargs['startupinfo'] = startupinfo
            kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
        self.proc = subprocess.Popen(cmd, **kwargs)
        self._buf = ''
        self._eof = False
        self.before = ''
        self.after = ''
        self._read_queue = Queue.Queue()
        self._read_thread = threading.Thread(target=self._read_incoming)
        self._read_thread.daemon = True
        self._read_thread.start()

    def _read_incoming(self):
        """Reader-thread body: queue decoded output, then ``None`` at EOF."""
        # Incremental decoding keeps a multi-byte character intact when it is
        # split across two reads.
        decoder = codecs.getincrementaldecoder('utf-8')(errors='replace')
        fd = self.proc.stdout.fileno()
        while True:
            try:
                chunk = os.read(fd, 1024)
            except OSError:
                # A failed read on the pipe is treated as end of output.
                chunk = b''
            if not chunk:
                break
            text = decoder.decode(chunk)
            if text:
                self._read_queue.put(text)
        tail = decoder.decode(b'', final=True)
        if tail:
            self._read_queue.put(tail)
        # Sentinel: tells the consumer that no more output will ever arrive.
        self._read_queue.put(None)

    def _pull(self, n, wait=None):
        """Move queued output into ``self._buf``.

        Stops when the queue is empty, EOF is seen, or at least *n* new
        characters were added.  When *wait* is a positive number, the first
        queue access blocks for up to that many seconds; all others never
        block.
        """
        start = len(self._buf)
        block = wait is not None and wait > 0
        while not self._eof:
            try:
                if block:
                    block = False
                    item = self._read_queue.get(timeout=wait)
                else:
                    item = self._read_queue.get_nowait()
            except Queue.Empty:
                return
            if item is None:
                self._eof = True
            else:
                self._buf += item
                if len(self._buf) - start >= n:
                    return

    def read_nonblocking(self, n):
        """Append already-received output to the buffer without blocking.

        Stops once at least *n* new characters were added or nothing more is
        immediately available.  Returns ``None``.
        """
        self._pull(n)

    def readline(self):
        """Return the next line of output without its trailing ``'\\n'``.

        Blocks until a full line is available.  Once the child's output has
        ended, returns whatever is left in the buffer (possibly ``''``).
        """
        while '\n' not in self._buf:
            if self._eof:
                rest, self._buf = self._buf, ''
                return rest
            self._pull(1024, wait=self._POLL_INTERVAL)
        # partition() drops the newline itself, so the next call starts on
        # the following line instead of re-reading an empty one.
        line, _, self._buf = self._buf.partition('\n')
        return line

    def write(self, message):
        """Write a message to the process using utf-8 encoding"""
        if not isinstance(message, bytes):
            message = message.encode('utf-8')
        self.proc.stdin.write(message)

    def sendline(self, line):
        """Write *line* followed by a newline to the process."""
        self.write(line + '\n')

    @staticmethod
    def _find_regex(pattern, text):
        """Return the ``(start, end)`` span of regex *pattern* in *text*."""
        match = re.search(pattern, text)
        return match.span() if match else None

    @staticmethod
    def _find_exact(needle, text):
        """Return the ``(start, end)`` span of literal *needle* in *text*."""
        start = text.find(needle)
        return None if start < 0 else (start, start + len(needle))

    def _wait_for(self, strings, timeout, find):
        """Block until one of *strings* is found; return its index.

        Candidates are tried in list order against the whole buffer.  On a
        hit ``before``/``after`` are set and the buffer is advanced past the
        match.  Raises :exc:`EOF` or :exc:`TIMEOUT` otherwise.
        """
        if not isinstance(strings, (list, tuple)):
            strings = [strings]
        if timeout == -1:
            timeout = self.timeout
        deadline = None if timeout is None else time.time() + timeout
        # Pick up anything that has already arrived, so that even a zero
        # timeout sees it.
        self.read_nonblocking(1024)
        while True:
            if self._buf:
                for index, string in enumerate(strings):
                    span = find(string, self._buf)
                    if span is not None:
                        start, end = span
                        self.before = self._buf[:start]
                        self.after = self._buf[start:end]
                        self._buf = self._buf[end:]
                        return index
            if self._eof:
                raise EOF('child output ended before a match was found')
            wait = self._POLL_INTERVAL
            if deadline is not None:
                remaining = deadline - time.time()
                if remaining <= 0:
                    raise TIMEOUT('no match within %s seconds' % (timeout,))
                wait = min(wait, remaining)
            self._pull(1024, wait=wait)

    def expect(self, strings, timeout=None):
        """Look for a string or strings in the incoming data

        :param strings: A regular expression or a list of them.
        :param timeout: Seconds to wait; ``None`` (default) waits until a
            match or EOF, ``-1`` uses ``self.timeout``.
        :returns: The text that preceded the match (``self.before``).  Note
            that this differs from ``pexpect.spawn.expect``, which returns
            the index of the matching pattern.
        :raises EOF: The child's output ended without a match.
        :raises TIMEOUT: *timeout* elapsed without a match.
        """
        self._wait_for(strings, timeout, self._find_regex)
        return self.before

    def expect_exact(self, strings, timeout=-1):
        """Like :meth:`expect`, but match literal text and return the index.

        :param strings: A plain string or a list of plain strings.
        :param timeout: Seconds to wait; ``-1`` (default) uses
            ``self.timeout``, ``None`` waits until a match or EOF.
        :returns: Index into *strings* of the entry that matched.
        :raises EOF: The child's output ended without a match.
        :raises TIMEOUT: *timeout* elapsed without a match.
        """
        return self._wait_for(strings, timeout, self._find_exact)

    def kill(self, sig):
        """Send signal *sig* to the child process.

        On Windows ``Popen.send_signal`` only accepts ``SIGTERM`` and the
        console control events, so ``SIGINT`` is translated to
        ``CTRL_BREAK_EVENT`` -- the event documented as deliverable to a
        child started with ``CREATE_NEW_PROCESS_GROUP``.
        """
        if os.name == 'nt' and sig == signal.SIGINT:
            sig = signal.CTRL_BREAK_EVENT
        self.proc.send_signal(sig)
# True when the interpreter is Python 3 or newer.
PY3 = sys.version_info[0] >= 3


def u(s):
    """Return *s* as text: unchanged on Python 3, UTF-8 decoded on Python 2."""
    return s if PY3 else s.decode('utf-8')


# Default primary and continuation prompts used by REPLWrapper.
PEXPECT_PROMPT = u('[PEXPECT_PROMPT>')
PEXPECT_CONTINUATION_PROMPT = u('[PEXPECT_PROMPT+')
class REPLWrapper(object):
    """Wrapper for a REPL.

    :param cmd_or_spawn: This can either be an instance of :class:`pexpect.spawn`
      in which a REPL has already been started, or a str command to start a new
      REPL process.
    :param str orig_prompt: The prompt to expect at first.  May be omitted
      when *prompt_cmd* is used to produce the prompt.
    :param str prompt_change: A command to change the prompt to something more
      unique. If this is ``None``, the prompt will not be changed. This will
      be formatted with the new and continuation prompts as positional
      parameters, so you can use ``{}`` style formatting to insert them into
      the command.  It is not sent when *prompt_cmd* is given.
    :param str new_prompt: The more unique prompt to expect after the change.
    :param str continuation_prompt: The prompt that signals the REPL is
      waiting for more input to complete the current command.
    :param str prompt_cmd: Command to generate the prompt.  When set, it is
      sent to the REPL before every wait for a prompt.
    :param str extra_init_cmd: Commands to do extra initialisation, such as
      disabling pagers.
    """

    def __init__(self, cmd_or_spawn, orig_prompt=None, prompt_change=None,
                 new_prompt=PEXPECT_PROMPT,
                 continuation_prompt=PEXPECT_CONTINUATION_PROMPT,
                 prompt_cmd=None,
                 extra_init_cmd=None):
        if isinstance(cmd_or_spawn, str):
            self.child = spawn(cmd_or_spawn)
        else:
            self.child = cmd_or_spawn
        self.prompt_cmd = prompt_cmd

        # Decide which prompt text marks "ready for input".  Every path must
        # assign self.prompt, because _expect_prompt() reads it right below.
        if prompt_change is not None and not prompt_cmd:
            # Ask the REPL itself to switch to the unique prompt.
            self.set_prompt(orig_prompt,
                            prompt_change.format(new_prompt,
                                                 continuation_prompt))
            self.prompt = new_prompt
        elif prompt_change is None and orig_prompt is not None:
            # Prompt left unchanged: keep waiting for the original one.
            self.prompt = orig_prompt
        else:
            # The prompt is produced by prompt_cmd (or nothing else was
            # specified), so wait for new_prompt.
            self.prompt = new_prompt
        self.continuation_prompt = continuation_prompt

        self._expect_prompt()

        if extra_init_cmd is not None:
            self.run_command(extra_init_cmd)

    def set_prompt(self, orig_prompt, prompt_change):
        """Wait for *orig_prompt*, then send the *prompt_change* command."""
        self.child.expect(orig_prompt)
        self.child.sendline(prompt_change)

    def _expect_prompt(self, timeout=-1):
        """Wait for the next prompt.

        Sends ``prompt_cmd`` first when one was configured.  Returns the
        child's ``expect_exact`` result for the list
        ``[prompt, continuation_prompt]``; ``run_command`` treats ``1`` as
        "continuation prompt seen".
        """
        if self.prompt_cmd:
            self.child.sendline(self.prompt_cmd)
        return self.child.expect_exact([self.prompt, self.continuation_prompt],
                                       timeout=timeout)

    def run_command(self, command, timeout=-1):
        """Send a command to the REPL, wait for and return output.

        :param str command: The command to send. Trailing newlines are not needed.
          This should be a complete block of input that will trigger execution;
          if a continuation prompt is found after sending input, :exc:`ValueError`
          will be raised.
        :param int timeout: How long to wait for the next prompt. -1 means the
          default from the :class:`pexpect.spawn` object (default 30 seconds).
          None means to wait indefinitely.
        :raises ValueError: If *command* is empty, or if the REPL answers
          with the continuation prompt after the whole command was sent.
        """
        # Split up multiline commands and feed them in bit-by-bit
        cmdlines = command.splitlines()
        # splitlines ignores trailing newlines - add it back in manually
        if command.endswith('\n'):
            cmdlines.append('')
        if not cmdlines:
            raise ValueError("No command was given")

        self.child.sendline(cmdlines[0])
        for line in cmdlines[1:]:
            self._expect_prompt(timeout=1)
            self.child.sendline(line)

        # Command was fully submitted, now wait for the next prompt
        if self._expect_prompt(timeout=timeout) == 1:
            # We got the continuation prompt - command was incomplete
            self.child.kill(signal.SIGINT)
            self._expect_prompt(timeout=1)
            raise ValueError("Continuation prompt found - input was incomplete:\n"
                             + command)
        return self.child.before
if __name__ == '__main__':
    # Demo 1: drive cmd.exe directly through spawn.
    # The regex matches "C:\" followed by anything up to '>' on one line.
    prompt = r'C:\\.*>'
    p = spawn('cmd')
    p.expect(prompt)
    p.write('echo %PATH%\n')
    print(p.expect(prompt))
    p.write('dir\n')
    print(p.expect(prompt))

    # Demo 2: wrap cmd.exe in a REPLWrapper whose prompt is produced by an
    # explicit echo command.  orig_prompt and prompt_change are positional
    # parameters of REPLWrapper.__init__, so they are passed explicitly:
    # the marker text as the prompt to wait for, and None for "do not send
    # a prompt-changing command".
    r = REPLWrapper('cmd', '__repl_ready__', None,
                    new_prompt='__repl_ready__',
                    prompt_cmd='echo __repl_ready__')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment