Created
April 28, 2015 22:36
-
-
Save cdw/22094d73c41b6c99d3b0 to your computer and use it in GitHub Desktop.
Manage an aws cluster from user-data and SQS
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# encoding: utf-8 | |
""" | |
sqs_control_userdata_script.py - control an aws instance from a sqs queue | |
Run this guy on startup as a userdata script and he will connect to | |
s3 to download code to a directory, and run commands in it that are | |
provided by an SQS queue, one job at a time per core | |
Created by Dave Williams on 2011-02-08 | |
""" | |
## Import present packages | |
import os | |
import sys | |
import time | |
import traceback | |
import ConfigParser | |
import subprocess as subp | |
import multiprocessing as mp | |
## Handle logging and thrown fatal errors | |
def log_it(log_message): | |
print log_message | |
with open('/dev/console', 'a') as console: | |
console.write(log_message+'\n') | |
def fatal_error(error_log_message, feed_me = "differently"):
    """Log a fatal error and halt the instance so it stops accruing charges.

    feed_me: short hint, spliced into the log, about what input would have
    let the run succeed.
    """
    log_it("ERROR: " + error_log_message)
    log_it("SHUTTING DOWN: feed me " + feed_me + " next time")
    # BUG FIX: 'shutdown now -h' puts the option after the time argument;
    # sysvinit shutdown parses '-h' as the wall message and does not halt.
    # Options must precede the time: 'shutdown -h now'.
    os.system("shutdown -h now")
## Configure extra software on the node
# Non-interactive install of the libraries the jobs need (scipy for the
# science, boto for AWS, p7zip for archives).  This must run before the
# 'import boto' further down.  The exit status is only logged, not checked.
out = subp.call("aptitude -ry install python-scipy python-boto p7zip-full",
                shell=True)
log_it("Installed scipy, boto, p7zip with result: "+str(out))
## Initialize working directory
# User-data scripts don't start in a predictable directory; work out of /root.
os.chdir('/root')
HOMEDIR = os.getcwd()+'/'
## Configure AWS control parameters
# NOTE: placeholders — fill in real credentials, the S3 bucket holding the
# job code, and the SQS queue name before baking this into user-data.
ACCESS_KEY = 'insert own'
SECRET_KEY = 'insert own'
CODE_BUCKET = 'insert own'
JOB_QUEUE = 'insert own'
## Write out boto configuration
# Persist the credentials where boto looks for them (/etc/boto.cfg) so the
# bare connect_s3()/connect_sqs() calls below can authenticate without
# being passed keys explicitly.
boto_config = ConfigParser.SafeConfigParser()
boto_config.add_section('Credentials')
boto_config.set('Credentials', 'aws_access_key_id', ACCESS_KEY)
boto_config.set('Credentials', 'aws_secret_access_key', SECRET_KEY)
with open('/etc/boto.cfg', 'wb') as config_file:
    boto_config.write(config_file)
## Connect to aws with boto
try:
    log_it("Connecting to boto")
    # Imported here, not at the top: boto reads /etc/boto.cfg at import
    # time, so the config file above had to be written first.
    import boto # Had to wait until .boto was written
    S3 = boto.connect_s3()
    SQS = boto.connect_sqs()
    SQS.get_all_queues() # Call to test if our keys were accepted
except (boto.exception.NoAuthHandlerFound, boto.exception.SQSError), e:
    log_it(e)
    fatal_error("Probably gave bad aws keys", "valid credentials")
## Download files from passed bucket
# Mirror every key in the code bucket into HOMEDIR, recreating the
# directory structure implied by '/'-separated key names.
try:
    log_it("Downloading from code bucket")
    code_bucket = S3.get_bucket(CODE_BUCKET)
    for key in code_bucket.get_all_keys():
        if '/' in key.name:
            try:
                # Create the parent directories encoded in the key name.
                os.makedirs(HOMEDIR + key.name[:key.name.rfind('/')])
            except OSError:
                pass #Already made that directory. No harm, no foul.
        key.get_contents_to_filename(HOMEDIR + key.name)
        # 0776 = rwxrwxrw-: downloaded scripts are directly executable.
        os.chmod(HOMEDIR + key.name, 0776)
except boto.exception.S3ResponseError:
    fatal_error("No bucket with given code_bucket name", "a valid bucket")
except IOError:
    fatal_error("Couldn't write code_bucket contents locally")
## Connect to command queue
try:
    log_it("Connecting to job queue")
    job_queue = SQS.get_queue(JOB_QUEUE)
    # get_queue returns None (rather than raising) when the queue doesn't
    # exist; convert that into an exception so one handler shuts us down.
    if type(job_queue) is type(None):
        raise KeyError, "Provided job_queue name not found"
except KeyError:
    fatal_error("Given job_queue does not exist", "a different queue name")
## Set up processes that will run our jobs
# One worker slot per core, each paired with the SQS message it is running
# (None until a job is assigned).  Every slot begins as a dummy Process
# that is started and immediately joined: the scheduling loop treats "not
# alive" as "slot free" and calls join() on every slot, which would raise
# on a Process that was never started.
proc_num = mp.cpu_count()
processes = []
jobs = []
for _ in range(proc_num):
    processes.append(mp.Process())
    jobs.append(None)
for worker in processes:
    worker.start()
for worker in processes:
    worker.join()
## Turn control over to the job queue
# Scheduler: keep every core busy with one shell command per SQS message.
# A message is only deleted after its job finishes, so if this instance
# dies mid-job the message reappears on the queue for another worker.
try:
    next_job = job_queue.read()
    while next_job is not None:
        for i in range(proc_num):
            # A dead slot means its previous job (if any) has finished.
            if (not processes[i].is_alive()) and (next_job is not None):
                processes[i].join()
                if jobs[i] is not None:
                    job_queue.delete_message(jobs[i])
                jobs[i] = next_job
                log_it("Gonna run " + jobs[i].get_body())
                # NOTE(review): the message body is executed via the shell;
                # only trusted producers may write to this queue.
                processes[i] = mp.Process(target = subp.call,
                                          args = (jobs[i].get_body(),),
                                          kwargs = {'shell': True})
                processes[i].start()
                next_job = job_queue.read()
            time.sleep(.5)
    else:
        log_it("Empty " + job_queue.name + ", waiting out current runs")
    # Wait for the stragglers, then acknowledge their messages.
    for p in processes:
        p.join()
    for job in jobs:
        # ROBUSTNESS FIX: slots that never received a job still hold None;
        # the original passed None to delete_message.
        if job is not None:
            job_queue.delete_message(job)
    log_it("All done")
except Exception as instance:
    log_it("### An error occurred while running jobs")
    # BUG FIX: the original concatenated a str with a type object
    # ("..." + type(instance)), which raised TypeError inside the handler
    # and masked the real error; .message is also unreliable.
    log_it("Exception of type " + type(instance).__name__ + ": "
           + str(instance))
    exc_type, exc_value, exc_traceback = sys.exc_info()
    log_it(repr(traceback.format_exception(exc_type, exc_value, exc_traceback)))
    log_it("Going no further, shutting down now")
finally:
    # BUG FIX: options must precede the time argument or sysvinit shutdown
    # treats '-h' as the wall message and never halts the instance.
    os.system('shutdown -h now')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment