Skip to content

Instantly share code, notes, and snippets.

@tcuthbert
Created March 31, 2016 11:01
Show Gist options
  • Select an option

  • Save tcuthbert/abe6d3a7733b98c6dcb0569167a3bdce to your computer and use it in GitHub Desktop.

Select an option

Save tcuthbert/abe6d3a7733b98c6dcb0569167a3bdce to your computer and use it in GitHub Desktop.
import sys
from twisted.conch import error
from twisted.conch.ssh import transport
from twisted.internet import defer
from twisted.conch.ssh import keys, userauth
from twisted.conch.ssh import connection
from twisted.conch.ssh import channel, common
from twisted.python import log
# Sample RSA key pair (these are the public/private keys from test_conch).
# NOTE(review): nothing visible in this file reads these two constants -- the
# only references are in the commented-out getPublicKey/getPrivateKey methods
# of ClientUserAuth.
# The public key is one logical line; the trailing backslashes continue the
# string literal across source lines without inserting newlines.
publicKey = 'ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAGEArzJx8OYOnJmzf4tfBEvLi8DVPrJ3\
/c9k2I/Az64fxjHf9imyRJbixtQhlH9lfNjUIx+4LmrJH5QNRsFporcHDKOTwTTYLh5KmRpslkYHR\
ivcJSkbh/C+BR3utDS555mV'
# PEM-encoded private half of the pair above.
privateKey = """-----BEGIN RSA PRIVATE KEY-----
MIIByAIBAAJhAK8ycfDmDpyZs3+LXwRLy4vA1T6yd/3PZNiPwM+uH8Yx3/YpskSW
4sbUIZR/ZXzY1CMfuC5qyR+UDUbBaaK3Bwyjk8E02C4eSpkabJZGB0Yr3CUpG4fw
vgUd7rQ0ueeZlQIBIwJgbh+1VZfr7WftK5lu7MHtqE1S1vPWZQYE3+VUn8yJADyb
Z4fsZaCrzW9lkIqXkE3GIY+ojdhZhkO1gbG0118sIgphwSWKRxK0mvh6ERxKqIt1
xJEJO74EykXZV4oNJ8sjAjEA3J9r2ZghVhGN6V8DnQrTk24Td0E8hU8AcP0FVP+8
PQm/g/aXf2QQkQT+omdHVEJrAjEAy0pL0EBH6EVS98evDCBtQw22OZT52qXlAwZ2
gyTriKFVoqjeEjt3SZKKqXHSApP/AjBLpF99zcJJZRq2abgYlf9lv1chkrWqDHUu
DZttmYJeEfiFBBavVYIF1dOlZT0G8jMCMBc7sOSZodFnAiryP+Qg9otSBjJ3bQML
pSTqy7c3a2AScC/YyOwkDaICHnnD3XyjMwIxALRzl0tQEKMXs6hH8ToUdlLROCrP
EhQ0wahUTCk1gKA4uPD6TMTChavbh4K63OvbKg==
-----END RSA PRIVATE KEY-----"""
class CatChannel(channel.SSHChannel):
    """Session channel that runs a fixed list of CLI commands in a remote shell.

    When the channel opens, a shell is requested.  Once the request is
    granted the commands are written to the shell, all received output is
    accumulated in ``catData``, and the channel is closed (and the reactor
    stopped) as soon as the echoed ``R1>exit`` prompt shows up in the output.
    The collected output is printed when the channel closes.
    """

    # Bytes literals are used for everything that goes on the wire, matching
    # the command list in _cbSendRequest; on Python 2 they are plain strings.
    name = b'session'

    # Class-level defaults so dataReceived/closed never hit a missing
    # attribute, whatever order the callbacks arrive in.
    catData = b''
    _finished = False

    def channelOpen(self, data):
        """Request a shell on the freshly opened channel.

        :param data: extra data delivered with the channel open; unused.
        """
        # Reset state before issuing the request so every later callback
        # can rely on it.
        self.catData = b''
        self._finished = False
        d = self.conn.sendRequest(self, b'shell', b'', wantReply=1)
        d.addCallback(self._cbSendRequest)
        # Without an errback a refused shell request (or an error while
        # writing the commands) would leave the script running forever.
        d.addErrback(self._ebSendRequest)

    def _cbSendRequest(self, ignored):
        """Write the command list to the shell once the request succeeded."""
        commands = (
            b'terminal length 0\n',
            b'show version\n',
            b'exit\n',
        )
        # No EOF/close is sent here: the channel has to stay open to receive
        # the command output; dataReceived closes it once the exit echo is seen.
        for command in commands:
            self.write(command)

    def _ebSendRequest(self, failure):
        """Log a failed shell request and shut the client down."""
        log.err(failure, 'shell request failed')
        self._finish()

    def _finish(self):
        """Close the channel and stop the reactor, at most once."""
        # More data can still be delivered after the first match; calling
        # reactor.stop() a second time would raise ReactorNotRunning.
        if self._finished:
            return
        self._finished = True
        self.loseConnection()
        # `reactor` is the module-level name imported further down the file.
        reactor.stop()

    def dataReceived(self, data):
        """Accumulate output and finish once the exit prompt has been echoed.

        :param data: chunk of channel data received from the server.
        """
        self.catData += data
        if b"R1>exit" in self.catData:
            self._finish()

    def closed(self):
        """Print everything that was received on the channel."""
        # A single pre-formatted argument gives the same output whether
        # `print` is the Python 2 statement or the Python 3 function.
        print('We got this from "cat": %s' % (self.catData,))
class ClientConnection(connection.SSHConnection):
    """SSH connection service that opens a single CatChannel when it starts."""

    def serviceStarted(self):
        """Create a CatChannel bound to this connection and open it."""
        cat_channel = CatChannel(conn=self)
        self.openChannel(cat_channel)
class ClientUserAuth(userauth.SSHUserAuthClient):
    """User-auth client that answers password requests with a fixed password."""

    def getPassword(self, prompt=None):
        """Return a Deferred that has already fired with the password.

        :param prompt: optional prompt text; ignored.
        :returns: ``defer.succeed`` wrapping the hard-coded password.
        """
        # Normal password authentication.  The parenthesised single-argument
        # form prints the same text on Python 2 and parses on Python 3.
        print("PASSWORD AUTH")
        return defer.succeed('cisco')  # <-- YOUR PASSWORD

    # Key-based authentication, left disabled:
    # def getPublicKey(self):
    #     return keys.Key.fromString(data = publicKey).blob()
    # def getPrivateKey(self):
    #     return defer.succeed(keys.Key.fromString(data = privateKey).keyObject)
class ClientTransport(transport.SSHClientTransport):
    """Client transport that requests user authentication for user 'cisco'."""

    def verifyHostKey(self, pubKey, fingerprint):
        """Return an already-fired Deferred regardless of the key presented.

        :param pubKey: host public key; ignored.
        :param fingerprint: host key fingerprint; ignored.
        """
        # WARNING(security): both arguments are ignored and the result is
        # unconditional, so no host key is ever rejected by this method.
        accepted = defer.succeed(1)
        return accepted

    def connectionSecure(self):
        """Request the ClientUserAuth service, wrapping a ClientConnection."""
        auth_service = ClientUserAuth('cisco', ClientConnection())
        self.requestService(auth_service)
from twisted.internet import protocol, reactor
def main(host='r1.demo.local', port=22):
    """Connect to an SSH server and run the reactor until the client stops it.

    :param host: host name or address to connect to; defaults to the
        original hard-coded demo router.
    :param port: TCP port of the SSH server.
    """
    log.startLogging(sys.stdout, setStdout=0)
    factory = protocol.ClientFactory()
    factory.protocol = ClientTransport

    def connection_failed(connector, reason):
        # If the TCP connection cannot be established, no protocol is ever
        # built and nothing else would stop the reactor, so the script
        # would hang forever.
        log.err(reason, 'connection to %s:%s failed' % (host, port))
        reactor.stop()

    # Set on the instance (not the class), so it is called with exactly
    # (connector, reason).
    factory.clientConnectionFailed = connection_failed
    reactor.connectTCP(host, port, factory)
    reactor.run()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment