-
-
Save troyscott/4283580 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# fabric extension to enable handling expected prompts | |
# | |
# Read more at http://ilogue.com/jasper/blog/fexpect--dealing-with-prompts-in-fabric-with-pexpect/ | |
# | |
# This file Copyright (c) Jasper van den Bosch, ilogue, [email protected] | |
# Pexpect Copyright (c) 2012 Noah Spurrier ,see: http://www.noah.org/wiki/pexpect#License | |
from fabric.state import env | |
import fabric.api | |
import shortuuid | |
from general import resource | |
from StringIO import StringIO | |
def expect(promptexpr, response, exitAfter=-1): | |
if not exitAfter == -1: | |
return [(promptexpr, response, exitAfter)] | |
return [(promptexpr, response)] | |
def expecting(e): | |
return ExpectationContext(e) | |
class ExpectationContext(object): | |
def __init__(self,expectations): | |
self.expectations = expectations | |
def __enter__(self): | |
env.expectations = self.expectations | |
def __exit__(self, type, value, tb): | |
env.expectations = [] | |
def run(cmd): | |
#sudo wrapper | |
wrappedCmd = wrapExpectations(cmd,env) | |
return fabric.api.run(wrappedCmd) | |
def sudo(cmd): | |
#sudo wrapper | |
wrappedCmd = wrapExpectations(cmd,env) | |
return fabric.api.sudo(wrappedCmd) | |
def wrapExpectations(cmd,env): | |
script = createScript(cmd,env) | |
remoteScript = '/tmp/fexpect_'+shortuuid.uuid() | |
fabric.api.put(resource('pexpect.py'),'/tmp/') | |
fabric.api.put(StringIO(script),remoteScript) | |
wrappedCmd = 'python '+remoteScript | |
return wrappedCmd | |
def createScript(cmd,env): | |
to = 30*60 # readline timeout 8 hours | |
#write header: | |
s = '#!/usr/bin/python\n' | |
s+= 'import sys\n' | |
s+= 'from time import sleep\n' | |
s+= 'import pexpect\n' | |
#write expectation list: | |
s+= 'expectations=[' | |
for e in env.expectations: | |
s+= '"{0}",'.format(e[0]) | |
s+= ']\n' | |
#start | |
s+= """child = pexpect.spawn('/bin/bash -c "{0}"',timeout={1})\n""".format(cmd,to) | |
s+= "child.logfile = sys.stdout\n" | |
s+= "while True:\n" | |
s+= "\ttry:\n" | |
s+= "\t\ti = child.expect(expectations)\n" | |
i = 0 | |
for e in env.expectations: | |
ifkeyw = 'if' if i == 0 else 'elif' | |
s+= "\t\t{0} i == {1}:\n".format(ifkeyw,i) | |
s+= "\t\t\tchild.sendline('{0}')\n".format(e[1]) | |
if len(e)>2: | |
s+= "\t\t\tsleep({0})\n".format(e[2]) | |
s+= "\t\t\tprint('Exiting fexpect for expected exit.')\n" | |
s+= '\t\t\tbreak\n' | |
i += 1 | |
s+= '\texcept pexpect.EOF:\n' | |
s+= "\t\tprint('Exiting fexpect for EOF.')\n" | |
s+= '\t\tbreak\n' | |
s+= '\n' | |
return s |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# fabric extension to enable handling expected prompts | |
# | |
# Read more at http://ilogue.com/jasper/blog/fexpect--dealing-with-prompts-in-fabric-with-pexpect/ | |
# | |
# This file Copyright (c) Jasper van den Bosch, ilogue, [email protected] | |
# Pexpect Copyright (c) 2012 Noah Spurrier ,see: http://www.noah.org/wiki/pexpect#License | |
# Run from a fabfile.py with: | |
#@task() | |
#def test(): | |
# from tests import FexpectTests, runtest | |
# runtest(FexpectTests) | |
import unittest | |
from fabric.api import * | |
def runtest(testclass): | |
suite = unittest.TestLoader().loadTestsFromTestCase(testclass) | |
unittest.TextTestRunner(verbosity=2).run(suite) | |
class FexpectTests(unittest.TestCase): | |
def test_one_expectation(self): | |
cmd = 'echo "Hello" && read NAME && echo "Hi $NAME."' | |
from fexpect import expect, expecting, run | |
expectation = expect('Hello','answer') | |
with expecting(expectation): | |
output = run(cmd) | |
self.assertIn('answer',output) | |
def test_two_expectations(self): | |
cmd = 'echo "Hello" && read ONE && echo "bladiebla" && read TWO && echo "First $ONE than $TWO."' | |
from fexpect import expect, expecting, run | |
exp1 = expect('Hello','111') | |
exp2 = expect('bladiebla','222') | |
with expecting(exp1+exp2): | |
output = run(cmd) | |
self.assertIn('111',output) | |
self.assertIn('222',output) | |
def test_order_inconsequential(self): | |
#sequence shouldn't matter | |
cmd = 'echo "Hello" && read ONE && echo "bladiebla" && read TWO && echo "First $ONE than $TWO."' | |
from fexpect import expect, expecting, run | |
exp1 = expect('Hello','111') | |
exp2 = expect('bladiebla','222') | |
with expecting(exp2+exp1): | |
output = run(cmd) | |
self.assertIn('111',output) | |
self.assertIn('222',output) | |
def test_exit_after_expectation(self): | |
import time | |
from StringIO import StringIO | |
#sequence shouldn't matter | |
script = "#!/usr/bin/python\nimport time\nfor i in range(1,8):\n\tprint(i)\n\ttime.sleep(1)" | |
cmd = 'python /tmp/test.py' | |
put(StringIO(script),'/tmp/test.py') | |
from fexpect import expect, expecting, run | |
exp1 = expect('Hello','111') | |
exp2 = expect('3','expected',exitAfter=0) | |
t = time.time() | |
with expecting(exp1+exp2): | |
output = run(cmd) | |
elapsed = time.time() - t | |
self.assertGreater(elapsed,2) | |
self.assertLess(elapsed,4) | |
def tryOrFailOnPrompt(self,method,args): | |
try: | |
with settings(abort_on_prompts=True): | |
result = method(*args) | |
except SystemExit as promptAbort: | |
self.fail("There was an unexpected (password) prompt.") | |
return result |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment