Skip to content

Instantly share code, notes, and snippets.

@tomprince
Created January 21, 2012 01:17
Show Gist options
  • Save tomprince/1650604 to your computer and use it in GitHub Desktop.
Save tomprince/1650604 to your computer and use it in GitHub Desktop.
import os
import sys

from twisted.conch.client.connect import connect
from twisted.conch.client.knownhosts import KnownHostsFile
from twisted.conch.error import ConchError
from twisted.conch.scripts.cftp import ClientOptions
from twisted.conch.ssh import keys
from twisted.conch.ssh import userauth
from twisted.conch.ssh.channel import SSHChannel
from twisted.conch.ssh.common import NS
from twisted.conch.ssh.connection import SSHConnection
from twisted.conch.ssh.filetransfer import FileTransferClient
from twisted.internet import defer
from twisted.internet import reactor
from twisted.python import log
from twisted.python.filepath import FilePath
class SSHUserAuthClient(userauth.SSHUserAuthClient):
    """Public-key auth client that loads its key pair from paths in ``options``.

    ``options['pubkey']`` and ``options['privkey']`` are the filesystem paths
    of the public and private key files.
    """

    def __init__(self, user, options, *args):
        """Store ``options`` and pass ``user`` and ``*args`` to the base class."""
        userauth.SSHUserAuthClient.__init__(self, user, *args)
        self.options = options
        # Becomes True once the public key has been loaded and handed out,
        # so that later calls to getPublicKey() return None.
        self._tried_key = False

    def getPublicKey(self):
        """Return the public key the first time it loads, otherwise None.

        None is returned when the key was already handed out, when the file
        does not exist, or when the file cannot be parsed as a key.
        """
        if self._tried_key:
            return None
        path = self.options['pubkey']
        if not os.path.exists(path):
            return None
        try:
            key = keys.Key.fromFile(path)
        except keys.BadKeyError:
            return None
        self._tried_key = True
        return key

    def getPrivateKey(self):
        """Return a Deferred for the private key, or None if the file is missing.

        An encrypted key file yields a Deferred that fails with ConchError.
        """
        path = self.options['privkey']
        if not os.path.exists(path):
            return None
        log.msg(path)
        log.msg('here.')
        try:
            return defer.succeed(keys.Key.fromFile(path))
        except keys.EncryptedKeyError:
            return defer.fail(ConchError('encrypted private-key'))

    # Password authentication is intentionally not overridden.
    #def getPassword(self):
    #    return None
def verifyHostKey(transport, host, pubKey, fingerprint):
    """Check the server's host key against the configured known-hosts file.

    Reads ``known-hosts`` and ``host`` from ``transport.factory.options``.
    ``fingerprint`` is accepted for signature compatibility and is not used.

    Returns a Deferred on every path: it succeeds with True when the key is
    accepted and fails with ConchError when the key is not listed for either
    the connected ``host`` or the configured host name.
    """
    known_hosts = transport.factory.options['known-hosts']
    if not known_hosts:
        # NOTE(review): with no known-hosts file configured, every host key
        # is accepted without verification.
        return defer.succeed(True)
    actualHost = transport.factory.options['host']
    actualKey = keys.Key.fromString(pubKey)
    kh = KnownHostsFile.fromPath(FilePath(known_hosts))
    if kh.hasHostKey(host, actualKey) or kh.hasHostKey(actualHost, actualKey):
        return defer.succeed(True)
    # A mismatch must be reported as a failure; returning a bare bool (as the
    # other branch never did) would not reject the connection.
    return defer.fail(ConchError('host key not found in known-hosts file'))
class SFTPSession(SSHChannel):
    """Session channel that requests the ``sftp`` subsystem.

    The outcome is delivered through ``self.conn._sftp``: it fires with a
    FileTransferClient on success and with the failure otherwise.
    """
    name = 'session'

    def channelOpen(self, whatever):
        """Request the sftp subsystem once the channel is open."""
        d = self.conn.sendRequest(
            self, 'subsystem', NS('sftp'), wantReply=True)
        # Forward a refused request to the waiting Deferred; otherwise the
        # caller of sftp() would wait forever.
        d.addCallbacks(self._cbSFTP, self.conn._sftp.errback)

    def openFailed(self, reason):
        """Report a failed channel open to the waiting Deferred."""
        self.conn._sftp.errback(reason)

    def _cbSFTP(self, result):
        """Attach a FileTransferClient to this channel and hand it out."""
        client = FileTransferClient()
        client.makeConnection(self)
        # Route incoming channel data straight into the SFTP client.
        self.dataReceived = client.dataReceived
        self.conn._sftp.callback(client)
class SFTPConnection(SSHConnection):
    """SSH connection service that opens one SFTPSession channel on start."""

    def serviceStarted(self):
        """Open the SFTP session channel."""
        session = SFTPSession()
        self.openChannel(session)
def sftp(user, host, port, pubkey, privkey):
    """Start an SSH connection and return a Deferred for the SFTP client.

    ``pubkey`` and ``privkey`` are stored in the options under the keys
    'pubkey' and 'privkey', next to 'host' and 'port'. The returned Deferred
    is ``conn._sftp``, which SFTPSession fires with a FileTransferClient.
    """
    options = ClientOptions()
    options['host'] = host
    options['port'] = port
    options['pubkey'] = pubkey
    options['privkey'] = privkey
    conn = SFTPConnection()
    conn._sftp = defer.Deferred()
    auth = SSHUserAuthClient(user, options, conn)
    connect(host, port, options, verifyHostKey, auth)
    return conn._sftp
def transfer(client):
    """Placeholder for the SFTP work to run on the connected ``client``.

    The directory-creation example is commented out, so this currently does
    nothing with ``client`` and returns None.
    """
    #d = client.makeDirectory('foobarbaz', {})
    def cbDir(ignored):
        # The call form prints the same text on Python 2 and Python 3; the
        # bare print statement is a syntax error on Python 3.
        print('Made directory')
    #d.addCallback(cbDir)
    #return d
def main():
    """Log to stdout, run the SFTP transfer, then stop the reactor."""
    log.startLogging(sys.stdout)

    account = 'gembot'
    server = 'frs.sourceforge.net'
    ssh_port = 22
    public_key_path = 'gembot-ssh-key.pub'
    private_key_path = 'gembot-ssh-key'

    done = sftp(account, server, ssh_port, public_key_path, private_key_path)
    done.addCallback(transfer)
    done.addErrback(log.err, "Problem with SFTP transfer")

    def stop_reactor(ignored):
        # Runs on success and on failure alike.
        reactor.stop()

    done.addBoth(stop_reactor)
    reactor.run()


# Run the transfer only when executed as a script.
if __name__ == '__main__':
    main()
@evilaliv3
Copy link

@tomprince: Have you ever completed this prototype? We may be interested in moving it forward and using it inside @globaleaks.

\cc @fpietrosanti

@wd16yuan
Copy link

Hi, the script should also import `keys` ("from twisted.conch.ssh import keys"). Please also note the indentation at line 61, "class SFTPSession(SSHChannel):".

@Fabian1337
Copy link

@tomprince Do you have a newer version of this? That would be nice :)

@yanchris
Copy link

yanchris commented Aug 27, 2024

import os
from sys import stdout

from twisted.python.log import startLogging, err, msg as log_msg
from twisted.internet import reactor
from twisted.internet.defer import Deferred, succeed
from twisted.conch.ssh import keys
from twisted.conch.ssh.common import NS
from twisted.conch.scripts.cftp import ClientOptions
from twisted.conch.ssh.filetransfer import FileTransferClient
from twisted.conch.client.connect import connect
from twisted.conch.client.default import SSHUserAuthClient, verifyHostKey
from twisted.conch.ssh.connection import SSHConnection
from twisted.conch.ssh.channel import SSHChannel
from twisted.conch.ssh.filetransfer import FXF_READ


class SFTPSession(SSHChannel):
    """Session channel that requests the ``sftp`` subsystem.

    The outcome is delivered through ``self.conn._sftp``: it fires with a
    FileTransferClient on success and with the failure otherwise.
    """
    name = 'session'

    def channelOpen(self, whatever):
        """Request the sftp subsystem once the channel is open."""
        d = self.conn.sendRequest(self, 'subsystem', NS('sftp'), wantReply=True)
        # Forward a refused request to the waiting Deferred; otherwise the
        # caller of sftp() would wait forever.
        d.addCallbacks(self._cbSFTP, self.conn._sftp.errback)

    def openFailed(self, reason):
        """Report a failed channel open to the waiting Deferred."""
        self.conn._sftp.errback(reason)

    def _cbSFTP(self, result):
        """Attach a FileTransferClient to this channel and hand it out."""
        client = FileTransferClient()
        client.makeConnection(self)
        # Route incoming channel data straight into the SFTP client.
        self.dataReceived = client.dataReceived
        self.conn._sftp.callback(client)


class SFTPConnection(SSHConnection):
    """SSH connection service that opens one SFTPSession channel on start."""

    def serviceStarted(self):
        """Open the SFTP session channel."""
        session = SFTPSession()
        self.openChannel(session)

class AuthClient(SSHUserAuthClient):
    """Auth client that reads its key pair from explicit paths in ``options``.

    ``options['pubKey']`` and ``options['privateKey']`` are the filesystem
    paths of the public and private key files.
    """

    def __init__(self, user, options, *args):
        """Pass all arguments through to the base class unchanged."""
        SSHUserAuthClient.__init__(self, user, options, *args)

    def getPublicKey(self):
        """Return the public key once; return None on later calls.

        None is also returned when the key file does not exist.
        """
        log_msg('getPublicKey')

        kfile = self.options['pubKey']
        # The path is recorded in usedFiles below; checking it here keeps the
        # same key from being returned on every call.
        # NOTE(review): presumably Twisted asks again after a rejected key,
        # which would otherwise retry the same key endlessly - confirm.
        if kfile in self.usedFiles or not os.path.exists(kfile):
            return None
        self.usedFiles.append(kfile)
        return keys.Key.fromFile(kfile)

    def getPrivateKey(self):
        """Return a Deferred for the private key, or None if the file is missing."""
        log_msg('getPrivateKey')

        kfile = self.options['privateKey']
        if not os.path.exists(kfile):
            return None
        return succeed(keys.Key.fromFile(kfile))

def sftp(user, host, port, pubKey, privateKey):
    """Start an SSH connection and return a Deferred for the SFTP client.

    The connection settings and key paths are stored in a ClientOptions
    instance under 'host', 'port', 'pubKey' and 'privateKey'. The returned
    Deferred is the one stored on the connection as ``_sftp``.
    """
    options = ClientOptions()
    settings = (
        ('host', host),
        ('port', port),
        ('pubKey', pubKey),
        ('privateKey', privateKey),
    )
    for option_name, option_value in settings:
        options[option_name] = option_value

    connection = SFTPConnection()
    ready = connection._sftp = Deferred()
    connect(host, port, options, verifyHostKey,
            AuthClient(user, options, connection))
    return ready


def transfer(client):
    """Download one remote file over SFTP into a local file.

    The remote file is read in ``block_size`` chunks until a read fails with
    EOFError, which is taken as the normal end of the file. Any other failure
    is passed on to the caller through the returned Deferred. The local file
    and the remote handle are closed whether the download succeeds or fails.

    Returns the Deferred of the whole download.
    """
    # NOTE(review): the remote and local paths are the same string; when the
    # server is the local machine this may be one and the same file - confirm.
    remote_path = '/tmp/user/data11.tar.bz2'
    local_path = '/tmp/user/data11.tar.bz2'
    block_size = 2**16

    def download(cf):
        log_msg(cf)
        # Open the local file only after the remote open succeeded, so a
        # failed open neither truncates nor leaks a local file.
        fp = open(local_path, 'wb')
        offset = 0

        def read_cb(data):
            nonlocal offset
            data_len = len(data)
            log_msg(f'offset: {offset}, data lengh: {data_len}')
            fp.write(data)

            offset += data_len
            # Chain the next read; its failure travels up to the single
            # errback attached to the first read below.
            return cf.readChunk(offset, block_size).addCallback(read_cb)

        def eof(failure):
            # Only end-of-file finishes the download; anything else is a
            # real error and is re-raised by trap().
            failure.trap(EOFError)
            log_msg(f'eof cb: {failure.value.args}')

        def cleanup(result):
            fp.close()
            closing = cf.close()
            # A problem closing the remote handle is logged but must not
            # hide the download's own result.
            closing.addErrback(err, 'Problem closing remote file')
            closing.addCallback(lambda ignored: result)
            return closing

        reading = cf.readChunk(offset, block_size)
        reading.addCallback(read_cb)
        reading.addErrback(eof)
        reading.addBoth(cleanup)
        return reading

    d = client.openFile(remote_path, FXF_READ, {})
    d.addCallback(download)

    return d


def main():
    """Log to stdout, run the SFTP transfer, then stop the reactor."""
    startLogging(stdout)

    account = 'xxx'
    server = 'localhost'
    ssh_port = 22
    public_key_path = '/tmp/user/ssh-key/key.pub'
    private_key_path = '/tmp/user/ssh-key/key'

    done = sftp(account, server, ssh_port, public_key_path, private_key_path)
    done.addCallback(transfer)
    done.addErrback(err, "Problem with SFTP transfer")

    def stop_reactor(ignored):
        # Attached after the logging errback, which has already consumed
        # any failure by the time this runs.
        reactor.stop()

    done.addCallback(stop_reactor)
    reactor.run()


# Run the transfer only when executed as a script.
if __name__ == '__main__':
    main()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment