Created
January 21, 2012 01:17
-
-
Save tomprince/1650604 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys, os | |
from twisted.python import log | |
from twisted.internet import reactor | |
from twisted.internet import defer | |
from twisted.conch.ssh.common import NS | |
from twisted.conch.scripts.cftp import ClientOptions | |
from twisted.conch.ssh.filetransfer import FileTransferClient | |
from twisted.conch.client.connect import connect | |
from twisted.conch.ssh.connection import SSHConnection | |
from twisted.conch.ssh.channel import SSHChannel | |
from twisted.conch.ssh import userauth | |
class SSHUserAuthClient(userauth.SSHUserAuthClient): | |
def __init__(self, user, options, *args): | |
userauth.SSHUserAuthClient.__init__(self, user, *args) | |
self.options = options | |
self._tried_key = False | |
def getPublicKey(self): | |
if self._tried_key: | |
return | |
file = self.options['pubkey'] | |
if not os.path.exists(file): | |
return None | |
try: | |
key = keys.Key.fromFile(file) | |
self._tried_key = True | |
return key | |
except keys.BadKeyError: | |
return None | |
def getPrivateKey(self): | |
file = self.options['privkey'] | |
if not os.path.exists(file): | |
return None | |
try: | |
log.msg(file) | |
log.msg('here.') | |
return defer.succeed(keys.Key.fromFile(file)) | |
except keys.EncryptedKeyError: | |
return defer.fail(ConchError('encrypted private-key')) | |
#def getPassword(self): | |
# return None | |
def verifyHostKey(transport, host, pubKey, fingerprint): | |
known_hosts = transport.factory.options['known-hosts'] | |
if not known_hosts: | |
return defer.succeed(True) | |
actualHost = transport.factory.options['host'] | |
actualKey = keys.Key.fromString(pubKey) | |
kh = KnownHostsFile.fromPath(FilePath(known_hosts)) | |
return (kh.hasHostKey(host, actualKey) or | |
kh.hasHostKey(actualhost, actualKey)) | |
class SFTPSession(SSHChannel): | |
name = 'session' | |
def channelOpen(self, whatever): | |
d = self.conn.sendRequest( | |
self, 'subsystem', NS('sftp'), wantReply=True) | |
d.addCallbacks(self._cbSFTP) | |
def _cbSFTP(self, result): | |
client = FileTransferClient() | |
client.makeConnection(self) | |
self.dataReceived = client.dataReceived | |
self.conn._sftp.callback(client) | |
class SFTPConnection(SSHConnection): | |
def serviceStarted(self): | |
self.openChannel(SFTPSession()) | |
def sftp(user, host, port, pubkey, privkey): | |
options = ClientOptions() | |
options['host'] = host | |
options['port'] = port | |
options['pubkey'] = pubkey | |
options['privkey'] = privkey | |
options | |
conn = SFTPConnection() | |
conn._sftp = defer.Deferred() | |
auth = SSHUserAuthClient(user, options, conn) | |
connect(host, port, options, verifyHostKey, auth) | |
return conn._sftp | |
def transfer(client): | |
#d = client.makeDirectory('foobarbaz', {}) | |
def cbDir(ignored): | |
print 'Made directory' | |
#d.addCallback(cbDir) | |
#return d | |
def main(): | |
log.startLogging(sys.stdout) | |
user = 'gembot' | |
host = 'frs.sourceforge.net' | |
port = 22 | |
privkey = 'gembot-ssh-key' | |
pubkey = 'gembot-ssh-key.pub' | |
d = sftp(user, host, port, pubkey, privkey) | |
d.addCallback(transfer) | |
d.addErrback(log.err, "Problem with SFTP transfer") | |
d.addBoth(lambda ignored: reactor.stop()) | |
reactor.run() | |
if __name__ == '__main__': | |
main() | |
yanchris
commented
Aug 27, 2024
•
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment