Skip to content

Instantly share code, notes, and snippets.

@o3bvv
Last active September 6, 2023 04:05
Show Gist options
  • Save o3bvv/e9515a0d9189fc514e34 to your computer and use it in GitHub Desktop.
Save o3bvv/e9515a0d9189fc514e34 to your computer and use it in GitHub Desktop.
SSSHHH! Testing Python SSH client with auth via private key
# Install our dependencies
$ pip install -r <(wget -O- http://bit.ly/ssh_test_requirements)
# Generate key pair without password
$ ssh-keygen -t rsa -C "user@unicorn"
Generating public/private rsa key pair.
Enter file in which to save the key (/home/alex/.ssh/id_rsa): user.key
Enter passphrase (empty for no passphrase):
Enter same passphrase again:
Your identification has been saved in user.key.
Your public key has been saved in user.key.pub.
The key fingerprint is:
16:d8:23:ad:ba:00:ac:f2:fd:6e:ae:f6:9b:12:53:16 user@unicorn
The key's randomart image is:
+--[ RSA 2048]----+
| |
| E+ |
| o.= |
|. oo o |
|.. o. S |
|.. o. . |
|o . .o |
|.. oo... |
| ..+OB. |
+-----------------+
# Download test keys (for lazy people)
$ wget -O user.key http://bit.ly/ssh_test_key && chmod 600 user.key
$ wget -O user.key.pub http://bit.ly/ssh_test_key_pub
# Download mock SSH server and make it runnable
$ wget -O mock_server.py http://bit.ly/ssh_mock_server && chmod +x mock_server.py
# Run our SSH server
$ ./mock_server.py
# Add private key to agent
$ ssh-add user.key
Identity added: user.key (user.key)
# Connect to our SSH server
$ ssh user@localhost -p 2222
The authenticity of host '[localhost]:2222 ([127.0.0.1]:2222)' can't be established.
RSA key fingerprint is 16:d8:23:ad:ba:00:ac:f2:fd:6e:ae:f6:9b:12:53:16.
Are you sure you want to continue connecting (yes/no)? yes
>>> Welcome to test SSH server!
$ Hey!
No such command.
$ whoami
user
$ whatdoiwant
SSH!
$ howdoiwantit
Fast and furious!
$ quit
Connection to localhost closed.
# Download tests
$ wget -O test_ssh.py http://bit.ly/ssh_tests
# Run tests
$ nosetests
....
----------------------------------------------------------------------
Ran 4 tests in 3.732s
OK
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import base64
import os
from twisted.conch import avatar, error, recvline
from twisted.conch.insults import insults
from twisted.conch.interfaces import IConchUser, ISession
from twisted.conch.ssh import keys, factory, session
from twisted.cred import checkers, credentials, portal
from twisted.python import failure
from zope.interface import implements
class SSHMockProtocol(recvline.HistoricRecvLine):
def __init__(self, user):
self.user = user
def connectionMade(self):
recvline.HistoricRecvLine.connectionMade(self)
self.terminal.write("Welcome to test SSH server!")
self.terminal.nextLine()
self.show_prompt()
def lineReceived(self, line):
line = line.strip()
if line:
cmd_and_args = line.split()
cmd = cmd_and_args[0]
args = cmd_and_args[1:]
func = self.get_command_handler(cmd)
if func:
try:
func(*args)
except Exception, e:
self.terminal.write("Error: %s" % e)
self.terminal.nextLine()
else:
self.terminal.write("No such command.")
self.terminal.nextLine()
self.show_prompt()
def show_prompt(self):
self.terminal.write("$ ")
def get_command_handler(self, cmd):
return getattr(self, 'do_' + cmd, None)
def do_echo(self, *args):
self.terminal.write(" ".join(args))
self.terminal.nextLine()
def do_whoami(self):
self.terminal.write(self.user.username)
self.terminal.nextLine()
def do_whatdoiwant(self):
self.terminal.write("SSH!")
self.terminal.nextLine()
def do_howdoiwantit(self):
self.terminal.write("Fast and furious!")
self.terminal.nextLine()
def do_quit(self):
self.terminal.write("Good bye!")
self.terminal.nextLine()
self.terminal.loseConnection()
class SSHMockAvatar(avatar.ConchUser):
implements(ISession)
def __init__(self, username):
avatar.ConchUser.__init__(self)
self.username = username
self.channelLookup.update({'session': session.SSHSession})
def getPty(self, terminal, windowSize, attrs):
return None
def openShell(self, protocol):
server_protocol = insults.ServerProtocol(SSHMockProtocol, self)
server_protocol.makeConnection(protocol)
protocol.makeConnection(session.wrapProtocol(server_protocol))
def execCommand(self, protocol, cmd):
if cmd:
self.client = TransportWrapper(protocol)
cmd_and_args = cmd.split()
cmd, args = cmd_and_args[0], cmd_and_args[1:]
func = self.get_exec_func(cmd)
if func:
try:
func(*args)
except Exception as e:
self.client.write("Error: {0}".format(e))
else:
self.client.write("No such command.")
self.client.loseConnection()
protocol.session.conn.transport.expectedLoseConnection = 1
def get_exec_func(self, cmd):
return getattr(self, 'exec_' + cmd, None)
def exec_whoami(self):
self.client.write(self.username)
def exec_echo(self, *args):
self.client.write(" ".join(args))
def eofReceived(self):
pass
def closed(self):
pass
class TransportWrapper(object):
def __init__(self, p):
self.protocol = p
p.makeConnection(self)
self.closed = False
def write(self, data):
self.protocol.outReceived(data)
self.protocol.outReceived('\r\n')
# Mimic 'exit' for the shell test
if '\x00' in data:
self.loseConnection()
def loseConnection(self):
if self.closed:
return
self.closed = True
self.protocol.inConnectionLost()
self.protocol.outConnectionLost()
self.protocol.errConnectionLost()
class SSHMockRealm(object):
implements(portal.IRealm)
def requestAvatar(self, avatarId, mind, *interfaces):
if IConchUser in interfaces:
return interfaces[0], SSHMockAvatar(avatarId), lambda: None
else:
raise NotImplementedError("No supported interfaces found.")
class PublicKeyCredentialsChecker(object):
implements(checkers.ICredentialsChecker)
credentialInterfaces = (credentials.ISSHPrivateKey, )
def __init__(self, authorizedKeys):
self.authorized_keys = authorizedKeys
def requestAvatarId(self, credentials):
user_key_string = self.authorized_keys.get(credentials.username)
if not user_key_string:
return failure.Failure(error.ConchError("No such user"))
# Remove the 'ssh-rsa' type before decoding
blob = base64.decodestring(user_key_string.split(" ")[1])
if credentials.blob != blob:
raise failure.failure(error.ConchError("Failed to recognize key"))
if not credentials.signature:
return failure.Failure(error.ValidPublicKey())
user_key = keys.Key.fromString(data=user_key_string)
if user_key.verify(credentials.signature, credentials.sigData):
return credentials.username
else:
return failure.Failure(error.ConchError("Incorrect signature"))
def get_public_key(keys_path, username):
path = os.path.join(keys_path, "{0}.key.pub".format(username))
with open(path) as blob_file:
blob = blob_file.read()
key = keys.Key.fromString(data=blob)
return key
def get_ssh_factory(keys_path, username):
ssh_factory = factory.SSHFactory()
ssh_factory.portal = portal.Portal(SSHMockRealm())
public_key = get_public_key(keys_path, username)
ssh_factory.publicKeys = {"ssh-rsa": public_key}
authorized_keys = {
username: public_key.toString('OPENSSH')
}
checker = PublicKeyCredentialsChecker(authorized_keys)
ssh_factory.portal.registerChecker(checker)
return ssh_factory
def start_threaded_server(interface='localhost', port=2222, username='user',
keys_path='.'):
from threading import Thread
from twisted.internet import reactor
ssh_factory = get_ssh_factory(keys_path, username)
reactor.listenTCP(port, ssh_factory, interface=interface)
Thread(target=reactor.run, args=(False, )).start()
def stop_threaded_server():
from twisted.internet import reactor
reactor.callFromThread(reactor.stop)
def main():
import argparse
from twisted.internet import reactor
parser = argparse.ArgumentParser(description="Run mock SSH server")
parser.add_argument(
'-i', '--interface',
help="Interface to listen incoming connections on. Default: localhost",
default="localhost",
)
parser.add_argument(
'-p', '--port',
help="Interface to listen incoming connections on. Default: 2222",
type=int,
default=2222,
)
parser.add_argument(
'-u', '--username',
help="Username which is going to connect. Default: user",
default="user",
)
parser.add_argument(
'-k', '--keys-path',
help="Path to directory, where keys are stored. "
"Default: . (current directory)",
dest='keys_path',
default='.',
)
args = parser.parse_args()
ssh_factory = get_ssh_factory(args.keys_path, args.username)
reactor.listenTCP(args.port, ssh_factory, interface=args.interface)
reactor.run()
if __name__ == '__main__':
main()
# Requirements are freezed to prevent time bombs
# For client
paramiko==1.15.2
# For server
Twisted==15.1.0
pyasn1==0.1.7
# For tests
nose==1.3.6
# -*- coding: utf-8 -*-
import os
import paramiko
import unittest
from mock_server import start_threaded_server, stop_threaded_server
__here__ = os.path.abspath(os.path.dirname(__file__))
class SSHTestCase(unittest.TestCase):
username = 'user'
interface = 'localhost'
port = 2222
@classmethod
def setUpClass(cls):
start_threaded_server(interface=cls.interface,
port=cls.port,
username=cls.username,
keys_path=__here__)
@classmethod
def tearDownClass(cls):
stop_threaded_server()
def setUp(self):
key_path = os.path.join(__here__, "{0}.key".format(self.username))
rsa_key = paramiko.RSAKey.from_private_key_file(key_path)
self.client = paramiko.SSHClient()
self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.client.connect(self.interface, self.port,
username=self.username,
pkey=rsa_key)
def tearDown(self):
self.client.close()
def test_whoami(self):
stdin, stdout, stderr = self.client.exec_command('whoami')
output = stdout.read().strip()
self.assertEqual(output, self.username)
def test_echo(self):
stdin, stdout, stderr = self.client.exec_command("echo foo bar baz")
output = stdout.read().strip()
self.assertEqual(output, "foo bar baz")
def test_invalid_arguments(self):
stdin, stdout, stderr = self.client.exec_command("whoami today")
output = stdout.read().strip()
self.assertEqual(output, "Error: exec_whoami() takes exactly 1 argument (2 given)")
def test_unknown_command(self):
stdin, stdout, stderr = self.client.exec_command("wrong command")
output = stdout.read().strip()
self.assertEqual(output, "No such command.")
-----BEGIN RSA PRIVATE KEY-----
MIIEpgIBAAKCAQEA/HlMR0chgFLx6A/BmYi9Ypj9nr0kZ3Wo6n3ZdSXORJaH6e7d
LSw+0eoD1NDCxDORZlwYNAg45zndU5sPN4IvVcAvC/FD0qSK1H0ku1p4QV82y4a1
SOnzRlDUIVhdQxnlQa8gI+zDO8AyzOJ4oZ9LL7Hy+mIDqGxRwHeKfDYXuHkE+aM9
CCkidPqR/Uqxag03+y51MEphC07mr7mrDb4lOmeqy8Xq0ZcUjIkmKiGCVVRJO6g4
kYFwr0UwpKCwLzOfw4Fy2SpzWfe3IdVUqIro97d1AGC+9OUQETiVpBYObBI1qfQo
ef6MT+CFcTV40cyKdCG6x57f6gpPMYqYfdHTyQIDAQABAoIBAQDkzgTL3/HDeugS
WB0qyFphvaazMlSIkn/3qv/lA9MQI5+e6MN3Cc8Qq9S3DE5GQzm1GycwGHeBTdZ/
y1maA5hkTRwV5ZuCjW3nrlYYmJ+9Fs3w2u712leHVP86DPvQMOqsgUpOZGZ2gvNG
7MNILbWUzt8V/Le17hyUoYFWmisbGH0UyiRJsUHoAdZFdRq+sDVsFmu49fhcEsLI
5B0fB2v5FSwgSuIfYNjPNEJG9xK+qZzjEXd5g7XVs5mbrMflbyOwDDf6/nMILGnO
ZvtCagqaH0SaxnEzucsdFdnGBhzGEbPXxDVxuS7kcdWpEN+f0JksYaVaLetCsD3s
xudTxI3RAoGBAP85vkIaG2WEurONLURLAiM3dNxP4KXonADqn8RnJ5TNpn4Il9g5
wiuANjja7Cvf6GrvDRl7lrphAQ/c/6iRl6tPv8Hvertd7RXmG9QoLl/qG3ghjNDl
WUexvloZ7NXqKq9PSj7vq76cxIfSklAEQ+x1ldalVaS9iR+ZqKryY8plAoGBAP09
atBWJPapFoci7Mrml4N8TNY+n7unD/yyoLabmm0YfPbBy9x9SyCbefVwB5F1dWeV
MZc3oG6EMKg/BH/vnKpiNtXk4OubIbqJD3jHrqeT6wAkSItgVnlU6t6FLuKjLXoD
Buw3oM/+i2pDy8oSsxueBv1GyMZG0ixcShkeXPuVAoGBAMFFHMo5st1hcXBeTBUX
J/s7F4duBZQdXWVkRrAX3WVVheqS30miE2OVp3nObmGbIQk5FRZi/HUO2BsHI6Km
/c+AiJl3m90e91ZJ9nDmLJf9U+fYoCXgR4d/FcJtN2eV99ThmjumisvBMyIXVyy4
zibVtC3i7cPes2P2nD83ZlHxAoGBAN+VSCckx4HXjAJH/ZSuvnriVdyaceD2ARF0
jJxtCYzkoAAk3l6PaLMjUiw2exgcAkov2RbPkB/DKkqBSPHDliiAijWS3FpoHwFY
XYaflj5yRHtdjYcwyWhaZvuLzvdeZppg7c3E14CMFn792IFSvTvW7AjWZBFbGdj8
qpc+zY15AoGBANvsGFHmurP+VUu1ibegREBySts0WGNyq5VOTCrkN9S7AKR1pZNI
I6W7KrRcXEvbBM7B47ykbE40hAachxN1Rpk+9qEom6etTaw/yMewgFNjZXYJw06z
4bq6ofjKK8VqCWx41pWcmXj7Fa2A43RvZWg8TlX7Q8uc4wTBpTkuzfZC
-----END RSA PRIVATE KEY-----
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQD8eUxHRyGAUvHoD8GZiL1imP2evSRndajqfdl1Jc5Elofp7t0tLD7R6gPU0MLEM5FmXBg0CDjnOd1Tmw83gi9VwC8L8UPSpIrUfSS7WnhBXzbLhrVI6fNGUNQhWF1DGeVBryAj7MM7wDLM4nihn0svsfL6YgOobFHAd4p8Nhe4eQT5oz0IKSJ0+pH9SrFqDTf7LnUwSmELTuavuasNviU6Z6rLxerRlxSMiSYqIYJVVEk7qDiRgXCvRTCkoLAvM5/DgXLZKnNZ97ch1VSoiuj3t3UAYL705RAROJWkFg5sEjWp9Ch5/oxP4IVxNXjRzIp0IbrHnt/qCk8xiph90dPJ user@unicorn
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment