Last active
January 9, 2018 06:08
-
-
Save teeberg/5dc13b1f0b229d8ba9e8e5ba98c3214c to your computer and use it in GitHub Desktop.
`eb ssh` wrapper to connect to Elastic Beanstalk while verifying SSH host keys using the instance's console output
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
function eb() { | |
if [[ $# > 0 && $1 == "ssh" ]]; then | |
"$(which eb)" ssh -e "$(which ebssh.py)" "${@:2}" | |
else | |
"$(which eb)" "$@" | |
fi | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/Users/jonas/.venvs/~bin/bin/python3 | |
# coding=utf-8 | |
import errno | |
import json | |
import re | |
import subprocess | |
import sys | |
from functools import wraps | |
from os import makedirs as os_makedirs | |
from os.path import dirname, exists, isdir | |
from pprint import pformat | |
import boto3 | |
import yaml | |
from ebcli.operations.sshops import _get_ssh_file | |
with open('.elasticbeanstalk/config.yml') as fd: | |
eb_data = yaml.load(fd) | |
session = boto3.Session(profile_name=eb_data['global']['profile'], | |
region_name=eb_data['global']['default_region']) | |
ec2 = session.client('ec2') | |
def main(args): | |
ip_address = args[-1].split('@')[1] | |
instance_data = json.loads(get_instance_data(ip_address)) | |
instance_id, keypair_name = instance_data | |
expected_fingerprints = aws_get_host_key_fingerprints(ip_address, instance_id) | |
hash_algorithm = next(iter(expected_fingerprints.values())).split(':')[0].lower() | |
actual_fingerprints = ssh_get_host_key_fingerprints(ip_address, hash_algorithm) | |
matching_fingerprints = dict( | |
(key, value) | |
for key, value | |
in expected_fingerprints.items() | |
if value in actual_fingerprints | |
) | |
if not matching_fingerprints: | |
raise RuntimeError( | |
'Did not find any matching fingerprints!\n' | |
'Expected: {}\n'.format(pformat(sorted(expected_fingerprints.values()))) + | |
'Actual: {}'.format(pformat(sorted(actual_fingerprints)))) | |
print('{} matching fingerprints'.format(len(matching_fingerprints))) | |
cmd = [ | |
'ssh', | |
'-i', | |
_get_ssh_file(keypair_name), | |
'-o', | |
'UserKnownHostsFile={}'.format(ssh_keyscan.cached_file_path_template.format(ip_address)) | |
] + args | |
print(cmd) | |
p = subprocess.Popen(cmd, stdout=sys.stdout, stderr=sys.stderr, stdin=sys.stdin) | |
p.communicate() | |
return p.returncode | |
def local(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE): | |
print(cmd) | |
p = subprocess.Popen(cmd, stdout=stdout, stderr=stderr) | |
stdout, stderr = p.communicate() | |
if stdout is not None: | |
stdout = stdout.decode('utf-8') | |
if stderr is not None: | |
stderr = stderr.decode('utf-8') | |
if p.returncode != 0: | |
raise RuntimeError('Command failed: {}\n'.format(cmd) + | |
'Stdout: {}\n'.format(stdout) + | |
'Stderr: {}\n'.format(stderr)) | |
return stdout, stderr | |
def cached_host_file(filename): | |
def wrapper(f): | |
@wraps(f) | |
def inner(ip_address, *args, **kwargs): | |
filepath = inner.cached_file_path_template.format(ip_address) | |
makedirs(dirname(filepath)) | |
if exists(filepath): | |
with open(filepath, 'r') as fd: | |
contents = fd.read() | |
return contents | |
result = f(ip_address, *args, **kwargs) | |
with open(filepath, 'w') as fd: | |
fd.write(result) | |
return result | |
inner.cached_file_path_template = '/tmp/.ebssh/{{}}/{}'.format(filename) | |
return inner | |
return wrapper | |
def makedirs(name, mode=None): | |
try: | |
if mode is None: | |
os_makedirs(name) | |
else: | |
os_makedirs(name, mode) | |
except OSError as e: | |
# be happy if someone already created the path | |
if e.errno != errno.EEXIST: | |
raise | |
# but only if it's actually a directory | |
if not isdir(name): | |
raise | |
@cached_host_file('ssh-keyscan.txt') | |
def ssh_keyscan(ip_address): | |
print('Looking up SSH keys for host {}'.format(ip_address)) | |
return local(['ssh-keyscan', '-H', ip_address])[0] | |
@cached_host_file('instance-data.txt') | |
def get_instance_data(ip_address): | |
print('Looking up instance ID for IP address {}...'.format(ip_address)) | |
response = ec2.describe_instances( | |
Filters=[ | |
{ | |
'Name': 'ip-address', | |
'Values': [ip_address], | |
} | |
] | |
) | |
reservations = response['Reservations'] | |
assert len(reservations) == 1, reservations | |
instances = reservations[0]['Instances'] | |
assert len(instances) == 1, instances | |
instance = instances[0] | |
return json.dumps([ | |
instance['InstanceId'], | |
instance['KeyName'] | |
]) | |
@cached_host_file('console-output.txt') | |
def get_console_output(ip_address, instance_id): | |
return ec2.get_console_output(InstanceId=instance_id)['Output'] | |
def aws_get_host_key_fingerprints(ip_address, instance_id): | |
console_output = get_console_output(ip_address, instance_id) | |
pattern = ( | |
r"Generating public/private ([^\s]+) key pair.[\r\n]+" | |
r"Your identification has been saved in /etc/ssh/[^\s]+\.[\r\n]+" | |
r"Your public key has been saved in /etc/ssh/[^\s]+\.pub\.\r?[\r\n]+" | |
r"The key fingerprint is:[\r\n]+" | |
r"([^\s]+) [^\s]+" | |
) | |
fingerprints = re.findall(pattern, console_output) | |
assert fingerprints, ('Could not find fingerprints, check {}' | |
.format(get_console_output.cached_file_path_template.format(ip_address))) | |
print('... {} fingerprints'.format(len(fingerprints))) | |
return dict( | |
(algo, 'MD5:{}'.format(fp) if not fp.startswith('SHA256') else fp) | |
for algo, fp in fingerprints | |
) | |
@cached_host_file('ssh-keygen.txt') | |
def ssh_keygen(ip_address, hash_algorithm): | |
host_keys_file = ssh_keyscan.cached_file_path_template.format(ip_address) | |
return local(['ssh-keygen', '-E', hash_algorithm, '-lf', host_keys_file])[0] | |
def ssh_get_host_key_fingerprints(ip_address, hash_algorithm='md5'): | |
ssh_keyscan(ip_address) | |
fingerprint_lines = ssh_keygen(ip_address, hash_algorithm).splitlines() | |
actual_fingerprints = [ | |
line.split(' ')[1] for line in | |
fingerprint_lines | |
] | |
return actual_fingerprints | |
if __name__ == '__main__': | |
exit(main(sys.argv[1:])) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment