-
-
Save copernicus/11388921 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
import argparse
import gzip
import json
import logging
import os
import re
import tempfile
import time

import boto
from boto.sqs.message import RawMessage
import redis
'''
Packages required: redis, boto
'''
# Route every log record through one stderr handler with a timestamped,
# thread-aware layout, then hand out a module-scoped logger.
_formatter = logging.Formatter(
    "%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s")
_console = logging.StreamHandler()
_console.setFormatter(_formatter)
logging.getLogger().addHandler(_console)
logger = logging.getLogger(__name__)
def sqs_fetch(queue):
    """Poll the named SQS queue once and return up to ten raw messages.

    :param queue: name of the SQS queue to poll
    :return: list of boto RawMessage objects (possibly empty)
    """
    conn = boto.connect_sqs()
    sqs_queue = conn.get_queue(queue_name=queue)
    # RawMessage skips boto's base64 decoding; bodies arrive as plain JSON.
    sqs_queue.set_message_class(RawMessage)
    return sqs_queue.get_messages(10)
class CloudTrailS3EventLoader(object):
    """Downloads CloudTrail log archives from S3.

    Keeps a single boto S3 connection, created on first instantiation and
    reused for every subsequent fetch.
    """

    connection = None  # boto S3 connection, created lazily in __init__

    def __init__(self):
        if self.connection is None:
            self.connection = boto.connect_s3()

    def fetch_object(self, sqs_event):
        """Download the S3 object described by a CloudTrail SQS notification.

        :param sqs_event: decoded SQS message body; its 'Message' field holds
            a JSON document with 's3Bucket' and 's3ObjectKey' entries
        :return: tuple of (local temp-file path, 's3://bucket/key' source URL)
        """
        sqs_event_body = json.loads(sqs_event['Message'])
        bucket = self.connection.get_bucket(sqs_event_body['s3Bucket'])
        key = bucket.get_key(sqs_event_body['s3ObjectKey'][0])
        # mkstemp returns an *open* OS-level descriptor; close it immediately
        # so we do not leak one descriptor per downloaded archive.
        fd, tmp_path = tempfile.mkstemp(suffix='.gz')
        os.close(fd)
        key.get_contents_to_filename(tmp_path)
        return tmp_path, 's3://%s/%s' % (sqs_event_body['s3Bucket'],
                                         sqs_event_body['s3ObjectKey'][0])
def camel_to_lower(camels):
    """Convert a camelCase/PascalCase string to snake_case.

    e.g. 'DescribeInstances' -> 'describe_instances',
         'getHTTPResponse'   -> 'get_http_response'

    :param camels: camelCase or PascalCase string (e.g. a CloudTrail eventName)
    :return: lower-cased snake_case string
    """
    # First pass splits an uppercase run from a following capitalized word;
    # second pass splits remaining lower/digit-to-upper boundaries.
    partial = re.sub(r'(.)([A-Z][a-z]+)', r'\1_\2', camels)
    return re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', partial).lower()
def write_data(filename, remote_path=None):
    """Push every CloudTrail record in a gzipped JSON archive onto redis.

    Each record is decorated with '@timestamp' (from eventTime), a logstash
    'type' derived from the event name, and optionally the source S3 'path',
    then LPUSHed to the configured list in one pipelined batch.  A record
    that cannot be decorated is logged and skipped (best effort).

    Relies on the module-level ``redis_client`` and ``opts`` set up in
    ``__main__``.

    :param filename: path to a local gzip file containing {'Records': [...]}
    :param remote_path: original s3:// URL of the archive, stored per record
    """
    # Context manager guarantees the gzip handle is closed even when
    # json.load raises on a truncated/corrupt archive.
    with gzip.open(filename) as archive:
        payload = json.load(archive)
    pipe = redis_client.pipeline()
    for record in payload['Records']:
        try:
            record['@timestamp'] = record['eventTime']
            if 'eventName' in record:
                record['type'] = '%s.%s' % (opts.doctype_prefix,
                                            camel_to_lower(record['eventName']))
            else:
                record['type'] = opts.doctype_prefix
            if remote_path is not None:
                record['path'] = remote_path
            pipe.lpush(opts.redis_list, json.dumps(record))
        except Exception:
            # logger.exception keeps the traceback; logger.error(ex) did not.
            logger.exception("failed to queue CloudTrail record")
    pipe.execute()
def main():
    """Poll SQS once and load each received CloudTrail archive into redis.

    Messages are handled independently: a failure on one message is logged
    with its traceback and does not stop the remaining messages from being
    processed and deleted.  The downloaded temp file is always removed,
    even when loading it into redis fails.
    """
    try:
        messages = sqs_fetch(opts.sqs_queue)
    except Exception:
        logger.exception("unable to fetch messages from SQS")
        return
    logger.info("messages received [%s]", len(messages))
    for message in messages:
        try:
            mesg = json.loads(message.get_body())
            local, remote = loader.fetch_object(mesg)
            try:
                write_data(local, remote)
            finally:
                os.unlink(local)  # never leave the temp archive behind
            # Only delete after a successful load, so failed messages
            # reappear on the queue after the visibility timeout.
            message.delete()
        except Exception:
            logger.exception("failed to process SQS message")
if __name__ == '__main__':
    epilog = '''
Example use cases:
The following example relies on IAM roles to authenticate:
`cloudtrail_monitor -r arn:aws:iam::99999999999:role/ct-mon -Q ct-sqs-queue`
The following example expects credentials to be in environment vars:
`cloudtrail_monitor -Q ct-sqs-queue`
'''
    argparser = argparse.ArgumentParser(description='A simple AWS CloudTrail/Logstash SQS driven event loader',
                                        epilog=epilog)
    argparser.add_argument('--interval', '-i', type=int, help="message polling interval", default=120)
    argparser.add_argument('--sqs_queue', '-Q', type=str, help="AWS SQS Queue the events are published to")
    argparser.add_argument('--aws_role', '-r', type=str, help="AWS IAM role to assume", default=None)
    argparser.add_argument('--redis_list', '-L', type=str, help="redis list to publish events to", default='logstash')
    argparser.add_argument('--redis_host', '-H', type=str, help="redis Server hostname (default: 127.0.0.1).",
                           default='127.0.0.1')
    # default is a real int; the previous string default only worked because
    # argparse re-parses string defaults through type=int.
    argparser.add_argument('--redis_port', '-p', type=int, help="redis Server port (default: 6379).", default=6379)
    argparser.add_argument('--redis_db', '-n', type=int, help="redis Database number.", default=0)
    # help text fixed: this option is the auth password, not the db number
    argparser.add_argument('--redis_password', '-a', type=str, help="redis Database password.", default=None)
    argparser.add_argument('--doctype_prefix', '-t', type=str, help="logstash document type root", default='cloudtrail')
    argparser.add_argument('--debug', action='store_true', default=False)
    argparser.add_argument('config', nargs='?', help="JSON-format config file", default=None)
    opts = argparser.parse_args()
    logger.setLevel(logging.DEBUG if opts.debug else logging.INFO)
    logger.info('starting polling operations with SQS queue: %s ...', opts.sqs_queue)
    logger.debug("startup options: %s", opts.__dict__)
    """
    I do not, will not ever willing or knowingly write an app that does
    not use environment vars or STS credentials for AWS
    """
    loader = CloudTrailS3EventLoader()
    try:
        redis_client = redis.Redis(host=opts.redis_host, port=opts.redis_port, db=opts.redis_db,
                                   password=opts.redis_password)
    except redis.RedisError as e:
        # Previously execution fell through to the poll loop with
        # redis_client unbound (NameError); fail fast with the real cause.
        logger.error(e)
        raise SystemExit(1)
    while True:
        main()
        time.sleep(opts.interval)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment