Skip to content

Instantly share code, notes, and snippets.

@flupke
Last active August 29, 2015 14:04
Show Gist options
  • Save flupke/d1b62471e7782e412b1a to your computer and use it in GitHub Desktop.
Save flupke/d1b62471e7782e412b1a to your computer and use it in GitHub Desktop.
#!py
import pprint
import json
import binascii
import functools
import traceback
import os
import os.path as op
import sqlite3
import time
import logging
import M2Crypto
import requests
from salt.utils import smtp
from salt.utils.odict import OrderedDict
from salt.cloud import CloudClient
from salt.key import Key
logger = logging.getLogger(__name__)
WAIT_TIMEOUT = 60 * 10
def send_mail(subject, content):
'''
Send an email with saltstack.
'''
msg_kwargs = {
'smtp.subject': subject,
'smtp.content': '%s\r\n' % content,
}
smtp.send(msg_kwargs, __opts__)
def send_mail_on_error(func):
'''
A decorator sending exceptions by email.
'''
@functools.wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as exc:
send_mail('Error in EC2 Autoscale Reactor: %s' % exc,
traceback.format_exc())
raise
return wrapper
@send_mail_on_error
def run():
'''
Run the reactor
'''
sns = data['post']
if 'SubscribeURL' in sns:
send_mail('EC2 Autoscale Subscription (via Salt Reactor)',
pprint.pformat(sns))
return {}
url_check = sns['SigningCertURL'].replace('https://', '')
url_comps = url_check.split('/')
if not url_comps[0].endswith('.amazonaws.com'):
# The expected URL does not seem to come from Amazon, do not try to
# process it
send_mail('EC2 Autoscale SigningCertURL Error (via Salt Reactor)',
'There was an error with the EC2 SigningCertURL. '
'\r\n{1} \r\n{2} \r\n'
'Content received was:\r\n\r\n{0}'.format(
pprint.pformat(sns), url_check, url_comps[0]))
return {}
if not 'Subject' in sns:
sns['Subject'] = ''
pem_request = requests.request('GET', sns['SigningCertURL'])
pem = pem_request.text
str_to_sign = (
'Message\n{Message}\n'
'MessageId\n{MessageId}\n'
'Subject\n{Subject}\n'
'Timestamp\n{Timestamp}\n'
'TopicArn\n{TopicArn}\n'
'Type\n{Type}\n'
).format(**sns)
cert = M2Crypto.X509.load_cert_string(str(pem))
pubkey = cert.get_pubkey()
pubkey.reset_context(md='sha1')
pubkey.verify_init()
pubkey.verify_update(str_to_sign.encode())
decoded = binascii.a2b_base64(sns['Signature'])
result = pubkey.verify_final(decoded)
if result != 1:
send_mail('EC2 Autoscale Signature Error (via Salt Reactor)',
'There was an error with the EC2 Signature. '
'Content received was:\r\n\r\n{0}'.format(pprint.pformat(sns)))
return {}
message = json.loads(sns['Message'])
instance_id = str(message['EC2InstanceId'])
# Create minion names database
db = InstancesNamesDatabase(__opts__['ec2.autoscale']['db_location'])
if 'launch' in sns['Subject']:
logger.warning('%s launching', instance_id)
# Retrieve instance data and get minion name
client = CloudClient(op.join(op.dirname(__opts__['conf_file']), 'cloud'))
instance_data = client.action('show_instance', names=[instance_id])
provider = __opts__['ec2.autoscale']['provider']
minion_name = instance_data[provider]['ec2'][instance_id]['privateDnsName'].partition('.')[0]
logger.warning('%s name should be %s', instance_id, minion_name)
# Wait for the minion to register
skey = Key(__opts__)
start_time = time.time()
while True:
keys = skey.list_keys()
if minion_name in keys['minions']:
# Minion is already accepted, do nothing
logger.warning('minion already accepted')
return {}
elif time.time() - start_time > WAIT_TIMEOUT:
message = '%s did not show up after %ss' % (minion_name,
WAIT_TIMEOUT)
logger.warning(message)
send_mail('EC2 Autoscale timeout', message)
return {}
elif minion_name in keys['minions_pre']:
break
time.sleep(1)
end_time = time.time()
logger.warning('waited for %s %s seconds', minion_name,
end_time - start_time)
# Store insance_id => minion_name association in db
db.add(instance_id, minion_name)
# Accept minion and send it a highstate
ret = OrderedDict()
ret['ec2_autoscale_launch'] = {
'wheel.key.accept': [
{'match': minion_name},
]
}
ret['ec2_autoscale_highstate'] = {
'cmd.state.highstate': [
{'tgt': minion_name},
]
}
return ret
elif 'termination' in sns['Subject']:
# Retrieve minion name and cleanup db
minion_name = db.get(instance_id)
if minion_name is None:
# Minion was not handled by this reactor, do nothing
return {}
db.remove(instance_id)
return {
'ec2_autoscale_termination': {
'wheel.key.delete': [
{'match': minion_name},
]
}
}
return {}
class InstancesNamesDatabase(object):
def __init__(self, location):
# Create database directory if needed
location = op.expanduser(location)
directory = op.dirname(location)
if directory and not op.isdir(directory):
os.makedirs(directory)
self.location = location
# Create connection object and tables
try:
self.conn = sqlite3.connect(self.location)
except sqlite3.OperationalError as exc:
# Make the error more useful
exc.args = (exc.args[0] + ': %s' % location,)
raise
self.create_tables()
def create_tables(self):
with self.conn:
self.conn.execute('CREATE TABLE IF NOT EXISTS minion_names '
'(instance_id text, minion_name text)')
self.conn.execute('CREATE UNIQUE INDEX IF NOT EXISTS instance_id_index '
'ON minion_names (instance_id)')
def query(self, query, *args):
with self.conn:
return self.conn.execute(query, args)
def add(self, instance_id, minion_name):
self.query('INSERT OR IGNORE INTO minion_names '
'(instance_id, minion_name) VALUES (?, ?)',
instance_id, minion_name)
def get(self, instance_id):
row = self.query('SELECT minion_name FROM minion_names '
'WHERE instance_id = ?', instance_id).fetchone()
if row:
return row[0]
def remove(self, instance_id):
self.query('DELETE FROM minion_names WHERE instance_id = ?',
instance_id)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment