Last active
September 6, 2023 04:05
-
-
Save o3bvv/e9515a0d9189fc514e34 to your computer and use it in GitHub Desktop.
SSSHHH! Testing Python SSH client with auth via private key
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Install our dependencies | |
$ pip install -r <(wget -O- http://bit.ly/ssh_test_requirements) | |
# Generate key pair without password | |
$ ssh-keygen -t rsa -C "user@unicorn" | |
Generating public/private rsa key pair. | |
Enter file in which to save the key (/home/alex/.ssh/id_rsa): user.key | |
Enter passphrase (empty for no passphrase): | |
Enter same passphrase again: | |
Your identification has been saved in user.key. | |
Your public key has been saved in user.key.pub. | |
The key fingerprint is: | |
16:d8:23:ad:ba:00:ac:f2:fd:6e:ae:f6:9b:12:53:16 user@unicorn | |
The key's randomart image is: | |
+--[ RSA 2048]----+ | |
| | | |
| E+ | | |
| o.= | | |
|. oo o | | |
|.. o. S | | |
|.. o. . | | |
|o . .o | | |
|.. oo... | | |
| ..+OB. | | |
+-----------------+ | |
# Download test keys (for lazy people) | |
$ wget -O user.key http://bit.ly/ssh_test_key && chmod 600 user.key | |
$ wget -O user.key.pub http://bit.ly/ssh_test_key_pub | |
# Download mock SSH server and make it runnable | |
$ wget -O mock_server.py http://bit.ly/ssh_mock_server && chmod +x mock_server.py | |
# Run our SSH server | |
$ ./mock_server.py | |
# Add private key to agent | |
$ ssh-add user.key | |
Identity added: user.key (user.key) | |
# Connect to our SSH server | |
$ ssh user@localhost -p 2222 | |
The authenticity of host '[localhost]:2222 ([127.0.0.1]:2222)' can't be established. | |
RSA key fingerprint is 16:d8:23:ad:ba:00:ac:f2:fd:6e:ae:f6:9b:12:53:16. | |
Are you sure you want to continue connecting (yes/no)? yes | |
>>> Welcome to test SSH server! | |
$ Hey! | |
No such command. | |
$ whoami | |
user | |
$ whatdoiwant | |
SSH! | |
$ howdoiwantit | |
Fast and furious! | |
$ quit | |
Connection to localhost closed. | |
# Download tests | |
$ wget -O test_ssh.py http://bit.ly/ssh_tests | |
# Run tests | |
$ nosetests | |
.... | |
---------------------------------------------------------------------- | |
Ran 4 tests in 3.732s | |
OK |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
import base64 | |
import os | |
from twisted.conch import avatar, error, recvline | |
from twisted.conch.insults import insults | |
from twisted.conch.interfaces import IConchUser, ISession | |
from twisted.conch.ssh import keys, factory, session | |
from twisted.cred import checkers, credentials, portal | |
from twisted.python import failure | |
from zope.interface import implements | |
class SSHMockProtocol(recvline.HistoricRecvLine):
    """Interactive line-based shell offered to connected SSH users.

    Every non-empty line is split on whitespace: the first word selects a
    ``do_<word>`` method on this class and the remaining words are passed
    to it as positional arguments.
    """

    def __init__(self, user):
        # Only ``user.username`` is read (by ``do_whoami``).
        self.user = user

    def connectionMade(self):
        """Greet the client and show the first prompt."""
        recvline.HistoricRecvLine.connectionMade(self)
        self.terminal.write("Welcome to test SSH server!")
        self.terminal.nextLine()
        self.show_prompt()

    def lineReceived(self, line):
        """Dispatch one input line to its handler, then re-prompt."""
        line = line.strip()
        if line:
            cmd_and_args = line.split()
            cmd = cmd_and_args[0]
            args = cmd_and_args[1:]
            func = self.get_command_handler(cmd)
            if func:
                try:
                    func(*args)
                # ``as`` form, matching the handler in SSHMockAvatar; the
                # comma form is a syntax error on Python 3.
                except Exception as e:
                    # Handler errors (e.g. a wrong number of arguments)
                    # are reported to the client instead of propagating.
                    self.terminal.write("Error: %s" % e)
                    self.terminal.nextLine()
            else:
                self.terminal.write("No such command.")
                self.terminal.nextLine()
        self.show_prompt()

    def show_prompt(self):
        """Write the shell prompt to the terminal."""
        self.terminal.write("$ ")

    def get_command_handler(self, cmd):
        """Return the ``do_<cmd>`` method, or None when there is none."""
        return getattr(self, 'do_' + cmd, None)

    def do_echo(self, *args):
        """Write the arguments back, joined by single spaces."""
        self.terminal.write(" ".join(args))
        self.terminal.nextLine()

    def do_whoami(self):
        """Write the username of the connected user."""
        self.terminal.write(self.user.username)
        self.terminal.nextLine()

    def do_whatdoiwant(self):
        self.terminal.write("SSH!")
        self.terminal.nextLine()

    def do_howdoiwantit(self):
        self.terminal.write("Fast and furious!")
        self.terminal.nextLine()

    def do_quit(self):
        """Say good bye and drop the connection."""
        self.terminal.write("Good bye!")
        self.terminal.nextLine()
        self.terminal.loseConnection()
class SSHMockAvatar(avatar.ConchUser):
    """Avatar for an authenticated user; implements the session hooks.

    Interactive shells are served by ``SSHMockProtocol``. One-shot
    commands are dispatched to the ``exec_<name>`` methods of this class.
    """
    implements(ISession)

    def __init__(self, username):
        avatar.ConchUser.__init__(self)
        self.username = username
        # Register the channel type used for 'session' requests.
        self.channelLookup['session'] = session.SSHSession

    def getPty(self, terminal, windowSize, attrs):
        """Ignore the PTY request; nothing is allocated."""
        return None

    def openShell(self, protocol):
        """Attach a new ``SSHMockProtocol`` terminal to *protocol*."""
        shell = insults.ServerProtocol(SSHMockProtocol, self)
        shell.makeConnection(protocol)
        protocol.makeConnection(session.wrapProtocol(shell))

    def execCommand(self, protocol, cmd):
        """Run a single command line and close the channel afterwards.

        An empty *cmd* is ignored. Otherwise the first word selects an
        ``exec_<word>`` method and the remaining words are its arguments.
        """
        if not cmd:
            return

        self.client = TransportWrapper(protocol)
        words = cmd.split()
        name, arguments = words[0], words[1:]
        handler = self.get_exec_func(name)

        if handler is None:
            self.client.write("No such command.")
        else:
            try:
                handler(*arguments)
            except Exception as e:
                # Report handler errors to the client as plain text.
                self.client.write("Error: {0}".format(e))

        self.client.loseConnection()
        # Sets a flag on the underlying transport; presumably marks the
        # upcoming disconnect as intentional -- confirm against Twisted.
        protocol.session.conn.transport.expectedLoseConnection = 1

    def get_exec_func(self, cmd):
        """Return the ``exec_<cmd>`` method, or None when there is none."""
        return getattr(self, 'exec_' + cmd, None)

    def exec_whoami(self):
        """Send the username to the client."""
        self.client.write(self.username)

    def exec_echo(self, *args):
        """Send the arguments back, joined by single spaces."""
        self.client.write(" ".join(args))

    def eofReceived(self):
        """Session hook; nothing to do."""
        pass

    def closed(self):
        """Session hook; nothing to do."""
        pass
class TransportWrapper(object):
    """Small transport-like adapter around a process-style protocol.

    Data passed to ``write`` is delivered through the protocol's
    ``outReceived``; ``loseConnection`` signals in/out/err connection
    loss exactly once.
    """

    def __init__(self, p):
        self.protocol = p
        # Initialise the flag *before* makeConnection(): a protocol that
        # writes or disconnects from inside makeConnection() would
        # otherwise find no ``closed`` attribute and raise AttributeError.
        self.closed = False
        p.makeConnection(self)

    def write(self, data):
        """Deliver *data* followed by CRLF to the wrapped protocol."""
        self.protocol.outReceived(data)
        self.protocol.outReceived('\r\n')

        # Mimic 'exit' for the shell test
        if '\x00' in data:
            self.loseConnection()

    def loseConnection(self):
        """Notify the protocol that all three streams are gone (once)."""
        if self.closed:
            return

        self.closed = True
        self.protocol.inConnectionLost()
        self.protocol.outConnectionLost()
        self.protocol.errConnectionLost()
class SSHMockRealm(object):
    """Realm that hands out an ``SSHMockAvatar`` for Conch users."""
    implements(portal.IRealm)

    def requestAvatar(self, avatarId, mind, *interfaces):
        """Return ``(interface, avatar, logout)`` for *avatarId*.

        Raises NotImplementedError unless ``IConchUser`` is among the
        requested interfaces.
        """
        if IConchUser not in interfaces:
            raise NotImplementedError("No supported interfaces found.")

        def logout():
            # Nothing to clean up on logout.
            return None

        # The first requested interface is echoed back, as in the original.
        return interfaces[0], SSHMockAvatar(avatarId), logout
class PublicKeyCredentialsChecker(object):
    """Credentials checker that authenticates users by SSH public key.

    *authorizedKeys* maps a username to that user's public key as a
    space-separated string whose second field is the base64 key blob
    (e.g. ``"ssh-rsa AAAA..."``).
    """
    implements(checkers.ICredentialsChecker)
    credentialInterfaces = (credentials.ISSHPrivateKey, )

    def __init__(self, authorizedKeys):
        self.authorized_keys = authorizedKeys

    def requestAvatarId(self, credentials):
        """Validate *credentials* and return the username on success.

        Every rejection is returned as a ``failure.Failure``:
        ``ConchError`` for an unknown user, a key mismatch or a bad
        signature, and ``ValidPublicKey`` when the key matches but no
        signature was supplied.
        """
        user_key_string = self.authorized_keys.get(credentials.username)
        if not user_key_string:
            return failure.Failure(error.ConchError("No such user"))

        # Remove the 'ssh-rsa' type before decoding
        blob = base64.b64decode(user_key_string.split(" ")[1])
        if credentials.blob != blob:
            # Previously ``raise failure.failure(...)``: that attribute
            # does not exist, and every other branch returns a Failure.
            return failure.Failure(
                error.ConchError("Failed to recognize key"))

        if not credentials.signature:
            return failure.Failure(error.ValidPublicKey())

        user_key = keys.Key.fromString(data=user_key_string)
        if user_key.verify(credentials.signature, credentials.sigData):
            return credentials.username
        else:
            return failure.Failure(error.ConchError("Incorrect signature"))
def get_public_key(keys_path, username):
    """Read ``<username>.key.pub`` from *keys_path* and parse it.

    Returns whatever ``keys.Key.fromString`` builds from the file text.
    """
    filename = "{0}.key.pub".format(username)
    with open(os.path.join(keys_path, filename)) as key_file:
        contents = key_file.read()
    return keys.Key.fromString(data=contents)
def get_ssh_factory(keys_path, username):
    """Build an ``SSHFactory`` that accepts *username* with a known key.

    The public key loaded from *keys_path* is used both as the factory's
    ``ssh-rsa`` public key and as the single authorized key for
    *username*.
    """
    ssh_factory = factory.SSHFactory()
    ssh_factory.portal = portal.Portal(SSHMockRealm())

    server_key = get_public_key(keys_path, username)
    # NOTE(review): only ``publicKeys`` is assigned here; confirm whether
    # the factory also needs ``privateKeys`` to start.
    ssh_factory.publicKeys = {"ssh-rsa": server_key}

    allowed = {username: server_key.toString('OPENSSH')}
    ssh_factory.portal.registerChecker(PublicKeyCredentialsChecker(allowed))
    return ssh_factory
def start_threaded_server(interface='localhost', port=2222, username='user',
                          keys_path='.'):
    """Start the mock SSH server with the reactor in a separate thread.

    The server listens on *interface*:*port* and accepts *username* with
    the key found in *keys_path*.
    """
    from threading import Thread
    from twisted.internet import reactor

    reactor.listenTCP(port, get_ssh_factory(keys_path, username),
                      interface=interface)

    # ``False`` is passed positionally to reactor.run(); presumably it is
    # the install-signal-handlers flag, off because this is not the main
    # thread -- confirm against the Twisted docs.
    reactor_thread = Thread(target=reactor.run, args=(False, ))
    reactor_thread.start()
def stop_threaded_server():
    """Schedule ``reactor.stop`` to be run via ``reactor.callFromThread``."""
    from twisted.internet import reactor

    # The caller is not the thread running the reactor, so the stop
    # request is handed over instead of being called directly.
    reactor.callFromThread(reactor.stop)
def main():
    """Parse command-line options and run the mock SSH server.

    Options: ``-i/--interface``, ``-p/--port``, ``-u/--username`` and
    ``-k/--keys-path``. Blocks in ``reactor.run()`` until the reactor
    stops.
    """
    import argparse
    from twisted.internet import reactor

    parser = argparse.ArgumentParser(description="Run mock SSH server")
    parser.add_argument(
        '-i', '--interface',
        help="Interface to listen incoming connections on. Default: localhost",
        default="localhost",
    )
    parser.add_argument(
        '-p', '--port',
        # The help text used to say "Interface", copied from the option
        # above.
        help="Port to listen incoming connections on. Default: 2222",
        type=int,
        default=2222,
    )
    parser.add_argument(
        '-u', '--username',
        help="Username which is going to connect. Default: user",
        default="user",
    )
    parser.add_argument(
        '-k', '--keys-path',
        help="Path to directory, where keys are stored. "
             "Default: . (current directory)",
        dest='keys_path',
        default='.',
    )

    args = parser.parse_args()
    ssh_factory = get_ssh_factory(args.keys_path, args.username)
    reactor.listenTCP(args.port, ssh_factory, interface=args.interface)
    reactor.run()


if __name__ == '__main__':
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Requirements are frozen to prevent time bombs | |
# For client | |
paramiko==1.15.2 | |
# For server | |
Twisted==15.1.0 | |
pyasn1==0.1.7 | |
# For tests | |
nose==1.3.6 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import os | |
import paramiko | |
import unittest | |
from mock_server import start_threaded_server, stop_threaded_server | |
__here__ = os.path.abspath(os.path.dirname(__file__)) | |
class SSHTestCase(unittest.TestCase):
    """Exec-channel tests run against the threaded mock SSH server.

    The server is started once per class. Each test opens its own
    paramiko connection, authenticated with ``<username>.key`` from the
    test directory.
    """

    username = 'user'
    interface = 'localhost'
    port = 2222

    @classmethod
    def setUpClass(cls):
        start_threaded_server(
            interface=cls.interface,
            port=cls.port,
            username=cls.username,
            keys_path=__here__,
        )

    @classmethod
    def tearDownClass(cls):
        stop_threaded_server()

    def setUp(self):
        key_file = "{0}.key".format(self.username)
        rsa_key = paramiko.RSAKey.from_private_key_file(
            os.path.join(__here__, key_file))

        client = paramiko.SSHClient()
        # Accept the mock server's host key without a known_hosts entry.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.client = client
        self.client.connect(self.interface, self.port,
                            username=self.username,
                            pkey=rsa_key)

    def tearDown(self):
        self.client.close()

    def _run(self, command):
        """Execute *command* remotely and return its stripped stdout."""
        # All three streams stay referenced until stdout has been read.
        stdin, stdout, stderr = self.client.exec_command(command)
        return stdout.read().strip()

    def test_whoami(self):
        self.assertEqual(self._run('whoami'), self.username)

    def test_echo(self):
        self.assertEqual(self._run("echo foo bar baz"), "foo bar baz")

    def test_invalid_arguments(self):
        expected = "Error: exec_whoami() takes exactly 1 argument (2 given)"
        self.assertEqual(self._run("whoami today"), expected)

    def test_unknown_command(self):
        self.assertEqual(self._run("wrong command"), "No such command.")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-----BEGIN RSA PRIVATE KEY----- | |
MIIEpgIBAAKCAQEA/HlMR0chgFLx6A/BmYi9Ypj9nr0kZ3Wo6n3ZdSXORJaH6e7d | |
LSw+0eoD1NDCxDORZlwYNAg45zndU5sPN4IvVcAvC/FD0qSK1H0ku1p4QV82y4a1 | |
SOnzRlDUIVhdQxnlQa8gI+zDO8AyzOJ4oZ9LL7Hy+mIDqGxRwHeKfDYXuHkE+aM9 | |
CCkidPqR/Uqxag03+y51MEphC07mr7mrDb4lOmeqy8Xq0ZcUjIkmKiGCVVRJO6g4 | |
kYFwr0UwpKCwLzOfw4Fy2SpzWfe3IdVUqIro97d1AGC+9OUQETiVpBYObBI1qfQo | |
ef6MT+CFcTV40cyKdCG6x57f6gpPMYqYfdHTyQIDAQABAoIBAQDkzgTL3/HDeugS | |
WB0qyFphvaazMlSIkn/3qv/lA9MQI5+e6MN3Cc8Qq9S3DE5GQzm1GycwGHeBTdZ/ | |
y1maA5hkTRwV5ZuCjW3nrlYYmJ+9Fs3w2u712leHVP86DPvQMOqsgUpOZGZ2gvNG | |
7MNILbWUzt8V/Le17hyUoYFWmisbGH0UyiRJsUHoAdZFdRq+sDVsFmu49fhcEsLI | |
5B0fB2v5FSwgSuIfYNjPNEJG9xK+qZzjEXd5g7XVs5mbrMflbyOwDDf6/nMILGnO | |
ZvtCagqaH0SaxnEzucsdFdnGBhzGEbPXxDVxuS7kcdWpEN+f0JksYaVaLetCsD3s | |
xudTxI3RAoGBAP85vkIaG2WEurONLURLAiM3dNxP4KXonADqn8RnJ5TNpn4Il9g5 | |
wiuANjja7Cvf6GrvDRl7lrphAQ/c/6iRl6tPv8Hvertd7RXmG9QoLl/qG3ghjNDl | |
WUexvloZ7NXqKq9PSj7vq76cxIfSklAEQ+x1ldalVaS9iR+ZqKryY8plAoGBAP09 | |
atBWJPapFoci7Mrml4N8TNY+n7unD/yyoLabmm0YfPbBy9x9SyCbefVwB5F1dWeV | |
MZc3oG6EMKg/BH/vnKpiNtXk4OubIbqJD3jHrqeT6wAkSItgVnlU6t6FLuKjLXoD | |
Buw3oM/+i2pDy8oSsxueBv1GyMZG0ixcShkeXPuVAoGBAMFFHMo5st1hcXBeTBUX | |
J/s7F4duBZQdXWVkRrAX3WVVheqS30miE2OVp3nObmGbIQk5FRZi/HUO2BsHI6Km | |
/c+AiJl3m90e91ZJ9nDmLJf9U+fYoCXgR4d/FcJtN2eV99ThmjumisvBMyIXVyy4 | |
zibVtC3i7cPes2P2nD83ZlHxAoGBAN+VSCckx4HXjAJH/ZSuvnriVdyaceD2ARF0 | |
jJxtCYzkoAAk3l6PaLMjUiw2exgcAkov2RbPkB/DKkqBSPHDliiAijWS3FpoHwFY | |
XYaflj5yRHtdjYcwyWhaZvuLzvdeZppg7c3E14CMFn792IFSvTvW7AjWZBFbGdj8 | |
qpc+zY15AoGBANvsGFHmurP+VUu1ibegREBySts0WGNyq5VOTCrkN9S7AKR1pZNI | |
I6W7KrRcXEvbBM7B47ykbE40hAachxN1Rpk+9qEom6etTaw/yMewgFNjZXYJw06z | |
4bq6ofjKK8VqCWx41pWcmXj7Fa2A43RvZWg8TlX7Q8uc4wTBpTkuzfZC | |
-----END RSA PRIVATE KEY----- |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQD8eUxHRyGAUvHoD8GZiL1imP2evSRndajqfdl1Jc5Elofp7t0tLD7R6gPU0MLEM5FmXBg0CDjnOd1Tmw83gi9VwC8L8UPSpIrUfSS7WnhBXzbLhrVI6fNGUNQhWF1DGeVBryAj7MM7wDLM4nihn0svsfL6YgOobFHAd4p8Nhe4eQT5oz0IKSJ0+pH9SrFqDTf7LnUwSmELTuavuasNviU6Z6rLxerRlxSMiSYqIYJVVEk7qDiRgXCvRTCkoLAvM5/DgXLZKnNZ97ch1VSoiuj3t3UAYL705RAROJWkFg5sEjWp9Ch5/oxP4IVxNXjRzIp0IbrHnt/qCk8xiph90dPJ user@unicorn |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment