Last active
August 29, 2015 14:04
-
-
Save flupke/d1b62471e7782e412b1a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!py | |
import pprint | |
import json | |
import binascii | |
import functools | |
import traceback | |
import os | |
import os.path as op | |
import sqlite3 | |
import time | |
import logging | |
import M2Crypto | |
import requests | |
from salt.utils import smtp | |
from salt.utils.odict import OrderedDict | |
from salt.cloud import CloudClient | |
from salt.key import Key | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Maximum time, in seconds, that run() polls the master's key list for a
# launching minion before giving up (10 minutes).
WAIT_TIMEOUT = 60 * 10
def send_mail(subject, content):
    '''
    Mail *content* under *subject* through Salt's SMTP utility.

    A trailing CRLF is appended to the body, and the ``__opts__`` global is
    handed to ``smtp.send`` together with the message.
    '''
    outgoing = dict()
    outgoing['smtp.subject'] = subject
    outgoing['smtp.content'] = '%s\r\n' % content
    smtp.send(outgoing, __opts__)
def send_mail_on_error(func):
    '''
    Decorate *func* so that any ``Exception`` it raises is reported by email
    (subject carries the exception text, body carries the traceback) and is
    then re-raised unchanged.
    '''
    @functools.wraps(func)
    def mailing_proxy(*args, **kwargs):
        try:
            outcome = func(*args, **kwargs)
        except Exception as error:
            subject = 'Error in EC2 Autoscale Reactor: %s' % error
            send_mail(subject, traceback.format_exc())
            raise
        return outcome
    return mailing_proxy
# Seconds to wait for the signing certificate download before giving up.
_CERT_FETCH_TIMEOUT = 30

# Digest algorithm used for each SNS ``SignatureVersion`` value.
_SNS_SIGNATURE_DIGESTS = {'1': 'sha1', '2': 'sha256'}

# Keys that make up the canonical string of an SNS notification, in the order
# they are signed.
_SNS_SIGNED_KEYS = ('Message', 'MessageId', 'Subject', 'Timestamp',
                    'TopicArn', 'Type')

# Characters allowed in a certificate host name once lower-cased.
_HOSTNAME_CHARS = frozenset('abcdefghijklmnopqrstuvwxyz0123456789.-')


def _signing_cert_host(cert_url):
    '''
    Return the lower-cased host name of *cert_url*.

    ``None`` is returned when the value is not a well-formed ``https`` URL or
    when its host contains characters outside letters, digits, dots and
    dashes.  Parsing the URL properly (instead of splitting the string) keeps
    fragments, queries and user-info parts from being mistaken for the host.
    '''
    try:
        from urllib.parse import urlparse
    except ImportError:  # Python 2
        from urlparse import urlparse
    try:
        parsed = urlparse(cert_url)
        if parsed.scheme != 'https':
            return None
        host = parsed.hostname
    except (AttributeError, TypeError, ValueError):
        return None
    if not host or not _HOSTNAME_CHARS.issuperset(host):
        return None
    return host


def _sns_signature_is_valid(sns, cert_url):
    '''
    Download the certificate at *cert_url* and check the signature of *sns*.

    Returns ``True`` only when the signature matches.  Raises whatever
    ``requests`` raises on a timeout or a non-success HTTP status, and
    ``KeyError`` when a signed field is missing from *sns*.
    '''
    response = requests.request('GET', cert_url, timeout=_CERT_FETCH_TIMEOUT)
    response.raise_for_status()
    cert = M2Crypto.X509.load_cert_string(str(response.text))
    pubkey = cert.get_pubkey()
    # SignatureVersion 2 is signed with SHA256; anything else keeps the
    # historical SHA1 behaviour.
    version = str(sns.get('SignatureVersion', '1'))
    pubkey.reset_context(md=_SNS_SIGNATURE_DIGESTS.get(version, 'sha1'))
    pubkey.verify_init()
    # Subject is part of the signed string only when the message carries one.
    signed_keys = [key for key in _SNS_SIGNED_KEYS
                   if key != 'Subject' or sns.get('Subject') is not None]
    str_to_sign = u''.join(
        u'{0}\n{1}\n'.format(key, sns[key]) for key in signed_keys)
    pubkey.verify_update(str_to_sign.encode('utf-8'))
    decoded = binascii.a2b_base64(sns['Signature'])
    return pubkey.verify_final(decoded) == 1


def _handle_launch(db, instance_id):
    '''
    Wait for the launched instance's minion key, record it in *db* and return
    the reactions accepting the key and running a highstate.

    Returns an empty dict when the key is already accepted or when it does
    not show up within ``WAIT_TIMEOUT`` seconds (a mail is sent in the latter
    case).
    '''
    logger.warning('%s launching', instance_id)
    # Retrieve instance data and derive the minion name from the first label
    # of the private DNS name
    client = CloudClient(op.join(op.dirname(__opts__['conf_file']), 'cloud'))
    instance_data = client.action('show_instance', names=[instance_id])
    provider = __opts__['ec2.autoscale']['provider']
    private_dns = instance_data[provider]['ec2'][instance_id]['privateDnsName']
    minion_name = private_dns.partition('.')[0]
    logger.warning('%s name should be %s', instance_id, minion_name)
    # Wait for the minion to register
    skey = Key(__opts__)
    start_time = time.time()
    while True:
        keys = skey.list_keys()
        if minion_name in keys['minions']:
            # Minion is already accepted, do nothing
            logger.warning('minion already accepted')
            return {}
        if time.time() - start_time > WAIT_TIMEOUT:
            timeout_msg = '%s did not show up after %ss' % (minion_name,
                                                            WAIT_TIMEOUT)
            logger.warning(timeout_msg)
            send_mail('EC2 Autoscale timeout', timeout_msg)
            return {}
        if minion_name in keys['minions_pre']:
            break
        time.sleep(1)
    logger.warning('waited for %s %s seconds', minion_name,
                   time.time() - start_time)
    # Store instance_id => minion_name association in db
    db.add(instance_id, minion_name)
    # Accept minion and send it a highstate
    ret = OrderedDict()
    ret['ec2_autoscale_launch'] = {
        'wheel.key.accept': [
            {'match': minion_name},
        ]
    }
    ret['ec2_autoscale_highstate'] = {
        'cmd.state.highstate': [
            {'tgt': minion_name},
        ]
    }
    return ret


def _handle_termination(db, instance_id):
    '''
    Forget the terminated instance and return the reaction deleting its key.

    Returns an empty dict when *instance_id* was never recorded in *db*.
    '''
    minion_name = db.get(instance_id)
    if minion_name is None:
        # Minion was not handled by this reactor, do nothing
        return {}
    db.remove(instance_id)
    return {
        'ec2_autoscale_termination': {
            'wheel.key.delete': [
                {'match': minion_name},
            ]
        }
    }


@send_mail_on_error
def run():
    '''
    Run the reactor.

    Reads the SNS payload from ``data['post']`` and:

    * mails subscription confirmations (payloads with a ``SubscribeURL``);
    * rejects, with a mail, payloads whose ``SigningCertURL`` is not an
      ``https`` URL on a ``.amazonaws.com`` host or whose signature does not
      verify;
    * for a subject containing ``launch``, waits for the new minion's key and
      returns reactions accepting it and running a highstate;
    * for a subject containing ``termination``, returns a reaction deleting
      the key of a minion previously recorded by this reactor.

    Every other case returns an empty dict.  Exceptions are mailed by the
    ``send_mail_on_error`` decorator and re-raised.
    '''
    sns = data['post']
    if 'SubscribeURL' in sns:
        send_mail('EC2 Autoscale Subscription (via Salt Reactor)',
                  pprint.pformat(sns))
        return {}
    cert_url = sns.get('SigningCertURL', '')
    cert_host = _signing_cert_host(cert_url)
    if not (cert_host and cert_host.endswith('.amazonaws.com')):
        # The expected URL does not seem to come from Amazon, do not try to
        # process it
        send_mail('EC2 Autoscale SigningCertURL Error (via Salt Reactor)',
                  'There was an error with the EC2 SigningCertURL. '
                  '\r\n{1} \r\n{2} \r\n'
                  'Content received was:\r\n\r\n{0}'.format(
                      pprint.pformat(sns), cert_url, cert_host))
        return {}
    if not _sns_signature_is_valid(sns, cert_url):
        send_mail('EC2 Autoscale Signature Error (via Salt Reactor)',
                  'There was an error with the EC2 Signature. '
                  'Content received was:\r\n\r\n{0}'.format(
                      pprint.pformat(sns)))
        return {}
    # Read the subject without modifying the incoming payload
    subject = sns.get('Subject') or ''
    message = json.loads(sns['Message'])
    instance_id = str(message['EC2InstanceId'])
    # Create minion names database
    db = InstancesNamesDatabase(__opts__['ec2.autoscale']['db_location'])
    try:
        if 'launch' in subject:
            return _handle_launch(db, instance_id)
        if 'termination' in subject:
            return _handle_termination(db, instance_id)
        return {}
    finally:
        # Every query commits on its own, so the connection can be released
        # as soon as the event has been handled
        db.conn.close()
class InstancesNamesDatabase(object):
    '''
    SQLite-backed mapping of EC2 instance ids to Salt minion names.

    The database file (and its parent directory) is created on first use.
    Instances can be used as context managers; the connection is closed when
    the ``with`` block exits.
    '''

    def __init__(self, location):
        '''
        Open (creating it if needed) the database stored at *location*.

        A leading ``~`` in *location* is expanded.  Raises
        ``sqlite3.OperationalError``, with the location appended to the
        message, when the database cannot be opened.
        '''
        # Create database directory if needed
        location = op.expanduser(location)
        directory = op.dirname(location)
        if directory and not op.isdir(directory):
            try:
                os.makedirs(directory)
            except OSError:
                # Another process may have created the directory between the
                # check and the call; only fail if it is still missing
                if not op.isdir(directory):
                    raise
        self.location = location
        # Create connection object and tables
        try:
            self.conn = sqlite3.connect(self.location)
        except sqlite3.OperationalError as exc:
            # Make the error more useful
            exc.args = ('%s: %s' % (exc, location),)
            raise
        self.create_tables()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.close()

    def close(self):
        '''
        Close the underlying SQLite connection.
        '''
        self.conn.close()

    def create_tables(self):
        '''
        Create the ``minion_names`` table and its unique index on
        ``instance_id`` if they do not exist yet.
        '''
        with self.conn:
            self.conn.execute('CREATE TABLE IF NOT EXISTS minion_names '
                              '(instance_id text, minion_name text)')
            self.conn.execute('CREATE UNIQUE INDEX IF NOT EXISTS instance_id_index '
                              'ON minion_names (instance_id)')

    def query(self, query, *args):
        '''
        Execute *query* with positional parameters *args* inside a
        transaction and return the resulting cursor.
        '''
        with self.conn:
            return self.conn.execute(query, args)

    def add(self, instance_id, minion_name):
        '''
        Record *minion_name* for *instance_id*; an existing record for the
        same instance id is left untouched.
        '''
        self.query('INSERT OR IGNORE INTO minion_names '
                   '(instance_id, minion_name) VALUES (?, ?)',
                   instance_id, minion_name)

    def get(self, instance_id):
        '''
        Return the minion name recorded for *instance_id*, or ``None`` when
        there is no record.
        '''
        row = self.query('SELECT minion_name FROM minion_names '
                         'WHERE instance_id = ?', instance_id).fetchone()
        if row:
            return row[0]
        return None

    def remove(self, instance_id):
        '''
        Delete the record for *instance_id*, if any.
        '''
        self.query('DELETE FROM minion_names WHERE instance_id = ?',
                   instance_id)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment